source: fedd/federation/experiment_control.py @ 9a52a80

Last change on this file since 9a52a80 was 9a52a80, checked in by Ted Faber <faber@…>, 8 years ago

Static routing

  • Property mode set to 100644
File size: 99.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46from deter import topology_to_route_file
47import list_log
48
49
50class nullHandler(logging.Handler):
51    def emit(self, record): pass
52
53fl = logging.getLogger("fedd.experiment_control")
54fl.addHandler(nullHandler())
55
56class experiment_control_local(experiment_control_legacy):
57    """
58    Control of experiments that this system can directly access.
59
60    Includes experiment creation, termination and information dissemination.
61    Thred safe.
62    """
63
64    class ssh_cmd_timeout(RuntimeError): pass
65   
66    call_RequestAccess = service_caller('RequestAccess')
67    call_ReleaseAccess = service_caller('ReleaseAccess')
68    call_StartSegment = service_caller('StartSegment')
69    call_TerminateSegment = service_caller('TerminateSegment')
70    call_InfoSegment = service_caller('InfoSegment')
71    call_OperationSegment = service_caller('OperationSegment')
72    call_Ns2Topdl = service_caller('Ns2Topdl')
73
74    def __init__(self, config=None, auth=None):
75        """
76        Intialize the various attributes, most from the config object
77        """
78
79        def parse_tarfile_list(tf):
80            """
81            Parse a tarfile list from the configuration.  This is a set of
82            paths and tarfiles separated by spaces.
83            """
84            rv = [ ]
85            if tf is not None:
86                tl = tf.split()
87                while len(tl) > 1:
88                    p, t = tl[0:2]
89                    del tl[0:2]
90                    rv.append((p, t))
91            return rv
92
93        self.list_log = list_log.list_log
94
95        self.cert_file = config.get("experiment_control", "cert_file")
96        if self.cert_file:
97            self.cert_pwd = config.get("experiment_control", "cert_pwd")
98        else:
99            self.cert_file = config.get("globals", "cert_file")
100            self.cert_pwd = config.get("globals", "cert_pwd")
101
102        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
103                or config.get("globals", "trusted_certs")
104
105        self.repodir = config.get("experiment_control", "repodir")
106        self.repo_url = config.get("experiment_control", "repo_url", 
107                "https://users.isi.deterlab.net:23235");
108
109        self.exp_stem = "fed-stem"
110        self.log = logging.getLogger("fedd.experiment_control")
111        set_log_level(config, "experiment_control", self.log)
112        self.muxmax = 2
113        self.nthreads = 10
114        self.randomize_experiments = False
115
116        self.splitter = None
117        self.ssh_keygen = "/usr/bin/ssh-keygen"
118        self.ssh_identity_file = None
119
120
121        self.debug = config.getboolean("experiment_control", "create_debug")
122        self.cleanup = not config.getboolean("experiment_control", 
123                "leave_tmpfiles")
124        self.state_filename = config.get("experiment_control", 
125                "experiment_state")
126        self.store_filename = config.get("experiment_control", 
127                "synch_store")
128        self.store_url = config.get("experiment_control", "store_url")
129        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
130        self.grouper_url = config.get("experiment_control", "grouper_url")
131        self.fedkit = parse_tarfile_list(\
132                config.get("experiment_control", "fedkit"))
133        self.gatewaykit = parse_tarfile_list(\
134                config.get("experiment_control", "gatewaykit"))
135
136        dt = config.get("experiment_control", "direct_transit")
137        self.auth_type = config.get('experiment_control', 'auth_type') \
138                or 'legacy'
139        self.auth_dir = config.get('experiment_control', 'auth_dir')
140        self.routing = config.get('experiment_control', 'routing')
141        # XXX: document this!
142        self.info_cache_limit = \
143                config.getint('experiment_control', 'info_cache', 600)
144        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
145        else: self.direct_transit = [ ]
146        # NB for internal master/slave ops, not experiment setup
147        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
148
149        self.overrides = set([])
150        ovr = config.get('experiment_control', 'overrides')
151        if ovr:
152            for o in ovr.split(","):
153                o = o.strip()
154                if o.startswith('fedid:'): o = o[len('fedid:'):]
155                self.overrides.add(fedid(hexstr=o))
156
157        self.state = { }
158        self.state_lock = Lock()
159        self.tclsh = "/usr/local/bin/otclsh"
160        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
161                config.get("experiment_control", "tcl_splitter",
162                        "/usr/testbed/lib/ns2ir/parse.tcl")
163        mapdb_file = config.get("experiment_control", "mapdb")
164        self.trace_file = sys.stderr
165
166        self.def_expstart = \
167                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
168                "/tmp/federate";
169        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
170                "FEDDIR/hosts";
171        self.def_gwstart = \
172                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
173                "/tmp/bridge.log";
174        self.def_mgwstart = \
175                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
176                "/tmp/bridge.log";
177        self.def_gwimage = "FBSD61-TUNNEL2";
178        self.def_gwtype = "pc";
179        self.local_access = { }
180
181        if self.auth_type == 'legacy':
182            if auth:
183                self.auth = auth
184            else:
185                self.log.error( "[access]: No authorizer initialized, " +\
186                        "creating local one.")
187                auth = authorizer()
188            self.get_access = self.legacy_get_access
189        elif self.auth_type == 'abac':
190            self.auth = abac_authorizer(load=self.auth_dir, 
191                    update=os.path.join(self.auth_dir,'update'))
192        else:
193            raise service_error(service_error.internal, 
194                    "Unknown auth_type: %s" % self.auth_type)
195
196        if mapdb_file:
197            self.read_mapdb(mapdb_file)
198        else:
199            self.log.warn("[experiment_control] No testbed map, using defaults")
200            self.tbmap = { 
201                    'deter':'https://users.isi.deterlab.net:23235',
202                    'emulab':'https://users.isi.deterlab.net:23236',
203                    'ucb':'https://users.isi.deterlab.net:23237',
204                    }
205
206        # Grab saved state.  OK to do this w/o locking because it's read only
207        # and only one thread should be in existence that can see self.state at
208        # this point.
209        if self.state_filename:
210            self.read_state()
211
212        if self.store_filename:
213            self.read_store()
214        else:
215            self.log.warning("No saved synch store")
216            self.synch_store = synch_store
217
218        # Dispatch tables
219        self.soap_services = {\
220                'New': soap_handler('New', self.new_experiment),
221                'Create': soap_handler('Create', self.create_experiment),
222                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
223                'Vis': soap_handler('Vis', self.get_vis),
224                'Info': soap_handler('Info', self.get_info),
225                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
226                'Operation': soap_handler('Operation', self.do_operation),
227                'Terminate': soap_handler('Terminate', 
228                    self.terminate_experiment),
229                'GetValue': soap_handler('GetValue', self.GetValue),
230                'SetValue': soap_handler('SetValue', self.SetValue),
231        }
232
233        self.xmlrpc_services = {\
234                'New': xmlrpc_handler('New', self.new_experiment),
235                'Create': xmlrpc_handler('Create', self.create_experiment),
236                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
237                'Vis': xmlrpc_handler('Vis', self.get_vis),
238                'Info': xmlrpc_handler('Info', self.get_info),
239                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
240                'Terminate': xmlrpc_handler('Terminate',
241                    self.terminate_experiment),
242                'Operation': xmlrpc_handler('Operation', self.do_operation),
243                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
244                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
245        }
246
247    # Call while holding self.state_lock
248    def write_state(self):
249        """
250        Write a new copy of experiment state after copying the existing state
251        to a backup.
252
253        State format is a simple pickling of the state dictionary.
254        """
255        if os.access(self.state_filename, os.W_OK):
256            copy_file(self.state_filename, \
257                    "%s.bak" % self.state_filename)
258        try:
259            f = open(self.state_filename, 'w')
260            pickle.dump(self.state, f)
261        except EnvironmentError, e:
262            self.log.error("Can't write file %s: %s" % \
263                    (self.state_filename, e))
264        except pickle.PicklingError, e:
265            self.log.error("Pickling problem: %s" % e)
266        except TypeError, e:
267            self.log.error("Pickling problem (TypeError): %s" % e)
268
269    @staticmethod
270    def get_alloc_ids(exp):
271        """
272        Used by read_store and read state.  This used to be worse.
273        """
274
275        return [ a.allocID for a in exp.get_all_allocations() ]
276
277
278    # Call while holding self.state_lock
279    def read_state(self):
280        """
281        Read a new copy of experiment state.  Old state is overwritten.
282
283        State format is a simple pickling of the state dictionary.
284        """
285       
286        try:
287            f = open(self.state_filename, "r")
288            self.state = pickle.load(f)
289            self.log.debug("[read_state]: Read state from %s" % \
290                    self.state_filename)
291        except EnvironmentError, e:
292            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
293                    % (self.state_filename, e))
294        except pickle.UnpicklingError, e:
295            self.log.warning(("[read_state]: No saved state: " + \
296                    "Unpickling failed: %s") % e)
297       
298        for s in self.state.values():
299            try:
300
301                eid = s.fedid
302                if eid : 
303                    if self.auth_type == 'legacy':
304                        # XXX: legacy
305                        # Give the owner rights to the experiment
306                        #self.auth.set_attribute(s['owner'], eid)
307                        # And holders of the eid as well
308                        self.auth.set_attribute(eid, eid)
309                        # allow overrides to control experiments as well
310                        for o in self.overrides:
311                            self.auth.set_attribute(o, eid)
312                        # Set permissions to allow reading of the software
313                        # repo, if any, as well.
314                        for a in self.get_alloc_ids(s):
315                            self.auth.set_attribute(a, 'repo/%s' % eid)
316                else:
317                    raise KeyError("No experiment id")
318            except KeyError, e:
319                self.log.warning("[read_state]: State ownership or identity " +\
320                        "misformatted in %s: %s" % (self.state_filename, e))
321
322    def read_mapdb(self, file):
323        """
324        Read a simple colon separated list of mappings for the
325        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
326        also adds testbeds to active if they include , active after
327        their name.
328        """
329
330        self.tbmap = { }
331        lineno =0
332        try:
333            f = open(file, "r")
334            for line in f:
335                lineno += 1
336                line = line.strip()
337                if line.startswith('#') or len(line) == 0:
338                    continue
339                try:
340                    label, url = line.split(':', 1)
341                    self.tbmap[label] = url
342                except ValueError, e:
343                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
344                            "map db: %s %s" % (lineno, line, e))
345        except EnvironmentError, e:
346            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
347                    "open %s: %s" % (file, e))
348        else:
349            f.close()
350
351    def read_store(self):
352        try:
353            self.synch_store = synch_store()
354            self.synch_store.load(self.store_filename)
355            self.log.debug("[read_store]: Read store from %s" % \
356                    self.store_filename)
357        except EnvironmentError, e:
358            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
359                    % (self.state_filename, e))
360            self.synch_store = synch_store()
361
362        # Set the initial permissions on data in the store.  XXX: This ad hoc
363        # authorization attribute initialization is getting out of hand.
364        # XXX: legacy
365        if self.auth_type == 'legacy':
366            for k in self.synch_store.all_keys():
367                try:
368                    if k.startswith('fedid:'):
369                        fid = fedid(hexstr=k[6:46])
370                        if self.state.has_key(fid):
371                            for a in self.get_alloc_ids(self.state[fid]):
372                                self.auth.set_attribute(a, k)
373                except ValueError, e:
374                    self.log.warn("Cannot deduce permissions for %s" % k)
375
376
377    def write_store(self):
378        """
379        Write a new copy of synch_store after writing current state
380        to a backup.  We use the internal synch_store pickle method to avoid
381        incinsistent data.
382
383        State format is a simple pickling of the store.
384        """
385        if os.access(self.store_filename, os.W_OK):
386            copy_file(self.store_filename, \
387                    "%s.bak" % self.store_filename)
388        try:
389            self.synch_store.save(self.store_filename)
390        except EnvironmentError, e:
391            self.log.error("Can't write file %s: %s" % \
392                    (self.store_filename, e))
393        except TypeError, e:
394            self.log.error("Pickling problem (TypeError): %s" % e)
395
396    # XXX this may belong somewhere else
397
398    def get_grouper_updates(self, fid):
399        if self.grouper_url is None: return
400        d = tempfile.mkdtemp()
401        try:
402            fstr = "%s" % fid
403            # XXX locking
404            zipname = os.path.join(d, 'grouper.zip')
405            dest = os.path.join(self.auth_dir, 'update')
406            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
407            f = open(zipname, 'w')
408            f.write(resp.read())
409            f.close()
410            zf = zipfile.ZipFile(zipname, 'r')
411            zf.extractall(dest)
412            zf.close()
413        except URLError, e:
414            self.log.error("Cannot connect to grouper: %s" % e)
415            pass
416        finally:
417            shutil.rmtree(d)
418
419
420    def remove_dirs(self, dir):
421        """
422        Remove the directory tree and all files rooted at dir.  Log any errors,
423        but continue.
424        """
425        self.log.debug("[removedirs]: removing %s" % dir)
426        try:
427            for path, dirs, files in os.walk(dir, topdown=False):
428                for f in files:
429                    os.remove(os.path.join(path, f))
430                for d in dirs:
431                    os.rmdir(os.path.join(path, d))
432            os.rmdir(dir)
433        except EnvironmentError, e:
434            self.log.error("Error deleting directory tree in %s" % e);
435
436    @staticmethod
437    def make_temp_certfile(expcert, tmpdir):
438        """
439        make a protected copy of the access certificate so the experiment
440        controller can act as the experiment principal.  mkstemp is the most
441        secure way to do that. The directory should be created by
442        mkdtemp.  Return the filename.
443        """
444        if expcert and tmpdir:
445            try:
446                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
447                f = os.fdopen(certf, 'w')
448                print >> f, expcert
449                f.close()
450            except EnvironmentError, e:
451                raise service_error(service_error.internal, 
452                        "Cannot create temp cert file?")
453            return certfn
454        else:
455            return None
456
457       
458    def generate_ssh_keys(self, dest, type="rsa" ):
459        """
460        Generate a set of keys for the gateways to use to talk.
461
462        Keys are of type type and are stored in the required dest file.
463        """
464        valid_types = ("rsa", "dsa")
465        t = type.lower();
466        if t not in valid_types: raise ValueError
467        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
468
469        try:
470            trace = open("/dev/null", "w")
471        except EnvironmentError:
472            raise service_error(service_error.internal,
473                    "Cannot open /dev/null??");
474
475        # May raise CalledProcessError
476        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
477        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
478        if rv != 0:
479            raise service_error(service_error.internal, 
480                    "Cannot generate nonce ssh keys.  %s return code %d" \
481                            % (self.ssh_keygen, rv))
482
483    def generate_seer_certs(self, destdir):
484        '''
485        Create a SEER ca cert and a node cert in destdir/ca.pem and
486        destdir/node.pem respectively.  These will be distributed throughout
487        the federated experiment.  This routine reports errors via
488        service_errors.
489        '''
490        openssl = '/usr/bin/openssl'
491        # All the filenames and parameters we need for openssl calls below
492        ca_key =os.path.join(destdir, 'ca.key') 
493        ca_pem = os.path.join(destdir, 'ca.pem')
494        node_key =os.path.join(destdir, 'node.key') 
495        node_pem = os.path.join(destdir, 'node.pem')
496        node_req = os.path.join(destdir, 'node.req')
497        node_signed = os.path.join(destdir, 'node.signed')
498        days = '%s' % (365 * 10)
499        serial = '%s' % random.randint(0, 1<<16)
500
501        try:
502            # Sequence of calls to create a CA key, create a ca cert, create a
503            # node key, node signing request, and finally a signed node
504            # certificate.
505            sequence = (
506                    (openssl, 'genrsa', '-out', ca_key, '1024'),
507                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
508                        ca_pem, '-days', days, '-subj', 
509                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
510                    (openssl, 'genrsa', '-out', node_key, '1024'),
511                    (openssl, 'req', '-new', '-key', node_key, '-out', 
512                        node_req, '-days', days, '-subj', 
513                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
514                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
515                        '-set_serial', serial, '-req', '-in', node_req, 
516                        '-out', node_signed, '-days', days),
517                )
518            # Do all that stuff; bail if there's an error, and push all the
519            # output to dev/null.
520            for cmd in sequence:
521                trace = open("/dev/null", "w")
522                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
523                if rv != 0:
524                    raise service_error(service_error.internal, 
525                            "Cannot generate SEER certs.  %s return code %d" \
526                                    % (' '.join(cmd), rv))
527            # Concatinate the node key and signed certificate into node.pem
528            f = open(node_pem, 'w')
529            for comp in (node_signed, node_key):
530                g = open(comp, 'r')
531                f.write(g.read())
532                g.close()
533            f.close()
534
535            # Throw out intermediaries.
536            for fn in (ca_key, node_key, node_req, node_signed):
537                os.unlink(fn)
538
539        except EnvironmentError, e:
540            # Any difficulties with the file system wind up here
541            raise service_error(service_error.internal,
542                    "File error on  %s while creating SEER certs: %s" % \
543                            (e.filename, e.strerror))
544
545
546
547    def gentopo(self, str):
548        """
549        Generate the topology data structure from the splitter's XML
550        representation of it.
551
552        The topology XML looks like:
553            <experiment>
554                <nodes>
555                    <node><vname></vname><ips>ip1:ip2</ips></node>
556                </nodes>
557                <lans>
558                    <lan>
559                        <vname></vname><vnode></vnode><ip></ip>
560                        <bandwidth></bandwidth><member>node:port</member>
561                    </lan>
562                </lans>
563        """
564        class topo_parse:
565            """
566            Parse the topology XML and create the dats structure.
567            """
568            def __init__(self):
569                # Typing of the subelements for data conversion
570                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
571                self.int_subelements = ( 'bandwidth',)
572                self.float_subelements = ( 'delay',)
573                # The final data structure
574                self.nodes = [ ]
575                self.lans =  [ ]
576                self.topo = { \
577                        'node': self.nodes,\
578                        'lan' : self.lans,\
579                    }
580                self.element = { }  # Current element being created
581                self.chars = ""     # Last text seen
582
583            def end_element(self, name):
584                # After each sub element the contents is added to the current
585                # element or to the appropriate list.
586                if name == 'node':
587                    self.nodes.append(self.element)
588                    self.element = { }
589                elif name == 'lan':
590                    self.lans.append(self.element)
591                    self.element = { }
592                elif name in self.str_subelements:
593                    self.element[name] = self.chars
594                    self.chars = ""
595                elif name in self.int_subelements:
596                    self.element[name] = int(self.chars)
597                    self.chars = ""
598                elif name in self.float_subelements:
599                    self.element[name] = float(self.chars)
600                    self.chars = ""
601
602            def found_chars(self, data):
603                self.chars += data.rstrip()
604
605
606        tp = topo_parse();
607        parser = xml.parsers.expat.ParserCreate()
608        parser.EndElementHandler = tp.end_element
609        parser.CharacterDataHandler = tp.found_chars
610
611        parser.Parse(str)
612
613        return tp.topo
614       
615
616    def genviz(self, topo):
617        """
618        Generate the visualization the virtual topology
619        """
620
621        neato = "/usr/local/bin/neato"
622        # These are used to parse neato output and to create the visualization
623        # file.
624        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
625        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
626                "%s</type></node>"
627
628        try:
629            # Node names
630            nodes = [ n['vname'] for n in topo['node'] ]
631            topo_lans = topo['lan']
632        except KeyError, e:
633            raise service_error(service_error.internal, "Bad topology: %s" %e)
634
635        lans = { }
636        links = { }
637
638        # Walk through the virtual topology, organizing the connections into
639        # 2-node connections (links) and more-than-2-node connections (lans).
640        # When a lan is created, it's added to the list of nodes (there's a
641        # node in the visualization for the lan).
642        for l in topo_lans:
643            if links.has_key(l['vname']):
644                if len(links[l['vname']]) < 2:
645                    links[l['vname']].append(l['vnode'])
646                else:
647                    nodes.append(l['vname'])
648                    lans[l['vname']] = links[l['vname']]
649                    del links[l['vname']]
650                    lans[l['vname']].append(l['vnode'])
651            elif lans.has_key(l['vname']):
652                lans[l['vname']].append(l['vnode'])
653            else:
654                links[l['vname']] = [ l['vnode'] ]
655
656
657        # Open up a temporary file for dot to turn into a visualization
658        try:
659            df, dotname = tempfile.mkstemp()
660            dotfile = os.fdopen(df, 'w')
661        except EnvironmentError:
662            raise service_error(service_error.internal,
663                    "Failed to open file in genviz")
664
665        try:
666            dnull = open('/dev/null', 'w')
667        except EnvironmentError:
668            service_error(service_error.internal,
669                    "Failed to open /dev/null in genviz")
670
671        # Generate a dot/neato input file from the links, nodes and lans
672        try:
673            print >>dotfile, "graph G {"
674            for n in nodes:
675                print >>dotfile, '\t"%s"' % n
676            for l in links.keys():
677                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
678            for l in lans.keys():
679                for n in lans[l]:
680                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
681            print >>dotfile, "}"
682            dotfile.close()
683        except TypeError:
684            raise service_error(service_error.internal,
685                    "Single endpoint link in vtopo")
686        except EnvironmentError:
687            raise service_error(service_error.internal, "Cannot write dot file")
688
689        # Use dot to create a visualization
690        try:
691            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
692                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
693                stderr=dnull, close_fds=True)
694        except EnvironmentError:
695            raise service_error(service_error.internal, 
696                    "Cannot generate visualization: is graphviz available?")
697        dnull.close()
698
699        # Translate dot to vis format
700        vis_nodes = [ ]
701        vis = { 'node': vis_nodes }
702        for line in dot.stdout:
703            m = vis_re.match(line)
704            if m:
705                vn = m.group(1)
706                vis_node = {'name': vn, \
707                        'x': float(m.group(2)),\
708                        'y' : float(m.group(3)),\
709                    }
710                if vn in links.keys() or vn in lans.keys():
711                    vis_node['type'] = 'lan'
712                else:
713                    vis_node['type'] = 'node'
714                vis_nodes.append(vis_node)
715        rv = dot.wait()
716
717        os.remove(dotname)
718        # XXX: graphviz seems to use low return codes for warnings, like
719        # "couldn't find font"
720        if rv < 2 : return vis
721        else: return None
722
723
724    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
725            cert_pwd=None):
726        """
727        Release access to testbed through fedd
728        """
729
730        if not uri and tbmap:
731            uri = tbmap.get(tb, None)
732        if not uri:
733            raise service_error(service_error.server_config, 
734                    "Unknown testbed: %s" % tb)
735
736        if self.local_access.has_key(uri):
737            resp = self.local_access[uri].ReleaseAccess(\
738                    { 'ReleaseAccessRequestBody' : 
739                        {'allocID': {'fedid': aid}},}, 
740                    fedid(file=cert_file))
741            resp = { 'ReleaseAccessResponseBody': resp } 
742        else:
743            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
744                    cert_file, cert_pwd, self.trusted_certs)
745
746        # better error coding
747
748    def remote_ns2topdl(self, uri, desc):
749
750        req = {
751                'description' : { 'ns2description': desc },
752            }
753
754        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
755                self.trusted_certs)
756
757        if r.has_key('Ns2TopdlResponseBody'):
758            r = r['Ns2TopdlResponseBody']
759            ed = r.get('experimentdescription', None)
760            if ed.has_key('topdldescription'):
761                return topdl.Topology(**ed['topdldescription'])
762            else:
763                raise service_error(service_error.protocol, 
764                        "Bad splitter response (no output)")
765        else:
766            raise service_error(service_error.protocol, "Bad splitter response")
767
768    class start_segment:
769        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
770                cert_pwd=None, trusted_certs=None, caller=None,
771                log_collector=None):
772            self.log = log
773            self.debug = debug
774            self.cert_file = cert_file
775            self.cert_pwd = cert_pwd
776            self.trusted_certs = None
777            self.caller = caller
778            self.testbed = testbed
779            self.log_collector = log_collector
780            self.response = None
781            self.node = { }
782            self.subs = { }
783            self.tb = { }
784            self.proof = None
785
786        def make_map(self, resp):
787            if 'segmentdescription' not in resp  or \
788                    'topdldescription' not in resp['segmentdescription']:
789                self.log.warn('No topology returned from startsegment')
790                return 
791
792            top = topdl.Topology(
793                    **resp['segmentdescription']['topdldescription'])
794
795            for e in top.elements:
796                if isinstance(e, topdl.Computer):
797                    self.node[e.name] = e
798                elif isinstance(e, topdl.Testbed):
799                    self.tb[e.uri] = e
800            for s in top.substrates:
801                self.subs[s.name] = s
802
803        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
804            req = {
805                    'allocID': { 'fedid' : aid }, 
806                    'segmentdescription': { 
807                        'topdldescription': topo.to_dict(),
808                    },
809                }
810
811            if connInfo:
812                req['connection'] = connInfo
813
814            import_svcs = [ s for m in masters.values() \
815                    for s in m if self.testbed in s.importers]
816
817            if import_svcs or self.testbed in masters:
818                req['service'] = []
819
820            for s in import_svcs:
821                for r in s.reqs:
822                    sr = copy.deepcopy(r)
823                    sr['visibility'] = 'import';
824                    req['service'].append(sr)
825
826            for s in masters.get(self.testbed, []):
827                for r in s.reqs:
828                    sr = copy.deepcopy(r)
829                    sr['visibility'] = 'export';
830                    req['service'].append(sr)
831
832            if attrs:
833                req['fedAttr'] = attrs
834
835            try:
836                self.log.debug("Calling StartSegment at %s " % uri)
837                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
838                        self.trusted_certs)
839                if r.has_key('StartSegmentResponseBody'):
840                    lval = r['StartSegmentResponseBody'].get('allocationLog',
841                            None)
842                    if lval and self.log_collector:
843                        for line in  lval.splitlines(True):
844                            self.log_collector.write(line)
845                    self.make_map(r['StartSegmentResponseBody'])
846                    if 'proof' in r: self.proof = r['proof']
847                    self.response = r
848                else:
849                    raise service_error(service_error.internal, 
850                            "Bad response!?: %s" %r)
851                return True
852            except service_error, e:
853                self.log.error("Start segment failed on %s: %s" % \
854                        (self.testbed, e))
855                return False
856
857
858
859    class terminate_segment:
860        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
861                cert_pwd=None, trusted_certs=None, caller=None):
862            self.log = log
863            self.debug = debug
864            self.cert_file = cert_file
865            self.cert_pwd = cert_pwd
866            self.trusted_certs = None
867            self.caller = caller
868            self.testbed = testbed
869
870        def __call__(self, uri, aid ):
871            req = {
872                    'allocID': {'fedid': aid }, 
873                }
874            self.log.info("Calling terminate segment")
875            try:
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                self.log.info("Terminate segment succeeded")
879                return True
880            except service_error, e:
881                self.log.error("Terminate segment failed on %s: %s" % \
882                        (self.testbed, e))
883                return False
884
885    class info_segment(start_segment):
886        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
887                cert_pwd=None, trusted_certs=None, caller=None,
888                log_collector=None):
889            experiment_control_local.start_segment.__init__(self, debug, 
890                    log, testbed, cert_file, cert_pwd, trusted_certs, 
891                    caller, log_collector)
892
893        def __call__(self, uri, aid):
894            req = { 'allocID': { 'fedid' : aid } }
895
896            try:
897                self.log.debug("Calling InfoSegment at %s " % uri)
898                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
899                        self.trusted_certs)
900                if r.has_key('InfoSegmentResponseBody'):
901                    self.make_map(r['InfoSegmentResponseBody'])
902                    if 'proof' in r: self.proof = r['proof']
903                    self.response = r
904                else:
905                    raise service_error(service_error.internal, 
906                            "Bad response!?: %s" %r)
907                return True
908            except service_error, e:
909                self.log.error("Info segment failed on %s: %s" % \
910                        (self.testbed, e))
911                return False
912
913    class operation_segment:
914        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
915                cert_pwd=None, trusted_certs=None, caller=None,
916                log_collector=None):
917            self.log = log
918            self.debug = debug
919            self.cert_file = cert_file
920            self.cert_pwd = cert_pwd
921            self.trusted_certs = None
922            self.caller = caller
923            self.testbed = testbed
924            self.status = None
925
926        def __call__(self, uri, aid, op, targets, params):
927            req = { 
928                    'allocID': { 'fedid' : aid },
929                    'operation': op,
930                    'target': targets,
931                    }
932            if params: req['parameter'] = params
933
934
935            try:
936                self.log.debug("Calling OperationSegment at %s " % uri)
937                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
938                        self.trusted_certs)
939                if 'OperationSegmentResponseBody' in r:
940                    r = r['OperationSegmentResponseBody']
941                    if 'status' in r:
942                        self.status = r['status']
943                else:
944                    raise service_error(service_error.internal, 
945                            "Bad response!?: %s" %r)
946                return True
947            except service_error, e:
948                self.log.error("Operation segment failed on %s: %s" % \
949                        (self.testbed, e))
950                return False
951
952    def annotate_topology(self, top, data):
953        # These routines do various parts of the annotation
954        def add_new_names(nl, l):
955            """ add any names in nl to the list in l """
956            for n in nl:
957                if n not in l: l.append(n)
958       
959        def merge_services(ne, e):
960            for ns in ne.service:
961                # NB: the else is on the for
962                for s in e.service:
963                    if ns.name == s.name:
964                        s.importer = ns.importer
965                        s.param = ns.param
966                        s.description = ns.description
967                        s.status = ns.status
968                        break
969                else:
970                    e.service.append(ns)
971       
972        def merge_oses(ne, e):
973            """
974            Merge the operating system entries of ne into e
975            """
976            for nos in ne.os:
977                # NB: the else is on the for
978                for os in e.os:
979                    if nos.name == os.name:
980                        os.version = nos.version
981                        os.version = nos.distribution
982                        os.version = nos.distributionversion
983                        for a in nos.attribute:
984                            if os.get_attribute(a.attribute):
985                                os.remove_attribute(a.attribute)
986                            os.set_attribute(a.attribute, a.value)
987                        break
988                else:
989                    # If both nodes have one OS, this is a replacement
990                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
991                    else: e.os.append(nos)
992
993        # Annotate the topology with embedding info
994        for e in top.elements:
995            if isinstance(e, topdl.Computer):
996                for s in data:
997                    ne = s.node.get(e.name, None)
998                    if ne is not None:
999                        add_new_names(ne.localname, e.localname)
1000                        e.status = ne.status
1001                        merge_services(ne, e)
1002                        add_new_names(ne.operation, e.operation)
1003                        if ne.os: merge_oses(ne, e)
1004                        break
1005            elif isinstance(e,topdl.Testbed):
1006                for s in data:
1007                    ne = s.tb.get(e.uri, None)
1008                    if ne is not None:
1009                        add_new_names(ne.localname, e.localname)
1010                        add_new_names(ne.operation, e.operation)
1011                        merge_services(ne, e)
1012                        for a in ne.attribute:
1013                            e.set_attribute(a.attribute, a.value)
1014        # Annotate substrates
1015        for s in top.substrates:
1016            for d in data:
1017                ss = d.subs.get(s.name, None)
1018                if ss is not None:
1019                    if ss.capacity is not None:
1020                        s.capacity = ss.capacity
1021                    if s.latency is not None:
1022                        s.latency = ss.latency
1023
1024
1025
1026    def allocate_resources(self, allocated, masters, eid, expid, 
1027            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1028            attrs=None, connInfo={}, tbmap=None, expcert=None):
1029
1030        started = { }           # Testbeds where a sub-experiment started
1031                                # successfully
1032
1033        # XXX
1034        fail_soft = False
1035
1036        if tbmap is None: tbmap = { }
1037
1038        log = alloc_log or self.log
1039
1040        tp = thread_pool(self.nthreads)
1041        threads = [ ]
1042        starters = [ ]
1043
1044        if expcert:
1045            cert = expcert
1046            pw = None
1047        else:
1048            cert = self.cert_file
1049            pw = self.cert_pwd
1050
1051        for tb in allocated.keys():
1052            # Create and start a thread to start the segment, and save it
1053            # to get the return value later
1054            tb_attrs = copy.copy(attrs)
1055            tp.wait_for_slot()
1056            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1057            base, suffix = split_testbed(tb)
1058            if suffix:
1059                tb_attrs.append({'attribute': 'experiment_name', 
1060                    'value': "%s-%s" % (eid, suffix)})
1061            else:
1062                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1063            if not uri:
1064                raise service_error(service_error.internal, 
1065                        "Unknown testbed %s !?" % tb)
1066
1067            aid = tbparams[tb].allocID
1068            if not aid:
1069                raise service_error(service_error.internal, 
1070                        "No alloc id for testbed %s !?" % tb)
1071
1072            s = self.start_segment(log=log, debug=self.debug,
1073                    testbed=tb, cert_file=cert,
1074                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1075                    caller=self.call_StartSegment,
1076                    log_collector=log_collector)
1077            starters.append(s)
1078            t  = pooled_thread(\
1079                    target=s, name=tb,
1080                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1081                    pdata=tp, trace_file=self.trace_file)
1082            threads.append(t)
1083            t.start()
1084
1085        # Wait until all finish (keep pinging the log, though)
1086        mins = 0
1087        revoked = False
1088        while not tp.wait_for_all_done(60.0):
1089            mins += 1
1090            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1091                    % mins)
1092            if not revoked and \
1093                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1094                # a testbed has failed.  Revoke this experiment's
1095                # synchronizarion values so that sub experiments will not
1096                # deadlock waiting for synchronization that will never happen
1097                self.log.info("A subexperiment has failed to swap in, " + \
1098                        "revoking synch keys")
1099                var_key = "fedid:%s" % expid
1100                for k in self.synch_store.all_keys():
1101                    if len(k) > 45 and k[0:46] == var_key:
1102                        self.synch_store.revoke_key(k)
1103                revoked = True
1104
1105        failed = [ t.getName() for t in threads if not t.rv ]
1106        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1107
1108        # If one failed clean up, unless fail_soft is set
1109        if failed:
1110            if not fail_soft:
1111                tp.clear()
1112                for tb in succeeded:
1113                    # Create and start a thread to stop the segment
1114                    tp.wait_for_slot()
1115                    uri = tbparams[tb].uri
1116                    t  = pooled_thread(\
1117                            target=self.terminate_segment(log=log,
1118                                testbed=tb,
1119                                cert_file=cert, 
1120                                cert_pwd=pw,
1121                                trusted_certs=self.trusted_certs,
1122                                caller=self.call_TerminateSegment),
1123                            args=(uri, tbparams[tb].allocID),
1124                            name=tb,
1125                            pdata=tp, trace_file=self.trace_file)
1126                    t.start()
1127                # Wait until all finish (if any are being stopped)
1128                if succeeded:
1129                    tp.wait_for_all_done()
1130
1131                # release the allocations
1132                for tb in tbparams.keys():
1133                    try:
1134                        self.release_access(tb, tbparams[tb].allocID, 
1135                                tbmap=tbmap, uri=tbparams[tb].uri,
1136                                cert_file=cert, cert_pwd=pw)
1137                    except service_error, e:
1138                        self.log.warn("Error releasing access: %s" % e.desc)
1139                # Remove the placeholder
1140                self.state_lock.acquire()
1141                self.state[eid].status = 'failed'
1142                self.state[eid].updated()
1143                if self.state_filename: self.write_state()
1144                self.state_lock.release()
1145                # Remove the repo dir
1146                self.remove_dirs("%s/%s" %(self.repodir, expid))
1147                # Walk up tmpdir, deleting as we go
1148                if self.cleanup:
1149                    self.remove_dirs(tmpdir)
1150                else:
1151                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1152
1153
1154                log.error("Swap in failed on %s" % ",".join(failed))
1155                return
1156        else:
1157            # Walk through the successes and gather the proofs
1158            proofs = { }
1159            for s in starters:
1160                if s.proof:
1161                    proofs[s.testbed] = s.proof
1162            self.annotate_topology(top, starters)
1163            log.info("[start_segment]: Experiment %s active" % eid)
1164
1165
1166        # Walk up tmpdir, deleting as we go
1167        if self.cleanup:
1168            self.remove_dirs(tmpdir)
1169        else:
1170            log.debug("[start_experiment]: not removing %s" % tmpdir)
1171
1172        # Insert the experiment into our state and update the disk copy.
1173        self.state_lock.acquire()
1174        self.state[expid].status = 'active'
1175        self.state[eid] = self.state[expid]
1176        self.state[eid].top = top
1177        self.state[eid].updated()
1178        # Append startup proofs
1179        for f in self.state[eid].get_all_allocations():
1180            if f.tb in proofs:
1181                f.proof.append(proofs[f.tb])
1182
1183        if self.state_filename: self.write_state()
1184        self.state_lock.release()
1185        return
1186
1187
1188    def add_kit(self, e, kit):
1189        """
1190        Add a Software object created from the list of (install, location)
1191        tuples passed as kit  to the software attribute of an object e.  We
1192        do this enough to break out the code, but it's kind of a hack to
1193        avoid changing the old tuple rep.
1194        """
1195
1196        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1197
1198        if isinstance(e.software, list): e.software.extend(s)
1199        else: e.software = s
1200
1201    def append_experiment_authorization(self, expid, attrs, 
1202            need_state_lock=True):
1203        """
1204        Append the authorization information to system state
1205        """
1206
1207        for p, a in attrs:
1208            if p and a:
1209                # Improperly configured overrides can add bad principals.
1210                self.auth.set_attribute(p, a)
1211            else:
1212                self.log.debug("Bad append experiment_authorization %s %s" \
1213                        % (p, a))
1214        self.auth.save()
1215
1216        if need_state_lock: self.state_lock.acquire()
1217        # XXX: really a no op?
1218        #self.state[expid]['auth'].update(attrs)
1219        if self.state_filename: self.write_state()
1220        if need_state_lock: self.state_lock.release()
1221
1222    def clear_experiment_authorization(self, expid, need_state_lock=True):
1223        """
1224        Attrs is a set of attribute principal pairs that need to be removed
1225        from the authenticator.  Remove them and save the authenticator.
1226        """
1227
1228        if need_state_lock: self.state_lock.acquire()
1229        # XXX: should be a no-op
1230        #if expid in self.state and 'auth' in self.state[expid]:
1231            #for p, a in self.state[expid]['auth']:
1232                #self.auth.unset_attribute(p, a)
1233            #self.state[expid]['auth'] = set()
1234        if self.state_filename: self.write_state()
1235        if need_state_lock: self.state_lock.release()
1236        self.auth.save()
1237
1238
1239    def create_experiment_state(self, fid, req, expid, expcert,
1240            state='starting'):
1241        """
1242        Create the initial entry in the experiment's state.  The expid and
1243        expcert are the experiment's fedid and certifacte that represents that
1244        ID, which are installed in the experiment state.  If the request
1245        includes a suggested local name that is used if possible.  If the local
1246        name is already taken by an experiment owned by this user that has
1247        failed, it is overwritten.  Otherwise new letters are added until a
1248        valid localname is found.  The generated local name is returned.
1249        """
1250
1251        if req.has_key('experimentID') and \
1252                req['experimentID'].has_key('localname'):
1253            overwrite = False
1254            eid = req['experimentID']['localname']
1255            # If there's an old failed experiment here with the same local name
1256            # and accessible by this user, we'll overwrite it, otherwise we'll
1257            # fall through and do the collision avoidance.
1258            old_expid = self.get_experiment_fedid(eid)
1259            if old_expid:
1260                users_experiment = True
1261                try:
1262                    self.check_experiment_access(fid, old_expid)
1263                except service_error, e:
1264                    if e.code == service_error.access: users_experiment = False
1265                    else: raise e
1266                if users_experiment:
1267                    self.state_lock.acquire()
1268                    status = self.state[eid].status
1269                    if status and status == 'failed':
1270                        # remove the old access attributes
1271                        self.clear_experiment_authorization(eid,
1272                                need_state_lock=False)
1273                        overwrite = True
1274                        del self.state[eid]
1275                        del self.state[old_expid]
1276                    self.state_lock.release()
1277                else:
1278                    self.log.info('Experiment %s exists, ' % eid + \
1279                            'but this user cannot access it')
1280            self.state_lock.acquire()
1281            while (self.state.has_key(eid) and not overwrite):
1282                eid += random.choice(string.ascii_letters)
1283            # Initial state
1284            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1285                    identity=expcert)
1286            self.state[expid] = self.state[eid]
1287            if self.state_filename: self.write_state()
1288            self.state_lock.release()
1289        else:
1290            eid = self.exp_stem
1291            for i in range(0,5):
1292                eid += random.choice(string.ascii_letters)
1293            self.state_lock.acquire()
1294            while (self.state.has_key(eid)):
1295                eid = self.exp_stem
1296                for i in range(0,5):
1297                    eid += random.choice(string.ascii_letters)
1298            # Initial state
1299            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1300                    identity=expcert)
1301            self.state[expid] = self.state[eid]
1302            if self.state_filename: self.write_state()
1303            self.state_lock.release()
1304
1305        # Let users touch the state.  Authorize this fid and the expid itself
1306        # to touch the experiment, as well as allowing th eoverrides.
1307        self.append_experiment_authorization(eid, 
1308                set([(fid, expid), (expid,expid)] + \
1309                        [ (o, expid) for o in self.overrides]))
1310
1311        return eid
1312
1313
1314    def allocate_ips_to_topo(self, top):
1315        """
1316        Add an ip4_address attribute to all the hosts in the topology that have
1317        not already been assigned one by the user, based on the shared
1318        substrates on which they sit.  An /etc/hosts file is also created and
1319        returned as a list of hostfiles entries.  We also return the allocator,
1320        because we may need to allocate IPs to portals
1321        (specifically DRAGON portals).
1322        """
1323        subs = sorted(top.substrates, 
1324                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1325                reverse=True)
1326        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1327        ifs = { }
1328        hosts = [ ]
1329
1330        assigned = { }
1331        assigned_subs = set()
1332        # Look for IP networks that were already assigned to the experiment by
1333        # the user.  We assume that users are smart in making that assignment,
1334        # but we check them later.
1335        for e in top.elements:
1336            if not isinstance(e, topdl.Computer): continue
1337            for inf in e.interface:
1338                a = inf.get_attribute('ip4_address')
1339                if not a: continue
1340                for s in inf.substrate:
1341                    assigned_subs.add(s)
1342                a = ip_addr(a)
1343                n = ip_addr(inf.get_attribute('ip4_netmask'))
1344
1345                sz = 0x100000000 - long(n)
1346                net = (long(a) & long(n)) & 0xffffffff
1347                if net in assigned:
1348                    if sz > assigned[net]: assigned[net] = sz
1349                assigned[net] = sz
1350
1351        # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER
1352        assigned[long(ip_addr('10.0.23.0'))] = 256
1353        # end of hack
1354
1355        # Walk all substrates, smallest to largest, assigning IP addresses to
1356        # the unassigned and checking the assigned.  Build an /etc/hosts file
1357        # as well.
1358        for idx, s in enumerate(subs):
1359            if s.name not in assigned_subs:
1360                net_size = len(s.interfaces)+2
1361
1362                # Get an ip allocation for this substrate.  Make sure the
1363                # allocation does not collide with any user allocations.
1364                ok = False
1365                while not ok:
1366                    a = ips.allocate(net_size)
1367                    if not a:
1368                        raise service_error(service_error.req, 
1369                                "Cannot allocate IP addresses")
1370                    base, num = a
1371                    if num < net_size: 
1372                        raise service_error(service_error.internal,
1373                                "Allocator returned wrong number of IPs??")
1374                    # Check for overlap.  An assigned network could contain an
1375                    # endpoint of the new allocation or the new allocation
1376                    # could contain an endpoint of an assigned network.
1377                    # NB the else is attached to the for - if the loop is not
1378                    # exited by a break, ok = True.
1379                    for n, sz in assigned.items():
1380                        if base >= n and base < n + sz:
1381                            ok = False
1382                            break
1383                        if base + num > n and base + num  < n + sz:
1384                            ok = False
1385                            break
1386                        if n >= base and n < base + num:
1387                            ok = False
1388                            break
1389                        if n+sz > base and n+sz < base + num:
1390                            ok = False
1391                            break
1392                    else:
1393                        ok = True
1394                           
1395                mask = ips.min_alloc
1396                while mask < net_size:
1397                    mask *= 2
1398
1399                netmask = ((2**32-1) ^ (mask-1))
1400            else:
1401                base = 0
1402
1403            # Add the attributes and build hosts
1404            base += 1
1405            for i in s.interfaces:
1406                # Add address and masks to interfaces on unassigned substrates
1407                if s.name not in assigned_subs:
1408                    i.attribute.append(
1409                            topdl.Attribute('ip4_address', 
1410                                "%s" % ip_addr(base)))
1411                    i.attribute.append(
1412                            topdl.Attribute('ip4_netmask', 
1413                                "%s" % ip_addr(int(netmask))))
1414
1415                # Make /etc/hosts entries
1416                addr = i.get_attribute('ip4_address')
1417                if addr is None:
1418                    raise service_error(service_error.req, 
1419                            "Partially assigned substrate %s" %s.name)
1420                hname = i.element.name
1421                if hname in ifs:
1422                    hosts.append("%s\t%s-%s %s-%d" % \
1423                            (addr, hname, s.name, hname, ifs[hname]))
1424                else:
1425                    # First IP allocated it the default ip according to hosts,
1426                    # so the extra alias is here.
1427                    ifs[hname] = 0
1428                    hosts.append("%s\t%s-%s %s-%d %s" % \
1429                            (addr, hname, s.name, hname, ifs[hname], hname))
1430
1431                ifs[hname] += 1
1432                base += 1
1433        return hosts, ips
1434
1435    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1436            tbparam, masters, tbmap, expid=None, expcert=None):
1437        for tb in testbeds:
1438            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1439                    expcert)
1440            allocated[tb] = 1
1441
1442    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1443            expcert=None):
1444        """
1445        Get access to testbed through fedd and set the parameters for that tb
1446        """
1447        def get_export_project(svcs):
1448            """
1449            Look through for the list of federated_service for this testbed
1450            objects for a project_export service, and extract the project
1451            parameter.
1452            """
1453
1454            pe = [s for s in svcs if s.name=='project_export']
1455            if len(pe) == 1:
1456                return pe[0].params.get('project', None)
1457            elif len(pe) == 0:
1458                return None
1459            else:
1460                raise service_error(service_error.req,
1461                        "More than one project export is not supported")
1462
1463        def add_services(svcs, type, slist, keys):
1464            """
1465            Add the given services to slist.  type is import or export.  Also
1466            add a mapping entry from the assigned id to the original service
1467            record.
1468            """
1469            for i, s in enumerate(svcs):
1470                idx = '%s%d' % (type, i)
1471                keys[idx] = s
1472                sr = {'id': idx, 'name': s.name, 'visibility': type }
1473                if s.params:
1474                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1475                            for k, v in s.params.items()]
1476                slist.append(sr)
1477
1478        uri = tbmap.get(testbed_base(tb), None)
1479        if not uri:
1480            raise service_error(service_error.server_config, 
1481                    "Unknown testbed: %s" % tb)
1482
1483        export_svcs = masters.get(tb,[])
1484        import_svcs = [ s for m in masters.values() \
1485                for s in m \
1486                    if tb in s.importers ]
1487
1488        export_project = get_export_project(export_svcs)
1489        # Compose the credential list so that IDs come before attributes
1490        creds = set()
1491        keys = set()
1492        certs = self.auth.get_creds_for_principal(fid)
1493        # Append credenials about this experiment controller - e.g. that it is
1494        # trusted.
1495        certs.update(self.auth.get_creds_for_principal(
1496            fedid(file=self.cert_file)))
1497        if expid:
1498            certs.update(self.auth.get_creds_for_principal(expid))
1499        for c in certs:
1500            keys.add(c.issuer_cert())
1501            creds.add(c.attribute_cert())
1502        creds = list(keys) + list(creds)
1503
1504        if expcert: cert, pw = expcert, None
1505        else: cert, pw = self.cert_file, self.cert_pw
1506
1507        # Request credentials
1508        req = {
1509                'abac_credential': creds,
1510            }
1511        # Make the service request from the services we're importing and
1512        # exporting.  Keep track of the export request ids so we can
1513        # collect the resulting info from the access response.
1514        e_keys = { }
1515        if import_svcs or export_svcs:
1516            slist = []
1517            add_services(import_svcs, 'import', slist, e_keys)
1518            add_services(export_svcs, 'export', slist, e_keys)
1519            req['service'] = slist
1520
1521        if self.local_access.has_key(uri):
1522            # Local access call
1523            req = { 'RequestAccessRequestBody' : req }
1524            r = self.local_access[uri].RequestAccess(req, 
1525                    fedid(file=self.cert_file))
1526            r = { 'RequestAccessResponseBody' : r }
1527        else:
1528            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1529
1530        if r.has_key('RequestAccessResponseBody'):
1531            # Through to here we have a valid response, not a fault.
1532            # Access denied is a fault, so something better or worse than
1533            # access denied has happened.
1534            r = r['RequestAccessResponseBody']
1535            self.log.debug("[get_access] Access granted")
1536        else:
1537            raise service_error(service_error.protocol,
1538                        "Bad proxy response")
1539        if 'proof' not in r:
1540            raise service_error(service_error.protocol,
1541                        "Bad access response (no access proof)")
1542
1543        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1544                tb=tb, uri=uri, proof=[r['proof']], 
1545                services=masters.get(tb, None))
1546
1547        # Collect the responses corresponding to the services this testbed
1548        # exports.  These will be the service requests that we will include in
1549        # the start segment requests (with appropriate visibility values) to
1550        # import and export the segments.
1551        for s in r.get('service', []):
1552            id = s.get('id', None)
1553            # Note that this attaches the response to the object in the masters
1554            # data structure.  (The e_keys index disappears when this fcn
1555            # returns)
1556            if id and id in e_keys:
1557                e_keys[id].reqs.append(s)
1558
1559        # Add attributes to parameter space.  We don't allow attributes to
1560        # overlay any parameters already installed.
1561        for a in r.get('fedAttr', []):
1562            try:
1563                if a['attribute']:
1564                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1565            except KeyError:
1566                self.log.error("Bad attribute in response: %s" % a)
1567
1568
1569    def split_topology(self, top, topo, testbeds):
1570        """
1571        Create the sub-topologies that are needed for experiment instantiation.
1572        """
1573        for tb in testbeds:
1574            topo[tb] = top.clone()
1575            # copy in for loop allows deletions from the original
1576            for e in [ e for e in topo[tb].elements]:
1577                etb = e.get_attribute('testbed')
1578                # NB: elements without a testbed attribute won't appear in any
1579                # sub topologies. 
1580                if not etb or etb != tb:
1581                    for i in e.interface:
1582                        for s in i.subs:
1583                            try:
1584                                s.interfaces.remove(i)
1585                            except ValueError:
1586                                raise service_error(service_error.internal,
1587                                        "Can't remove interface??")
1588                    topo[tb].elements.remove(e)
1589            topo[tb].make_indices()
1590
1591    def confirm_software(self, top):
1592        """
1593        Make sure that the software to be loaded in the topo is all available
1594        before we begin making access requests, etc.  This is a subset of
1595        wrangle_software.
1596        """
1597        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1598        pkgs.update([x.location for e in top.elements for x in e.software])
1599
1600        for pkg in pkgs:
1601            loc = pkg
1602
1603            scheme, host, path = urlparse(loc)[0:3]
1604            dest = os.path.basename(path)
1605            if not scheme:
1606                if not loc.startswith('/'):
1607                    loc = "/%s" % loc
1608                loc = "file://%s" %loc
1609            # NB: if scheme was found, loc == pkg
1610            try:
1611                u = urlopen(loc)
1612                u.close()
1613            except Exception, e:
1614                raise service_error(service_error.req, 
1615                        "Cannot open %s: %s" % (loc, e))
1616        return True
1617
1618    def wrangle_software(self, expid, top, topo, tbparams):
1619        """
1620        Copy software out to the repository directory, allocate permissions and
1621        rewrite the segment topologies to look for the software in local
1622        places.
1623        """
1624
1625        # Copy the rpms and tarfiles to a distribution directory from
1626        # which the federants can retrieve them
1627        linkpath = "%s/software" %  expid
1628        softdir ="%s/%s" % ( self.repodir, linkpath)
1629        softmap = { }
1630
1631        # self.fedkit and self.gateway kit are lists of tuples of
1632        # (install_location, download_location) this extracts the download
1633        # locations.
1634        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1635        pkgs.update([x.location for e in top.elements for x in e.software])
1636        try:
1637            os.makedirs(softdir)
1638        except EnvironmentError, e:
1639            raise service_error(
1640                    "Cannot create software directory: %s" % e)
1641        # The actual copying.  Everything's converted into a url for copying.
1642        auth_attrs = set()
1643        for pkg in pkgs:
1644            loc = pkg
1645
1646            scheme, host, path = urlparse(loc)[0:3]
1647            dest = os.path.basename(path)
1648            if not scheme:
1649                if not loc.startswith('/'):
1650                    loc = "/%s" % loc
1651                loc = "file://%s" %loc
1652            # NB: if scheme was found, loc == pkg
1653            try:
1654                u = urlopen(loc)
1655            except Exception, e:
1656                raise service_error(service_error.req, 
1657                        "Cannot open %s: %s" % (loc, e))
1658            try:
1659                f = open("%s/%s" % (softdir, dest) , "w")
1660                self.log.debug("Writing %s/%s" % (softdir,dest) )
1661                data = u.read(4096)
1662                while data:
1663                    f.write(data)
1664                    data = u.read(4096)
1665                f.close()
1666                u.close()
1667            except Exception, e:
1668                raise service_error(service_error.internal,
1669                        "Could not copy %s: %s" % (loc, e))
1670            path = re.sub("/tmp", "", linkpath)
1671            # XXX
1672            softmap[pkg] = \
1673                    "%s/%s/%s" %\
1674                    ( self.repo_url, path, dest)
1675
1676            # Allow the individual segments to access the software by assigning
1677            # an attribute to each testbed allocation that encodes the data to
1678            # be released.  This expression collects the data for each run of
1679            # the loop.
1680            auth_attrs.update([
1681                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1682                        for tb in tbparams.keys()])
1683
1684        self.append_experiment_authorization(expid, auth_attrs)
1685
1686        # Convert the software locations in the segments into the local
1687        # copies on this host
1688        for soft in [ s for tb in topo.values() \
1689                for e in tb.elements \
1690                    if getattr(e, 'software', False) \
1691                        for s in e.software ]:
1692            if softmap.has_key(soft.location):
1693                soft.location = softmap[soft.location]
1694
1695
1696    def new_experiment(self, req, fid):
1697        """
1698        The external interface to empty initial experiment creation called from
1699        the dispatcher.
1700
1701        Creates a working directory, splits the incoming description using the
1702        splitter script and parses out the avrious subsections using the
1703        lcasses above.  Once each sub-experiment is created, use pooled threads
1704        to instantiate them and start it all up.
1705        """
1706        self.log.info("New experiment call started for %s" % fid)
1707        req = req.get('NewRequestBody', None)
1708        if not req:
1709            raise service_error(service_error.req,
1710                    "Bad request format (no NewRequestBody)")
1711
1712        # import may partially succeed so always save credentials and warn
1713        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1714            self.log.debug("Failed to import delegation credentials(!)")
1715        self.get_grouper_updates(fid)
1716        self.auth.update()
1717        self.auth.save()
1718
1719        try:
1720            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1721                    with_proof=True)
1722        except service_error, e:
1723            self.log.info("New experiment call for %s: access denied" % fid)
1724            raise e
1725
1726
1727        if not access_ok:
1728            self.log.info("New experiment call for %s: Access denied" % fid)
1729            raise service_error(service_error.access, "New access denied",
1730                    proof=[proof])
1731
1732        try:
1733            tmpdir = tempfile.mkdtemp(prefix="split-")
1734        except EnvironmentError:
1735            raise service_error(service_error.internal, "Cannot create tmp dir")
1736
1737        # Generate an ID for the experiment (slice) and a certificate that the
1738        # allocator can use to prove they own it.  We'll ship it back through
1739        # the encrypted connection.  If the requester supplied one, use it.
1740        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1741            expcert = req['experimentAccess']['X509']
1742            expid = fedid(certstr=expcert)
1743            self.state_lock.acquire()
1744            if expid in self.state:
1745                self.state_lock.release()
1746                raise service_error(service_error.req, 
1747                        'fedid %s identifies an existing experiment' % expid)
1748            self.state_lock.release()
1749        else:
1750            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1751
1752        #now we're done with the tmpdir, and it should be empty
1753        if self.cleanup:
1754            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1755            os.rmdir(tmpdir)
1756        else:
1757            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1758
1759        eid = self.create_experiment_state(fid, req, expid, expcert, 
1760                state='empty')
1761
1762        rv = {
1763                'experimentID': [
1764                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1765                ],
1766                'experimentStatus': 'empty',
1767                'experimentAccess': { 'X509' : expcert },
1768                'proof': proof.to_dict(),
1769            }
1770
1771        self.log.info("New experiment call succeeded for %s" % fid)
1772        return rv
1773
1774    # create_experiment sub-functions
1775
1776    @staticmethod
1777    def get_experiment_key(req, field='experimentID'):
1778        """
1779        Parse the experiment identifiers out of the request (the request body
1780        tag has been removed).  Specifically this pulls either the fedid or the
1781        localname out of the experimentID field.  A fedid is preferred.  If
1782        neither is present or the request does not contain the fields,
1783        service_errors are raised.
1784        """
1785        # Get the experiment access
1786        exp = req.get(field, None)
1787        if exp:
1788            if exp.has_key('fedid'):
1789                key = exp['fedid']
1790            elif exp.has_key('localname'):
1791                key = exp['localname']
1792            else:
1793                raise service_error(service_error.req, "Unknown lookup type")
1794        else:
1795            raise service_error(service_error.req, "No request?")
1796
1797        return key
1798
1799    def get_experiment_ids_and_start(self, key, tmpdir):
1800        """
1801        Get the experiment name, id and access certificate from the state, and
1802        set the experiment state to 'starting'.  returns a triple (fedid,
1803        localname, access_cert_file). The access_cert_file is a copy of the
1804        contents of the access certificate, created in the tempdir with
1805        restricted permissions.  If things are confused, raise an exception.
1806        """
1807
1808        expid = eid = None
1809        self.state_lock.acquire()
1810        if key in self.state:
1811            exp = self.state[key]
1812            exp.status = "starting"
1813            exp.updated()
1814            expid = exp.fedid
1815            eid = exp.localname
1816            expcert = exp.identity
1817        self.state_lock.release()
1818
1819        # make a protected copy of the access certificate so the experiment
1820        # controller can act as the experiment principal.
1821        if expcert:
1822            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1823            if not expcert_file:
1824                raise service_error(service_error.internal, 
1825                        "Cannot create temp cert file?")
1826        else:
1827            expcert_file = None
1828
1829        return (eid, expid, expcert_file)
1830
1831    def get_topology(self, req, tmpdir):
1832        """
1833        Get the ns2 content and put it into a file for parsing.  Call the local
1834        or remote parser and return the topdl.Topology.  Errors result in
1835        exceptions.  req is the request and tmpdir is a work directory.
1836        """
1837
1838        # The tcl parser needs to read a file so put the content into that file
1839        descr=req.get('experimentdescription', None)
1840        if descr:
1841            if 'ns2description' in descr:
1842                file_content=descr['ns2description']
1843            elif 'topdldescription' in descr:
1844                return topdl.Topology(**descr['topdldescription'])
1845            else:
1846                raise service_error(service_error.req, 
1847                        'Unknown experiment description type')
1848        else:
1849            raise service_error(service_error.req, "No experiment description")
1850
1851
1852        if self.splitter_url:
1853            self.log.debug("Calling remote topdl translator at %s" % \
1854                    self.splitter_url)
1855            top = self.remote_ns2topdl(self.splitter_url, file_content)
1856        else:
1857            tclfile = os.path.join(tmpdir, "experiment.tcl")
1858            if file_content:
1859                try:
1860                    f = open(tclfile, 'w')
1861                    f.write(file_content)
1862                    f.close()
1863                except EnvironmentError:
1864                    raise service_error(service_error.internal,
1865                            "Cannot write temp experiment description")
1866            else:
1867                raise service_error(service_error.req, 
1868                        "Only ns2descriptions supported")
1869            pid = "dummy"
1870            gid = "dummy"
1871            eid = "dummy"
1872
1873            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1874                str(self.muxmax), '-m', 'dummy']
1875
1876            tclcmd.extend([pid, gid, eid, tclfile])
1877
1878            self.log.debug("running local splitter %s", " ".join(tclcmd))
1879            # This is just fantastic.  As a side effect the parser copies
1880            # tb_compat.tcl into the current directory, so that directory
1881            # must be writable by the fedd user.  Doing this in the
1882            # temporary subdir ensures this is the case.
1883            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1884                    cwd=tmpdir)
1885            split_data = tclparser.stdout
1886
1887            top = topdl.topology_from_xml(file=split_data, top="experiment")
1888            os.remove(tclfile)
1889
1890        return top
1891
1892    def get_testbed_services(self, req, testbeds):
1893        """
1894        Parse the services section of the request into two dicts mapping
1895        testbed to lists of federated_service objects.  The first dict maps all
1896        exporters of services to those service objects, the second maps
1897        testbeds to service objects only for services requiring portals.
1898        """
1899
1900        # Sanity check the services.  Exports or imports from unknown testbeds
1901        # cause an exception.
1902        for s in req.get('service', []):
1903            for t in s.get('import', []):
1904                if t not in testbeds:
1905                    raise service_error(service_error.req, 
1906                            'Service import to unknown testbed: %s' %t)
1907            for t in s.get('export', []):
1908                if t not in testbeds:
1909                    raise service_error(service_error.req, 
1910                            'Service export from unknown testbed: %s' %t)
1911
1912
1913        # We construct both dicts here because deriving the second is more
1914        # complex than it looks - both the keys and lists can differ, and it's
1915        # much easier to generate both in one pass.
1916        masters = { }
1917        pmasters = { }
1918        for s in req.get('service', []):
1919            # If this is a service request with the importall field
1920            # set, fill it out.
1921
1922            if s.get('importall', False):
1923                s['import'] = [ tb for tb in testbeds \
1924                        if tb not in s.get('export',[])]
1925                del s['importall']
1926
1927            # Add the service to masters
1928            for tb in s.get('export', []):
1929                if s.get('name', None):
1930
1931                    params = { }
1932                    for a in s.get('fedAttr', []):
1933                        params[a.get('attribute', '')] = a.get('value','')
1934
1935                    fser = federated_service(name=s['name'],
1936                            exporter=tb, importers=s.get('import',[]),
1937                            params=params)
1938                    if fser.name == 'hide_hosts' \
1939                            and 'hosts' not in fser.params:
1940                        fser.params['hosts'] = \
1941                                ",".join(tb_hosts.get(fser.exporter, []))
1942                    if tb in masters: masters[tb].append(fser)
1943                    else: masters[tb] = [fser]
1944
1945                    if fser.portal:
1946                        if tb in pmasters: pmasters[tb].append(fser)
1947                        else: pmasters[tb] = [fser]
1948                else:
1949                    self.log.error('Testbed service does not have name " + \
1950                            "and importers')
1951        return masters, pmasters
1952
1953    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1954        """
1955        Create the ssh keys necessary for interconnecting the portal nodes and
1956        the global hosts file for letting each segment know about the IP
1957        addresses in play.  If we have computed static routes, copy them into
1958        the repo. Save these into the repo.  Add attributes to the autorizer
1959        allowing access controllers to download them and return a set of
1960        attributes that inform the segments where to find this stuff.  May
1961        raise service_errors in if there are problems.
1962        """
1963        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1964        gw_secretkey_base = "fed.%s" % self.ssh_type
1965        keydir = os.path.join(tmpdir, 'keys')
1966        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1967        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1968        route = os.path.join(tmpdir, 'route.tgz')
1969
1970        try:
1971            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1972        except ValueError:
1973            raise service_error(service_error.server_config, 
1974                    "Bad key type (%s)" % self.ssh_type)
1975
1976        self.generate_seer_certs(keydir)
1977
1978        # Copy configuration files into the remote file store
1979        # The config urlpath
1980        configpath = "/%s/config" % expid
1981        # The config file system location
1982        configdir ="%s%s" % ( self.repodir, configpath)
1983        route_conf = os.path.join(configdir, 'route.tgz')
1984        try:
1985            os.makedirs(configdir)
1986        except EnvironmentError, e:
1987            raise service_error(service_error.internal,
1988                    "Cannot create config directory: %s" % e)
1989        try:
1990            f = open("%s/hosts" % configdir, "w")
1991            print >> f, string.join(hosts, '\n')
1992            f.close()
1993        except EnvironmentError, e:
1994            raise service_error(service_error.internal, 
1995                    "Cannot write hosts file: %s" % e)
1996        try:
1997            if os.path.exists(route):
1998                copy_file(route, route_conf)
1999
2000            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
2001            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
2002            copy_file(os.path.join(keydir, 'ca.pem'), 
2003                    os.path.join(configdir, 'ca.pem'))
2004            copy_file(os.path.join(keydir, 'node.pem'), 
2005                    os.path.join(configdir, 'node.pem'))
2006        except EnvironmentError, e:
2007            raise service_error(service_error.internal, 
2008                    "Cannot copy keyfiles: %s" % e)
2009
2010        # Allow the individual testbeds to access the configuration files,
2011        # again by setting an attribute for the relevant pathnames on each
2012        # allocation principal.  Yeah, that's a long list comprehension.
2013        self.append_experiment_authorization(expid, set([
2014            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2015                    for tb in tbparams.keys() \
2016                        for f in ("hosts", 'ca.pem', 'node.pem', 'route.tgz',
2017                            gw_secretkey_base, gw_pubkey_base)]))
2018
2019        attrs = [ 
2020                {
2021                    'attribute': 'ssh_pubkey', 
2022                    'value': '%s/%s/config/%s' % \
2023                            (self.repo_url, expid, gw_pubkey_base)
2024                },
2025                {
2026                    'attribute': 'ssh_secretkey', 
2027                    'value': '%s/%s/config/%s' % \
2028                            (self.repo_url, expid, gw_secretkey_base)
2029                },
2030                {
2031                    'attribute': 'hosts', 
2032                    'value': '%s/%s/config/hosts' % \
2033                            (self.repo_url, expid)
2034                },
2035                {
2036                    'attribute': 'seer_ca_pem', 
2037                    'value': '%s/%s/config/%s' % \
2038                            (self.repo_url, expid, 'ca.pem')
2039                },
2040                {
2041                    'attribute': 'seer_node_pem', 
2042                    'value': '%s/%s/config/%s' % \
2043                            (self.repo_url, expid, 'node.pem')
2044                },
2045            ]
2046        # Add info about static routes if we have some
2047        if os.path.exists(route_conf):
2048            attrs.append(
2049                {
2050                    'attribute': 'route.tgz', 
2051                    'value': '%s/%s/config/%s' % \
2052                            (self.repo_url, expid, 'route.tgz')
2053                }
2054            )
2055        return attrs
2056
2057
2058    def get_vtopo(self, req, fid):
2059        """
2060        Return the stored virtual topology for this experiment
2061        """
2062        rv = None
2063        state = None
2064        self.log.info("vtopo call started for %s" %  fid)
2065
2066        req = req.get('VtopoRequestBody', None)
2067        if not req:
2068            raise service_error(service_error.req,
2069                    "Bad request format (no VtopoRequestBody)")
2070        exp = req.get('experiment', None)
2071        if exp:
2072            if exp.has_key('fedid'):
2073                key = exp['fedid']
2074                keytype = "fedid"
2075            elif exp.has_key('localname'):
2076                key = exp['localname']
2077                keytype = "localname"
2078            else:
2079                raise service_error(service_error.req, "Unknown lookup type")
2080        else:
2081            raise service_error(service_error.req, "No request?")
2082
2083        try:
2084            proof = self.check_experiment_access(fid, key)
2085        except service_error, e:
2086            self.log.info("vtopo call failed for %s: access denied" %  fid)
2087            raise e
2088
2089        self.state_lock.acquire()
2090        # XXX: this needs to be recalculated
2091        if key in self.state:
2092            if self.state[key].top is not None:
2093                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2094                rv = { 'experiment' : {keytype: key },
2095                        'vtopo': vtopo,
2096                        'proof': proof.to_dict(), 
2097                    }
2098            else:
2099                state = self.state[key].status
2100        self.state_lock.release()
2101
2102        if rv: 
2103            self.log.info("vtopo call completed for %s %s " % \
2104                (key, fid))
2105            return rv
2106        else: 
2107            if state:
2108                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2109                    (key, fid))
2110                raise service_error(service_error.partial, 
2111                        "Not ready: %s" % state)
2112            else:
2113                self.log.info("vtopo call completed for %s %s (No experiment)"\
2114                        % (key, fid))
2115                raise service_error(service_error.req, "No such experiment")
2116
2117    def get_vis(self, req, fid):
2118        """
2119        Return the stored visualization for this experiment
2120        """
2121        rv = None
2122        state = None
2123
2124        self.log.info("vis call started for %s" %  fid)
2125        req = req.get('VisRequestBody', None)
2126        if not req:
2127            raise service_error(service_error.req,
2128                    "Bad request format (no VisRequestBody)")
2129        exp = req.get('experiment', None)
2130        if exp:
2131            if exp.has_key('fedid'):
2132                key = exp['fedid']
2133                keytype = "fedid"
2134            elif exp.has_key('localname'):
2135                key = exp['localname']
2136                keytype = "localname"
2137            else:
2138                raise service_error(service_error.req, "Unknown lookup type")
2139        else:
2140            raise service_error(service_error.req, "No request?")
2141
2142        try:
2143            proof = self.check_experiment_access(fid, key)
2144        except service_error, e:
2145            self.log.info("vis call failed for %s: access denied" %  fid)
2146            raise e
2147
2148        self.state_lock.acquire()
2149        # Generate the visualization
2150        if key in self.state:
2151            if self.state[key].top is not None:
2152                try:
2153                    vis = self.genviz(
2154                            topdl.topology_to_vtopo(self.state[key].top))
2155                except service_error, e:
2156                    self.state_lock.release()
2157                    raise e
2158                rv =  { 'experiment' : {keytype: key },
2159                        'vis': vis,
2160                        'proof': proof.to_dict(), 
2161                        }
2162            else:
2163                state = self.state[key].status
2164        self.state_lock.release()
2165
2166        if rv: 
2167            self.log.info("vis call completed for %s %s " % \
2168                (key, fid))
2169            return rv
2170        else:
2171            if state:
2172                self.log.info("vis call completed for %s %s (not ready)" % \
2173                    (key, fid))
2174                raise service_error(service_error.partial, 
2175                        "Not ready: %s" % state)
2176            else:
2177                self.log.info("vis call completed for %s %s (no experiment)" % \
2178                    (key, fid))
2179                raise service_error(service_error.req, "No such experiment")
2180
2181    @staticmethod
2182    def needs_route_computation(req):
2183        '''
2184        Walk the request services looking for a static routing request
2185        '''
2186        for s in req.get('service', []):
2187            if s.get('name', '') == 'static_routing':
2188                return True
2189        return False
2190
2191    def compute_static_routes(self, tmpdir, top):
2192        '''
2193        Compute a set of static routes for the topology.  The result is a file
2194        called route.tgz in tmpdir that contains a dinrectory, route, with a
2195        file per node in the topology that lists the prefix and router for all
2196        the routes in that node's routing table.  Exceptions from the route
2197        calculation program and the tar creation call are not caught.
2198        '''
2199        if self.routing is None:
2200            raise service_error(service_error.server, 
2201                    'Cannot provide staticroutes, no routing program specified')
2202        rg = tempfile.NamedTemporaryFile()
2203        outdir = os.path.join(tmpdir, 'route')
2204        tarfile = os.path.join(tmpdir, 'route.tgz')
2205        topology_to_route_file(top, file=rg)
2206        os.mkdir(outdir)
2207        try:
2208            for cmd in (
2209                    [self.routing, '--input', rg.name,'--output', outdir],
2210                    ['tar', '-C', tmpdir, '-czf', tarfile, 'route']):
2211                subprocess.check_call(cmd)
2212        except subprocess.CalledProcessError, e:
2213            raise service_error(service_error.internal, 
2214                    'Cannot call %s: %s' % (' '.join(cmd), e.returncode))
2215        rg.close()
2216        shutil.rmtree(outdir)
2217
2218
2219   
2220    def save_federant_information(self, allocated, tbparams, eid, top):
2221        """
2222        Store the various data that have changed in the experiment state
2223        between when it was started and the beginning of resource allocation.
2224        This is basically the information about each local allocation.  This
2225        fills in the values of the placeholder allocation in the state.  It
2226        also collects the access proofs and returns them as dicts for a
2227        response message.
2228        """
2229        self.state_lock.acquire()
2230        exp = self.state[eid]
2231        exp.top = top.clone()
2232        # save federant information
2233        for k in allocated.keys():
2234            exp.add_allocation(tbparams[k])
2235            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2236                type="testbed", localname=[k], 
2237                service=[ s.to_topdl() for s in tbparams[k].services]))
2238
2239        # Access proofs for the response message
2240        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2241                    for p in tbparams[k].proof]
2242        exp.updated()
2243        if self.state_filename: 
2244            self.write_state()
2245        self.state_lock.release()
2246        return proofs
2247
2248    def clear_placeholder(self, eid, expid, tmpdir):
2249        """
2250        Clear the placeholder and remove any allocated temporary dir.
2251        """
2252
2253        self.state_lock.acquire()
2254        del self.state[eid]
2255        del self.state[expid]
2256        if self.state_filename: self.write_state()
2257        self.state_lock.release()
2258        if tmpdir and self.cleanup:
2259            self.remove_dirs(tmpdir)
2260
2261    # end of create_experiment sub-functions
2262
2263    def create_experiment(self, req, fid):
2264        """
2265        The external interface to experiment creation called from the
2266        dispatcher.
2267
2268        Creates a working directory, splits the incoming description using the
2269        splitter script and parses out the various subsections using the
2270        classes above.  Once each sub-experiment is created, use pooled threads
2271        to instantiate them and start it all up.
2272        """
2273
2274        self.log.info("Create experiment call started for %s" % fid)
2275        req = req.get('CreateRequestBody', None)
2276        if req:
2277            key = self.get_experiment_key(req)
2278        else:
2279            raise service_error(service_error.req,
2280                    "Bad request format (no CreateRequestBody)")
2281
2282        # Import information from the requester
2283        # import may partially succeed so always save credentials and warn
2284        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2285            self.log.debug("Failed to import delegation credentials(!)")
2286        self.get_grouper_updates(fid)
2287        self.auth.update()
2288        self.auth.save()
2289
2290        try:
2291            # Make sure that the caller can talk to us
2292            proof = self.check_experiment_access(fid, key)
2293        except service_error, e:
2294            self.log.info("Create experiment call failed for %s: access denied"\
2295                    % fid)
2296            raise e
2297
2298
2299        # Install the testbed map entries supplied with the request into a copy
2300        # of the testbed map.
2301        tbmap = dict(self.tbmap)
2302        for m in req.get('testbedmap', []):
2303            if 'testbed' in m and 'uri' in m:
2304                tbmap[m['testbed']] = m['uri']
2305
2306        # a place to work
2307        try:
2308            tmpdir = tempfile.mkdtemp(prefix="split-")
2309            os.mkdir(tmpdir+"/keys")
2310        except EnvironmentError:
2311            raise service_error(service_error.internal, "Cannot create tmp dir")
2312
2313        tbparams = { }
2314
2315        eid, expid, expcert_file = \
2316                self.get_experiment_ids_and_start(key, tmpdir)
2317
2318        # This catches exceptions to clear the placeholder if necessary
2319        try: 
2320            if not (eid and expid):
2321                raise service_error(service_error.internal, 
2322                        "Cannot find local experiment info!?")
2323
2324            top = self.get_topology(req, tmpdir)
2325            self.confirm_software(top)
2326            # Assign the IPs
2327            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2328            if self.needs_route_computation(req):
2329                self.compute_static_routes(tmpdir, top)
2330            # Find the testbeds to look up
2331            tb_hosts = { }
2332            testbeds = [ ]
2333            for e in top.elements:
2334                if isinstance(e, topdl.Computer):
2335                    tb = e.get_attribute('testbed')
2336                    # Put nodes not in a testbed into the 'default' testbed if
2337                    # 'default' is in the fedd map, the instantiation will
2338                    # work.
2339                    if tb is None:
2340                        tb = 'default'
2341                        e.set_attribute('testbed', tb)
2342                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2343                    else: 
2344                        tb_hosts[tb] = [ e.name ]
2345                        testbeds.append(tb)
2346
2347            masters, pmasters = self.get_testbed_services(req, testbeds)
2348            allocated = { }         # Testbeds we can access
2349            topo ={ }               # Sub topologies
2350            connInfo = { }          # Connection information
2351
2352            self.get_access_to_testbeds(testbeds, fid, allocated, 
2353                    tbparams, masters, tbmap, expid, expcert_file)
2354
2355            # tbactive is the set of testbeds that have NATs in front of their
2356            # portals. They need to initiate connections.
2357            tbactive = set([k for k, v in tbparams.items() \
2358                    if v.get_attribute('nat_portals')])
2359
2360            self.split_topology(top, topo, testbeds)
2361
2362            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2363
2364            part = experiment_partition(self.auth, self.store_url, tbmap,
2365                    self.muxmax, self.direct_transit, tbactive)
2366            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2367                    connInfo, expid)
2368
2369            auth_attrs = set()
2370            # Now get access to the dynamic testbeds (those added above)
2371            for tb in [ t for t in topo if t not in allocated]:
2372                self.get_access(tb, tbparams, fid, masters, tbmap, 
2373                        expid, expcert_file)
2374                allocated[tb] = 1
2375                store_keys = topo[tb].get_attribute('store_keys')
2376                # Give the testbed access to keys it exports or imports
2377                if store_keys:
2378                    auth_attrs.update(set([
2379                        (tbparams[tb].allocID, sk) \
2380                                for sk in store_keys.split(" ")]))
2381
2382            if auth_attrs:
2383                self.append_experiment_authorization(expid, auth_attrs)
2384
2385            # transit and disconnected testbeds may not have a connInfo entry.
2386            # Fill in the blanks.
2387            for t in allocated.keys():
2388                if not connInfo.has_key(t):
2389                    connInfo[t] = { }
2390
2391            self.wrangle_software(expid, top, topo, tbparams)
2392
2393            proofs = self.save_federant_information(allocated, tbparams, 
2394                    eid, top)
2395        except service_error, e:
2396            # If something goes wrong in the parse (usually an access error)
2397            # clear the placeholder state.  From here on out the code delays
2398            # exceptions.  Failing at this point returns a fault to the remote
2399            # caller.
2400
2401            self.log.info("Create experiment call failed for %s %s: %s" % 
2402                    (eid, fid, e))
2403            self.clear_placeholder(eid, expid, tmpdir)
2404            raise e
2405
2406        # Start the background swapper and return the starting state.  From
2407        # here on out, the state will stick around a while.
2408
2409        # Create a logger that logs to the experiment's state object as well as
2410        # to the main log file.
2411        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2412        alloc_collector = self.list_log(self.state[eid].log)
2413        h = logging.StreamHandler(alloc_collector)
2414        # XXX: there should be a global one of these rather than repeating the
2415        # code.
2416        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2417                    '%d %b %y %H:%M:%S'))
2418        alloc_log.addHandler(h)
2419
2420        # Start a thread to do the resource allocation
2421        t  = Thread(target=self.allocate_resources,
2422                args=(allocated, masters, eid, expid, tbparams, 
2423                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2424                    connInfo, tbmap, expcert_file),
2425                name=eid)
2426        t.start()
2427
2428        rv = {
2429                'experimentID': [
2430                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2431                ],
2432                'experimentStatus': 'starting',
2433                'proof': [ proof.to_dict() ] + proofs,
2434            }
2435        self.log.info("Create experiment call succeeded for %s %s" % \
2436                (eid, fid))
2437
2438        return rv
2439   
2440    def get_experiment_fedid(self, key):
2441        """
2442        find the fedid associated with the localname key in the state database.
2443        """
2444
2445        rv = None
2446        self.state_lock.acquire()
2447        if key in self.state:
2448            rv = self.state[key].fedid
2449        self.state_lock.release()
2450        return rv
2451
2452    def check_experiment_access(self, fid, key):
2453        """
2454        Confirm that the fid has access to the experiment.  Though a request
2455        may be made in terms of a local name, the access attribute is always
2456        the experiment's fedid.
2457        """
2458        if not isinstance(key, fedid):
2459            key = self.get_experiment_fedid(key)
2460
2461        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2462
2463        if access_ok:
2464            return proof
2465        else:
2466            raise service_error(service_error.access, "Access Denied",
2467                proof)
2468
2469
2470    def get_handler(self, path, fid):
2471        """
2472        Perhaps surprisingly named, this function handles HTTP GET requests to
2473        this server (SOAP requests are POSTs).
2474        """
2475        self.log.info("Get handler %s %s" % (path, fid))
2476        if len("%s" % fid) == 0:
2477            return (None, None)
2478        # XXX: log proofs?
2479        if self.auth.check_attribute(fid, path):
2480            return ("%s/%s" % (self.repodir, path), "application/binary")
2481        else:
2482            return (None, None)
2483
2484    def update_info(self, key, force=False):
2485        top = None
2486        self.state_lock.acquire()
2487        if key in self.state:
2488            if force or self.state[key].older_than(self.info_cache_limit):
2489                top = self.state[key].top
2490                if top is not None: top = top.clone()
2491                d1, info_params, cert, d2 = \
2492                        self.get_segment_info(self.state[key], need_lock=False)
2493        self.state_lock.release()
2494
2495        if top is None: return
2496
2497        try:
2498            tmpdir = tempfile.mkdtemp(prefix="info-")
2499        except EnvironmentError:
2500            raise service_error(service_error.internal, 
2501                    "Cannot create tmp dir")
2502        cert_file = self.make_temp_certfile(cert, tmpdir)
2503
2504        data = []
2505        try:
2506            for k, (uri, aid) in info_params.items():
2507                info=self.info_segment(log=self.log, testbed=uri,
2508                            cert_file=cert_file, cert_pwd=None,
2509                            trusted_certs=self.trusted_certs,
2510                            caller=self.call_InfoSegment)
2511                info(uri, aid)
2512                data.append(info)
2513        # Clean up the tmpdir no matter what
2514        finally:
2515            if tmpdir: self.remove_dirs(tmpdir)
2516
2517        self.annotate_topology(top, data)
2518        self.state_lock.acquire()
2519        if key in self.state:
2520            self.state[key].top = top
2521            self.state[key].updated()
2522            if self.state_filename: self.write_state()
2523        self.state_lock.release()
2524
2525   
2526    def get_info(self, req, fid):
2527        """
2528        Return all the stored info about this experiment
2529        """
2530        rv = None
2531
2532        self.log.info("Info call started for %s" %  fid)
2533        req = req.get('InfoRequestBody', None)
2534        if not req:
2535            raise service_error(service_error.req,
2536                    "Bad request format (no InfoRequestBody)")
2537        exp = req.get('experiment', None)
2538        legacy = req.get('legacy', False)
2539        fresh = req.get('fresh', False)
2540        if exp:
2541            if exp.has_key('fedid'):
2542                key = exp['fedid']
2543                keytype = "fedid"
2544            elif exp.has_key('localname'):
2545                key = exp['localname']
2546                keytype = "localname"
2547            else:
2548                raise service_error(service_error.req, "Unknown lookup type")
2549        else:
2550            raise service_error(service_error.req, "No request?")
2551
2552        try:
2553            proof = self.check_experiment_access(fid, key)
2554        except service_error, e:
2555            self.log.info("Info call failed for %s: access denied" %  fid)
2556
2557
2558        self.update_info(key, fresh)
2559
2560        self.state_lock.acquire()
2561        if self.state.has_key(key):
2562            rv = self.state[key].get_info()
2563            # Copy the topo if we need legacy annotations
2564            if legacy:
2565                top = self.state[key].top
2566                if top is not None: top = top.clone()
2567        self.state_lock.release()
2568        self.log.info("Gathered Info for %s %s" % (key, fid))
2569
2570        # If the legacy visualization and topology representations are
2571        # requested, calculate them and add them to the return.
2572        if legacy and rv is not None:
2573            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2574            if top is not None:
2575                vtopo = topdl.topology_to_vtopo(top)
2576                if vtopo is not None:
2577                    rv['vtopo'] = vtopo
2578                    try:
2579                        vis = self.genviz(vtopo)
2580                    except service_error, e:
2581                        self.log.debug('Problem generating visualization: %s' \
2582                                % e)
2583                        vis = None
2584                    if vis is not None:
2585                        rv['vis'] = vis
2586        if rv:
2587            self.log.info("Info succeded for %s %s" % (key, fid))
2588            rv['proof'] = proof.to_dict()
2589            return rv
2590        else: 
2591            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2592            raise service_error(service_error.req, "No such experiment")
2593
2594    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2595            results):
2596        """
2597        Call OperateSegment on multiple testbeds and gather the results.
2598        op_params contains the parameters needed to contact that testbed, cert
2599        is a certificate containing the fedid to use, op is the operation,
2600        testbeds is a dict mapping testbed name to targets in that testbed,
2601        params are the parameters to include a,d results is a growing list of
2602        the results of the calls.
2603        """
2604        try:
2605            tmpdir = tempfile.mkdtemp(prefix="info-")
2606        except EnvironmentError:
2607            raise service_error(service_error.internal, 
2608                    "Cannot create tmp dir")
2609        cert_file = self.make_temp_certfile(cert, tmpdir)
2610
2611        try:
2612            for tb, targets in testbeds.items():
2613                if tb in op_params:
2614                    uri, aid = op_params[tb]
2615                    operate=self.operation_segment(log=self.log, testbed=uri,
2616                                cert_file=cert_file, cert_pwd=None,
2617                                trusted_certs=self.trusted_certs,
2618                                caller=self.call_OperationSegment)
2619                    if operate(uri, aid, op, targets, params):
2620                        if operate.status is not None:
2621                            results.extend(operate.status)
2622                            continue
2623                # Something went wrong in a weird way.  Add statuses
2624                # that reflect that to results
2625                for t in targets:
2626                    results.append(operation_status(t, 
2627                        operation_status.federant,
2628                        'Unexpected error on %s' % tb))
2629        # Clean up the tmpdir no matter what
2630        finally:
2631            if tmpdir: self.remove_dirs(tmpdir)
2632
2633    def do_operation(self, req, fid):
2634        """
2635        Find the testbeds holding each target and ask them to carry out the
2636        operation.  Return the statuses.
2637        """
2638        # Map an element to the testbed containing it
2639        def element_to_tb(e):
2640            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2641            elif isinstance(e, topdl.Testbed): return e.name
2642            else: return None
2643        # If d is an operation_status object, make it a dict
2644        def make_dict(d):
2645            if isinstance(d, dict): return d
2646            elif isinstance(d, operation_status): return d.to_dict()
2647            else: return { }
2648
2649        def element_name(e):
2650            if isinstance(e, topdl.Computer): return e.name
2651            elif isinstance(e, topdl.Testbed): 
2652                if e.localname: return e.localname[0]
2653                else: return None
2654            else: return None
2655
2656        self.log.info("Operation call started for %s" %  fid)
2657        req = req.get('OperationRequestBody', None)
2658        if not req:
2659            raise service_error(service_error.req,
2660                    "Bad request format (no OperationRequestBody)")
2661        exp = req.get('experiment', None)
2662        op = req.get('operation', None)
2663        targets = set(req.get('target', []))
2664        params = req.get('parameter', None)
2665
2666        if exp:
2667            if 'fedid' in exp:
2668                key = exp['fedid']
2669                keytype = "fedid"
2670            elif 'localname' in exp:
2671                key = exp['localname']
2672                keytype = "localname"
2673            else:
2674                raise service_error(service_error.req, "Unknown lookup type")
2675        else:
2676            raise service_error(service_error.req, "No request?")
2677
2678        if op is None or not targets:
2679            raise service_error(service_error.req, "No request?")
2680
2681        try:
2682            proof = self.check_experiment_access(fid, key)
2683        except service_error, e:
2684            self.log.info("Operation call failed for %s: access denied" %  fid)
2685            raise e
2686
2687        self.state_lock.acquire()
2688        if key in self.state:
2689            d1, op_params, cert, d2 = \
2690                    self.get_segment_info(self.state[key], need_lock=False,
2691                            key='tb')
2692            top = self.state[key].top
2693            if top is not None:
2694                top = top.clone()
2695        self.state_lock.release()
2696
2697        if top is None:
2698            self.log.info("Operation call failed for %s: not active" %  fid)
2699            raise service_error(service_error.partial, "No topology yet", 
2700                    proof=proof)
2701
2702        testbeds = { }
2703        results = []
2704        for e in top.elements:
2705            ename = element_name(e)
2706            if ename in targets:
2707                tb = element_to_tb(e)
2708                targets.remove(ename)
2709                if tb is not None:
2710                    if tb in testbeds: testbeds[tb].append(ename)
2711                    else: testbeds[tb] = [ ename ]
2712                else:
2713                    results.append(operation_status(e.name, 
2714                        code=operation_status.no_target, 
2715                        description='Cannot map target to testbed'))
2716
2717        for t in targets:
2718            results.append(operation_status(t, operation_status.no_target))
2719
2720        self.operate_on_segments(op_params, cert, op, testbeds, params,
2721                results)
2722
2723        self.log.info("Operation call succeeded for %s" %  fid)
2724        return { 
2725                'experiment': exp, 
2726                'status': [make_dict(r) for r in results],
2727                'proof': proof.to_dict()
2728                }
2729
2730
2731    def get_multi_info(self, req, fid):
2732        """
2733        Return all the stored info that this fedid can access
2734        """
2735        rv = { 'info': [ ], 'proof': [ ] }
2736
2737        self.log.info("Multi Info call started for %s" %  fid)
2738        self.get_grouper_updates(fid)
2739        self.auth.update()
2740        self.auth.save()
2741        self.state_lock.acquire()
2742        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2743            try:
2744                proof = self.check_experiment_access(fid, key)
2745            except service_error, e:
2746                if e.code == service_error.access:
2747                    continue
2748                else:
2749                    self.log.info("Multi Info call failed for %s: %s" %  \
2750                            (e,fid))
2751                    self.state_lock.release()
2752                    raise e
2753
2754            if self.state.has_key(key):
2755                e = self.state[key].get_info()
2756                e['proof'] = proof.to_dict()
2757                rv['info'].append(e)
2758                rv['proof'].append(proof.to_dict())
2759        self.state_lock.release()
2760        self.log.info("Multi Info call succeeded for %s" %  fid)
2761        return rv
2762
2763    def check_termination_status(self, fed_exp, force):
2764        """
2765        Confirm that the experiment is sin a valid state to stop (or force it)
2766        return the state - invalid states for deletion and force settings cause
2767        exceptions.
2768        """
2769        self.state_lock.acquire()
2770        status = fed_exp.status
2771
2772        if status:
2773            if status in ('starting', 'terminating'):
2774                if not force:
2775                    self.state_lock.release()
2776                    raise service_error(service_error.partial, 
2777                            'Experiment still being created or destroyed')
2778                else:
2779                    self.log.warning('Experiment in %s state ' % status + \
2780                            'being terminated by force.')
2781            self.state_lock.release()
2782            return status
2783        else:
2784            # No status??? trouble
2785            self.state_lock.release()
2786            raise service_error(service_error.internal,
2787                    "Experiment has no status!?")
2788
2789    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2790        ids = []
2791        term_params = { }
2792        if need_lock: self.state_lock.acquire()
2793        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2794        expcert = fed_exp.identity
2795        repo = "%s" % fed_exp.fedid
2796
2797        # Collect the allocation/segment ids into a dict keyed by the fedid
2798        # of the allocation that contains a tuple of uri, aid
2799        for i, fed in enumerate(fed_exp.get_all_allocations()):
2800            uri = fed.uri
2801            aid = fed.allocID
2802            if key == 'aid': term_params[aid] = (uri, aid)
2803            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2804
2805        if need_lock: self.state_lock.release()
2806        return ids, term_params, expcert, repo
2807
2808
2809    def get_termination_info(self, fed_exp):
2810        self.state_lock.acquire()
2811        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2812        # Change the experiment state
2813        fed_exp.status = 'terminating'
2814        fed_exp.updated()
2815        if self.state_filename: self.write_state()
2816        self.state_lock.release()
2817
2818        return ids, term_params, expcert, repo
2819
2820
2821    def deallocate_resources(self, term_params, expcert, status, force, 
2822            dealloc_log):
2823        tmpdir = None
2824        # This try block makes sure the tempdir is cleared
2825        try:
2826            # If no expcert, try the deallocation as the experiment
2827            # controller instance.
2828            if expcert and self.auth_type != 'legacy': 
2829                try:
2830                    tmpdir = tempfile.mkdtemp(prefix="term-")
2831                except EnvironmentError:
2832                    raise service_error(service_error.internal, 
2833                            "Cannot create tmp dir")
2834                cert_file = self.make_temp_certfile(expcert, tmpdir)
2835                pw = None
2836            else: 
2837                cert_file = self.cert_file
2838                pw = self.cert_pwd
2839
2840            # Stop everyone.  NB, wait_for_all waits until a thread starts
2841            # and then completes, so we can't wait if nothing starts.  So,
2842            # no tbparams, no start.
2843            if len(term_params) > 0:
2844                tp = thread_pool(self.nthreads)
2845                for k, (uri, aid) in term_params.items():
2846                    # Create and start a thread to stop the segment
2847                    tp.wait_for_slot()
2848                    t  = pooled_thread(\
2849                            target=self.terminate_segment(log=dealloc_log,
2850                                testbed=uri,
2851                                cert_file=cert_file, 
2852                                cert_pwd=pw,
2853                                trusted_certs=self.trusted_certs,
2854                                caller=self.call_TerminateSegment),
2855                            args=(uri, aid), name=k,
2856                            pdata=tp, trace_file=self.trace_file)
2857                    t.start()
2858                # Wait for completions
2859                tp.wait_for_all_done()
2860
2861            # release the allocations (failed experiments have done this
2862            # already, and starting experiments may be in odd states, so we
2863            # ignore errors releasing those allocations
2864            try: 
2865                for k, (uri, aid)  in term_params.items():
2866                    self.release_access(None, aid, uri=uri,
2867                            cert_file=cert_file, cert_pwd=pw)
2868            except service_error, e:
2869                if status != 'failed' and not force:
2870                    raise e
2871
2872        # Clean up the tmpdir no matter what
2873        finally:
2874            if tmpdir: self.remove_dirs(tmpdir)
2875
2876    def terminate_experiment(self, req, fid):
2877        """
2878        Swap this experiment out on the federants and delete the shared
2879        information
2880        """
2881        self.log.info("Terminate experiment call started for %s" % fid)
2882        tbparams = { }
2883        req = req.get('TerminateRequestBody', None)
2884        if not req:
2885            raise service_error(service_error.req,
2886                    "Bad request format (no TerminateRequestBody)")
2887
2888        key = self.get_experiment_key(req, 'experiment')
2889        try:
2890            proof = self.check_experiment_access(fid, key)
2891        except service_error, e:
2892            self.log.info(
2893                    "Terminate experiment call failed for %s: access denied" \
2894                            % fid)
2895            raise e
2896        exp = req.get('experiment', False)
2897        force = req.get('force', False)
2898
2899        dealloc_list = [ ]
2900
2901
2902        # Create a logger that logs to the dealloc_list as well as to the main
2903        # log file.
2904        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2905        dealloc_log.info("Terminating %s " %key)
2906        h = logging.StreamHandler(self.list_log(dealloc_list))
2907        # XXX: there should be a global one of these rather than repeating the
2908        # code.
2909        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2910                    '%d %b %y %H:%M:%S'))
2911        dealloc_log.addHandler(h)
2912
2913        self.state_lock.acquire()
2914        fed_exp = self.state.get(key, None)
2915        self.state_lock.release()
2916        repo = None
2917
2918        if fed_exp:
2919            status = self.check_termination_status(fed_exp, force)
2920            # get_termination_info updates the experiment state
2921            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2922            self.deallocate_resources(term_params, expcert, status, force, 
2923                    dealloc_log)
2924
2925            # Remove the terminated experiment
2926            self.state_lock.acquire()
2927            for id in ids:
2928                self.clear_experiment_authorization(id, need_state_lock=False)
2929                if id in self.state: del self.state[id]
2930
2931            if self.state_filename: self.write_state()
2932            self.state_lock.release()
2933
2934            # Delete any synch points associated with this experiment.  All
2935            # synch points begin with the fedid of the experiment.
2936            fedid_keys = set(["fedid:%s" % f for f in ids \
2937                    if isinstance(f, fedid)])
2938            for k in self.synch_store.all_keys():
2939                try:
2940                    if len(k) > 45 and k[0:46] in fedid_keys:
2941                        self.synch_store.del_value(k)
2942                except synch_store.BadDeletionError:
2943                    pass
2944            self.write_store()
2945
2946            # Remove software and other cached stuff from the filesystem.
2947            if repo:
2948                self.remove_dirs("%s/%s" % (self.repodir, repo))
2949       
2950            self.log.info("Terminate experiment succeeded for %s %s" % \
2951                    (key, fid))
2952            return { 
2953                    'experiment': exp , 
2954                    'deallocationLog': string.join(dealloc_list, ''),
2955                    'proof': [proof.to_dict()],
2956                    }
2957        else:
2958            self.log.info("Terminate experiment failed for %s %s: no state" % \
2959                    (key, fid))
2960            raise service_error(service_error.req, "No saved state")
2961
2962
2963    def GetValue(self, req, fid):
2964        """
2965        Get a value from the synchronized store
2966        """
2967        req = req.get('GetValueRequestBody', None)
2968        if not req:
2969            raise service_error(service_error.req,
2970                    "Bad request format (no GetValueRequestBody)")
2971       
2972        name = req.get('name', None)
2973        wait = req.get('wait', False)
2974        rv = { 'name': name }
2975
2976        if not name:
2977            raise service_error(service_error.req, "No name?")
2978
2979        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2980
2981        if access_ok:
2982            self.log.debug("[GetValue] asking for %s " % name)
2983            try:
2984                v = self.synch_store.get_value(name, wait)
2985            except synch_store.RevokedKeyError:
2986                # No more synch on this key
2987                raise service_error(service_error.federant, 
2988                        "Synch key %s revoked" % name)
2989            if v is not None:
2990                rv['value'] = v
2991            rv['proof'] = proof.to_dict()
2992            self.log.debug("[GetValue] got %s from %s" % (v, name))
2993            return rv
2994        else:
2995            raise service_error(service_error.access, "Access Denied",
2996                    proof=proof)
2997       
2998
2999    def SetValue(self, req, fid):
3000        """
3001        Set a value in the synchronized store
3002        """
3003        req = req.get('SetValueRequestBody', None)
3004        if not req:
3005            raise service_error(service_error.req,
3006                    "Bad request format (no SetValueRequestBody)")
3007       
3008        name = req.get('name', None)
3009        v = req.get('value', '')
3010
3011        if not name:
3012            raise service_error(service_error.req, "No name?")
3013
3014        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
3015
3016        if access_ok:
3017            try:
3018                self.synch_store.set_value(name, v)
3019                self.write_store()
3020                self.log.debug("[SetValue] set %s to %s" % (name, v))
3021            except synch_store.CollisionError:
3022                # Translate into a service_error
3023                raise service_error(service_error.req,
3024                        "Value already set: %s" %name)
3025            except synch_store.RevokedKeyError:
3026                # No more synch on this key
3027                raise service_error(service_error.federant, 
3028                        "Synch key %s revoked" % name)
3029                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
3030        else:
3031            raise service_error(service_error.access, "Access Denied",
3032                    proof=proof)
Note: See TracBrowser for help on using the repository browser.