source: fedd/federation/experiment_control.py @ 187a8f9

Last change on this file since 187a8f9 was 0b217d1, checked in by Ted Faber <faber@…>, 10 years ago

Strip some dead wood

  • Property mode set to 100644
File size: 99.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46from deter import topology_to_route_file
47import list_log
48
49
50class nullHandler(logging.Handler):
51    def emit(self, record): pass
52
53fl = logging.getLogger("fedd.experiment_control")
54fl.addHandler(nullHandler())
55
56class experiment_control_local(experiment_control_legacy):
57    """
58    Control of experiments that this system can directly access.
59
60    Includes experiment creation, termination and information dissemination.
61    Thred safe.
62    """
63
64    class ssh_cmd_timeout(RuntimeError): pass
65   
66    call_RequestAccess = service_caller('RequestAccess')
67    call_ReleaseAccess = service_caller('ReleaseAccess')
68    call_StartSegment = service_caller('StartSegment')
69    call_TerminateSegment = service_caller('TerminateSegment')
70    call_InfoSegment = service_caller('InfoSegment')
71    call_OperationSegment = service_caller('OperationSegment')
72    call_Ns2Topdl = service_caller('Ns2Topdl')
73
74    def __init__(self, config=None, auth=None):
75        """
76        Intialize the various attributes, most from the config object
77        """
78
79        def parse_tarfile_list(tf):
80            """
81            Parse a tarfile list from the configuration.  This is a set of
82            paths and tarfiles separated by spaces.
83            """
84            rv = [ ]
85            if tf is not None:
86                tl = tf.split()
87                while len(tl) > 1:
88                    p, t = tl[0:2]
89                    del tl[0:2]
90                    rv.append((p, t))
91            return rv
92
93        self.list_log = list_log.list_log
94
95        self.cert_file = config.get("experiment_control", "cert_file")
96        if self.cert_file:
97            self.cert_pwd = config.get("experiment_control", "cert_pwd")
98        else:
99            self.cert_file = config.get("globals", "cert_file")
100            self.cert_pwd = config.get("globals", "cert_pwd")
101
102        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
103                or config.get("globals", "trusted_certs")
104
105        self.repodir = config.get("experiment_control", "repodir")
106        self.repo_url = config.get("experiment_control", "repo_url", 
107                "https://users.isi.deterlab.net:23235");
108
109        self.exp_stem = "fed-stem"
110        self.log = logging.getLogger("fedd.experiment_control")
111        set_log_level(config, "experiment_control", self.log)
112        self.muxmax = 2
113        self.nthreads = 10
114        self.randomize_experiments = False
115
116        self.splitter = None
117        self.ssh_keygen = "/usr/bin/ssh-keygen"
118        self.ssh_identity_file = None
119
120
121        self.debug = config.getboolean("experiment_control", "create_debug")
122        self.cleanup = not config.getboolean("experiment_control", 
123                "leave_tmpfiles")
124        self.state_filename = config.get("experiment_control", 
125                "experiment_state")
126        self.store_filename = config.get("experiment_control", 
127                "synch_store")
128        self.store_url = config.get("experiment_control", "store_url")
129        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
130        self.grouper_url = config.get("experiment_control", "grouper_url")
131        self.fedkit = parse_tarfile_list(\
132                config.get("experiment_control", "fedkit"))
133        self.gatewaykit = parse_tarfile_list(\
134                config.get("experiment_control", "gatewaykit"))
135
136        dt = config.get("experiment_control", "direct_transit")
137        self.auth_type = config.get('experiment_control', 'auth_type') \
138                or 'legacy'
139        self.auth_dir = config.get('experiment_control', 'auth_dir')
140        self.routing = config.get('experiment_control', 'routing')
141        self.default_tb = config.get('experiment_control', 'default_testbed')
142        self.info_cache_limit = \
143                config.getint('experiment_control', 'info_cache', 600)
144        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
145        else: self.direct_transit = [ ]
146        # NB for internal master/slave ops, not experiment setup
147        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
148
149        self.overrides = set([])
150        ovr = config.get('experiment_control', 'overrides')
151        if ovr:
152            for o in ovr.split(","):
153                o = o.strip()
154                if o.startswith('fedid:'): o = o[len('fedid:'):]
155                self.overrides.add(fedid(hexstr=o))
156
157        self.state = { }
158        self.state_lock = Lock()
159        self.tclsh = "/usr/local/bin/otclsh"
160        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
161                config.get("experiment_control", "tcl_splitter",
162                        "/usr/testbed/lib/ns2ir/parse.tcl")
163        mapdb_file = config.get("experiment_control", "mapdb")
164        self.trace_file = sys.stderr
165
166        self.def_expstart = \
167                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
168                "/tmp/federate";
169        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
170                "FEDDIR/hosts";
171        self.def_gwstart = \
172                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
173                "/tmp/bridge.log";
174        self.def_mgwstart = \
175                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
176                "/tmp/bridge.log";
177        self.def_gwimage = "FBSD61-TUNNEL2";
178        self.def_gwtype = "pc";
179        self.local_access = { }
180
181        if self.auth_type == 'legacy':
182            if auth:
183                self.auth = auth
184            else:
185                self.log.error( "[access]: No authorizer initialized, " +\
186                        "creating local one.")
187                auth = authorizer()
188            self.get_access = self.legacy_get_access
189        elif self.auth_type == 'abac':
190            self.auth = abac_authorizer(load=self.auth_dir, 
191                    update=os.path.join(self.auth_dir,'update'))
192        else:
193            raise service_error(service_error.internal, 
194                    "Unknown auth_type: %s" % self.auth_type)
195
196        if mapdb_file:
197            self.read_mapdb(mapdb_file)
198        else:
199            self.log.warn("[experiment_control] No testbed map, using defaults")
200            self.tbmap = { 
201                    'deter':'https://users.isi.deterlab.net:23235',
202                    'emulab':'https://users.isi.deterlab.net:23236',
203                    'ucb':'https://users.isi.deterlab.net:23237',
204                    }
205
206        # Grab saved state.  OK to do this w/o locking because it's read only
207        # and only one thread should be in existence that can see self.state at
208        # this point.
209        if self.state_filename:
210            self.read_state()
211
212        if self.store_filename:
213            self.read_store()
214        else:
215            self.log.warning("No saved synch store")
216            self.synch_store = synch_store
217
218        # Dispatch tables
219        self.soap_services = {\
220                'New': soap_handler('New', self.new_experiment),
221                'Create': soap_handler('Create', self.create_experiment),
222                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
223                'Vis': soap_handler('Vis', self.get_vis),
224                'Info': soap_handler('Info', self.get_info),
225                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
226                'Operation': soap_handler('Operation', self.do_operation),
227                'Terminate': soap_handler('Terminate', 
228                    self.terminate_experiment),
229                'GetValue': soap_handler('GetValue', self.GetValue),
230                'SetValue': soap_handler('SetValue', self.SetValue),
231        }
232
233        self.xmlrpc_services = {\
234                'New': xmlrpc_handler('New', self.new_experiment),
235                'Create': xmlrpc_handler('Create', self.create_experiment),
236                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
237                'Vis': xmlrpc_handler('Vis', self.get_vis),
238                'Info': xmlrpc_handler('Info', self.get_info),
239                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
240                'Terminate': xmlrpc_handler('Terminate',
241                    self.terminate_experiment),
242                'Operation': xmlrpc_handler('Operation', self.do_operation),
243                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
244                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
245        }
246
247    # Call while holding self.state_lock
248    def write_state(self):
249        """
250        Write a new copy of experiment state after copying the existing state
251        to a backup.
252
253        State format is a simple pickling of the state dictionary.
254        """
255        if os.access(self.state_filename, os.W_OK):
256            copy_file(self.state_filename, \
257                    "%s.bak" % self.state_filename)
258        try:
259            f = open(self.state_filename, 'w')
260            pickle.dump(self.state, f)
261        except EnvironmentError, e:
262            self.log.error("Can't write file %s: %s" % \
263                    (self.state_filename, e))
264        except pickle.PicklingError, e:
265            self.log.error("Pickling problem: %s" % e)
266        except TypeError, e:
267            self.log.error("Pickling problem (TypeError): %s" % e)
268
269    @staticmethod
270    def get_alloc_ids(exp):
271        """
272        Used by read_store and read state.  This used to be worse.
273        """
274
275        return [ a.allocID for a in exp.get_all_allocations() ]
276
277
278    # Call while holding self.state_lock
279    def read_state(self):
280        """
281        Read a new copy of experiment state.  Old state is overwritten.
282
283        State format is a simple pickling of the state dictionary.
284        """
285       
286        try:
287            f = open(self.state_filename, "r")
288            self.state = pickle.load(f)
289            self.log.debug("[read_state]: Read state from %s" % \
290                    self.state_filename)
291        except EnvironmentError, e:
292            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
293                    % (self.state_filename, e))
294        except pickle.UnpicklingError, e:
295            self.log.warning(("[read_state]: No saved state: " + \
296                    "Unpickling failed: %s") % e)
297       
298        for s in self.state.values():
299            try:
300
301                eid = s.fedid
302                if eid : 
303                    if self.auth_type == 'legacy':
304                        # XXX: legacy
305                        # Give the owner rights to the experiment
306                        #self.auth.set_attribute(s['owner'], eid)
307                        # And holders of the eid as well
308                        self.auth.set_attribute(eid, eid)
309                        # allow overrides to control experiments as well
310                        for o in self.overrides:
311                            self.auth.set_attribute(o, eid)
312                        # Set permissions to allow reading of the software
313                        # repo, if any, as well.
314                        for a in self.get_alloc_ids(s):
315                            self.auth.set_attribute(a, 'repo/%s' % eid)
316                else:
317                    raise KeyError("No experiment id")
318            except KeyError, e:
319                self.log.warning("[read_state]: State ownership or identity " +\
320                        "misformatted in %s: %s" % (self.state_filename, e))
321
322    def read_mapdb(self, file):
323        """
324        Read a simple colon separated list of mappings for the
325        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
326        also adds testbeds to active if they include , active after
327        their name.
328        """
329
330        self.tbmap = { }
331        lineno =0
332        try:
333            f = open(file, "r")
334            for line in f:
335                lineno += 1
336                line = line.strip()
337                if line.startswith('#') or len(line) == 0:
338                    continue
339                try:
340                    label, url = line.split(':', 1)
341                    self.tbmap[label] = url
342                except ValueError, e:
343                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
344                            "map db: %s %s" % (lineno, line, e))
345        except EnvironmentError, e:
346            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
347                    "open %s: %s" % (file, e))
348        else:
349            f.close()
350
351    def read_store(self):
352        try:
353            self.synch_store = synch_store()
354            self.synch_store.load(self.store_filename)
355            self.log.debug("[read_store]: Read store from %s" % \
356                    self.store_filename)
357        except EnvironmentError, e:
358            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
359                    % (self.state_filename, e))
360            self.synch_store = synch_store()
361
362        # Set the initial permissions on data in the store.  XXX: This ad hoc
363        # authorization attribute initialization is getting out of hand.
364        # XXX: legacy
365        if self.auth_type == 'legacy':
366            for k in self.synch_store.all_keys():
367                try:
368                    if k.startswith('fedid:'):
369                        fid = fedid(hexstr=k[6:46])
370                        if self.state.has_key(fid):
371                            for a in self.get_alloc_ids(self.state[fid]):
372                                self.auth.set_attribute(a, k)
373                except ValueError, e:
374                    self.log.warn("Cannot deduce permissions for %s" % k)
375
376
377    def write_store(self):
378        """
379        Write a new copy of synch_store after writing current state
380        to a backup.  We use the internal synch_store pickle method to avoid
381        incinsistent data.
382
383        State format is a simple pickling of the store.
384        """
385        if os.access(self.store_filename, os.W_OK):
386            copy_file(self.store_filename, \
387                    "%s.bak" % self.store_filename)
388        try:
389            self.synch_store.save(self.store_filename)
390        except EnvironmentError, e:
391            self.log.error("Can't write file %s: %s" % \
392                    (self.store_filename, e))
393        except TypeError, e:
394            self.log.error("Pickling problem (TypeError): %s" % e)
395
396    # XXX this may belong somewhere else
397
398    def get_grouper_updates(self, fid):
399        if self.grouper_url is None: return
400        d = tempfile.mkdtemp()
401        try:
402            fstr = "%s" % fid
403            # XXX locking
404            zipname = os.path.join(d, 'grouper.zip')
405            dest = os.path.join(self.auth_dir, 'update')
406            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
407            f = open(zipname, 'w')
408            f.write(resp.read())
409            f.close()
410            zf = zipfile.ZipFile(zipname, 'r')
411            zf.extractall(dest)
412            zf.close()
413        except URLError, e:
414            self.log.error("Cannot connect to grouper: %s" % e)
415            pass
416        finally:
417            shutil.rmtree(d)
418
419
420    def remove_dirs(self, dir):
421        """
422        Remove the directory tree and all files rooted at dir.  Log any errors,
423        but continue.
424        """
425        self.log.debug("[removedirs]: removing %s" % dir)
426        try:
427            for path, dirs, files in os.walk(dir, topdown=False):
428                for f in files:
429                    os.remove(os.path.join(path, f))
430                for d in dirs:
431                    os.rmdir(os.path.join(path, d))
432            os.rmdir(dir)
433        except EnvironmentError, e:
434            self.log.error("Error deleting directory tree in %s" % e);
435
436    @staticmethod
437    def make_temp_certfile(expcert, tmpdir):
438        """
439        make a protected copy of the access certificate so the experiment
440        controller can act as the experiment principal.  mkstemp is the most
441        secure way to do that. The directory should be created by
442        mkdtemp.  Return the filename.
443        """
444        if expcert and tmpdir:
445            try:
446                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
447                f = os.fdopen(certf, 'w')
448                print >> f, expcert
449                f.close()
450            except EnvironmentError, e:
451                raise service_error(service_error.internal, 
452                        "Cannot create temp cert file?")
453            return certfn
454        else:
455            return None
456
457       
458    def generate_ssh_keys(self, dest, type="rsa" ):
459        """
460        Generate a set of keys for the gateways to use to talk.
461
462        Keys are of type type and are stored in the required dest file.
463        """
464        valid_types = ("rsa", "dsa")
465        t = type.lower();
466        if t not in valid_types: raise ValueError
467        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
468
469        try:
470            trace = open("/dev/null", "w")
471        except EnvironmentError:
472            raise service_error(service_error.internal,
473                    "Cannot open /dev/null??");
474
475        # May raise CalledProcessError
476        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
477        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
478        if rv != 0:
479            raise service_error(service_error.internal, 
480                    "Cannot generate nonce ssh keys.  %s return code %d" \
481                            % (self.ssh_keygen, rv))
482
483    def generate_seer_certs(self, destdir):
484        '''
485        Create a SEER ca cert and a node cert in destdir/ca.pem and
486        destdir/node.pem respectively.  These will be distributed throughout
487        the federated experiment.  This routine reports errors via
488        service_errors.
489        '''
490        openssl = '/usr/bin/openssl'
491        # All the filenames and parameters we need for openssl calls below
492        ca_key =os.path.join(destdir, 'ca.key') 
493        ca_pem = os.path.join(destdir, 'ca.pem')
494        node_key =os.path.join(destdir, 'node.key') 
495        node_pem = os.path.join(destdir, 'node.pem')
496        node_req = os.path.join(destdir, 'node.req')
497        node_signed = os.path.join(destdir, 'node.signed')
498        days = '%s' % (365 * 10)
499        serial = '%s' % random.randint(0, 1<<16)
500
501        try:
502            # Sequence of calls to create a CA key, create a ca cert, create a
503            # node key, node signing request, and finally a signed node
504            # certificate.
505            sequence = (
506                    (openssl, 'genrsa', '-out', ca_key, '1024'),
507                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
508                        ca_pem, '-days', days, '-subj', 
509                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
510                    (openssl, 'genrsa', '-out', node_key, '1024'),
511                    (openssl, 'req', '-new', '-key', node_key, '-out', 
512                        node_req, '-days', days, '-subj', 
513                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
514                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
515                        '-set_serial', serial, '-req', '-in', node_req, 
516                        '-out', node_signed, '-days', days),
517                )
518            # Do all that stuff; bail if there's an error, and push all the
519            # output to dev/null.
520            for cmd in sequence:
521                trace = open("/dev/null", "w")
522                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
523                if rv != 0:
524                    raise service_error(service_error.internal, 
525                            "Cannot generate SEER certs.  %s return code %d" \
526                                    % (' '.join(cmd), rv))
527            # Concatinate the node key and signed certificate into node.pem
528            f = open(node_pem, 'w')
529            for comp in (node_signed, node_key):
530                g = open(comp, 'r')
531                f.write(g.read())
532                g.close()
533            f.close()
534
535            # Throw out intermediaries.
536            for fn in (ca_key, node_key, node_req, node_signed):
537                os.unlink(fn)
538
539        except EnvironmentError, e:
540            # Any difficulties with the file system wind up here
541            raise service_error(service_error.internal,
542                    "File error on  %s while creating SEER certs: %s" % \
543                            (e.filename, e.strerror))
544
545
546
547    def gentopo(self, str):
548        """
549        Generate the topology data structure from the splitter's XML
550        representation of it.
551
552        The topology XML looks like:
553            <experiment>
554                <nodes>
555                    <node><vname></vname><ips>ip1:ip2</ips></node>
556                </nodes>
557                <lans>
558                    <lan>
559                        <vname></vname><vnode></vnode><ip></ip>
560                        <bandwidth></bandwidth><member>node:port</member>
561                    </lan>
562                </lans>
563        """
564        class topo_parse:
565            """
566            Parse the topology XML and create the dats structure.
567            """
568            def __init__(self):
569                # Typing of the subelements for data conversion
570                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
571                self.int_subelements = ( 'bandwidth',)
572                self.float_subelements = ( 'delay',)
573                # The final data structure
574                self.nodes = [ ]
575                self.lans =  [ ]
576                self.topo = { \
577                        'node': self.nodes,\
578                        'lan' : self.lans,\
579                    }
580                self.element = { }  # Current element being created
581                self.chars = ""     # Last text seen
582
583            def end_element(self, name):
584                # After each sub element the contents is added to the current
585                # element or to the appropriate list.
586                if name == 'node':
587                    self.nodes.append(self.element)
588                    self.element = { }
589                elif name == 'lan':
590                    self.lans.append(self.element)
591                    self.element = { }
592                elif name in self.str_subelements:
593                    self.element[name] = self.chars
594                    self.chars = ""
595                elif name in self.int_subelements:
596                    self.element[name] = int(self.chars)
597                    self.chars = ""
598                elif name in self.float_subelements:
599                    self.element[name] = float(self.chars)
600                    self.chars = ""
601
602            def found_chars(self, data):
603                self.chars += data.rstrip()
604
605
606        tp = topo_parse();
607        parser = xml.parsers.expat.ParserCreate()
608        parser.EndElementHandler = tp.end_element
609        parser.CharacterDataHandler = tp.found_chars
610
611        parser.Parse(str)
612
613        return tp.topo
614       
615
616    def genviz(self, topo):
617        """
618        Generate the visualization the virtual topology
619        """
620
621        neato = "/usr/local/bin/neato"
622        # These are used to parse neato output and to create the visualization
623        # file.
624        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
625        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
626                "%s</type></node>"
627
628        try:
629            # Node names
630            nodes = [ n['vname'] for n in topo['node'] ]
631            topo_lans = topo['lan']
632        except KeyError, e:
633            raise service_error(service_error.internal, "Bad topology: %s" %e)
634
635        lans = { }
636        links = { }
637
638        # Walk through the virtual topology, organizing the connections into
639        # 2-node connections (links) and more-than-2-node connections (lans).
640        # When a lan is created, it's added to the list of nodes (there's a
641        # node in the visualization for the lan).
642        for l in topo_lans:
643            if links.has_key(l['vname']):
644                if len(links[l['vname']]) < 2:
645                    links[l['vname']].append(l['vnode'])
646                else:
647                    nodes.append(l['vname'])
648                    lans[l['vname']] = links[l['vname']]
649                    del links[l['vname']]
650                    lans[l['vname']].append(l['vnode'])
651            elif lans.has_key(l['vname']):
652                lans[l['vname']].append(l['vnode'])
653            else:
654                links[l['vname']] = [ l['vnode'] ]
655
656
657        # Open up a temporary file for dot to turn into a visualization
658        try:
659            df, dotname = tempfile.mkstemp()
660            dotfile = os.fdopen(df, 'w')
661        except EnvironmentError:
662            raise service_error(service_error.internal,
663                    "Failed to open file in genviz")
664
665        try:
666            dnull = open('/dev/null', 'w')
667        except EnvironmentError:
668            service_error(service_error.internal,
669                    "Failed to open /dev/null in genviz")
670
671        # Generate a dot/neato input file from the links, nodes and lans
672        try:
673            print >>dotfile, "graph G {"
674            for n in nodes:
675                print >>dotfile, '\t"%s"' % n
676            for l in links.keys():
677                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
678            for l in lans.keys():
679                for n in lans[l]:
680                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
681            print >>dotfile, "}"
682            dotfile.close()
683        except TypeError:
684            raise service_error(service_error.internal,
685                    "Single endpoint link in vtopo")
686        except EnvironmentError:
687            raise service_error(service_error.internal, "Cannot write dot file")
688
689        # Use dot to create a visualization
690        try:
691            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
692                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
693                stderr=dnull, close_fds=True)
694        except EnvironmentError:
695            raise service_error(service_error.internal, 
696                    "Cannot generate visualization: is graphviz available?")
697        dnull.close()
698
699        # Translate dot to vis format
700        vis_nodes = [ ]
701        vis = { 'node': vis_nodes }
702        for line in dot.stdout:
703            m = vis_re.match(line)
704            if m:
705                vn = m.group(1)
706                vis_node = {'name': vn, \
707                        'x': float(m.group(2)),\
708                        'y' : float(m.group(3)),\
709                    }
710                if vn in links.keys() or vn in lans.keys():
711                    vis_node['type'] = 'lan'
712                else:
713                    vis_node['type'] = 'node'
714                vis_nodes.append(vis_node)
715        rv = dot.wait()
716
717        os.remove(dotname)
718        # XXX: graphviz seems to use low return codes for warnings, like
719        # "couldn't find font"
720        if rv < 2 : return vis
721        else: return None
722
723
724    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
725            cert_pwd=None):
726        """
727        Release access to testbed through fedd
728        """
729
730        if not uri and tbmap:
731            uri = tbmap.get(tb, None)
732        if not uri:
733            raise service_error(service_error.server_config, 
734                    "Unknown testbed: %s" % tb)
735
736        if self.local_access.has_key(uri):
737            resp = self.local_access[uri].ReleaseAccess(\
738                    { 'ReleaseAccessRequestBody' : 
739                        {'allocID': {'fedid': aid}},}, 
740                    fedid(file=cert_file))
741            resp = { 'ReleaseAccessResponseBody': resp } 
742        else:
743            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
744                    cert_file, cert_pwd, self.trusted_certs)
745
746        # better error coding
747
748    def remote_ns2topdl(self, uri, desc):
749
750        req = {
751                'description' : { 'ns2description': desc },
752            }
753
754        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
755                self.trusted_certs)
756
757        if r.has_key('Ns2TopdlResponseBody'):
758            r = r['Ns2TopdlResponseBody']
759            ed = r.get('experimentdescription', None)
760            if ed.has_key('topdldescription'):
761                return topdl.Topology(**ed['topdldescription'])
762            else:
763                raise service_error(service_error.protocol, 
764                        "Bad splitter response (no output)")
765        else:
766            raise service_error(service_error.protocol, "Bad splitter response")
767
768    class start_segment:
769        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
770                cert_pwd=None, trusted_certs=None, caller=None,
771                log_collector=None):
772            self.log = log
773            self.debug = debug
774            self.cert_file = cert_file
775            self.cert_pwd = cert_pwd
776            self.trusted_certs = None
777            self.caller = caller
778            self.testbed = testbed
779            self.log_collector = log_collector
780            self.response = None
781            self.node = { }
782            self.subs = { }
783            self.tb = { }
784            self.proof = None
785
786        def make_map(self, resp):
787            if 'segmentdescription' not in resp  or \
788                    'topdldescription' not in resp['segmentdescription']:
789                self.log.warn('No topology returned from startsegment')
790                return 
791
792            top = topdl.Topology(
793                    **resp['segmentdescription']['topdldescription'])
794
795            for e in top.elements:
796                if isinstance(e, topdl.Computer):
797                    self.node[e.name] = e
798                elif isinstance(e, topdl.Testbed):
799                    self.tb[e.uri] = e
800            for s in top.substrates:
801                self.subs[s.name] = s
802
803        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
804            req = {
805                    'allocID': { 'fedid' : aid }, 
806                    'segmentdescription': { 
807                        'topdldescription': topo.to_dict(),
808                    },
809                }
810
811            if connInfo:
812                req['connection'] = connInfo
813
814            import_svcs = [ s for m in masters.values() \
815                    for s in m if self.testbed in s.importers]
816
817            if import_svcs or self.testbed in masters:
818                req['service'] = []
819
820            for s in import_svcs:
821                for r in s.reqs:
822                    sr = copy.deepcopy(r)
823                    sr['visibility'] = 'import';
824                    req['service'].append(sr)
825
826            for s in masters.get(self.testbed, []):
827                for r in s.reqs:
828                    sr = copy.deepcopy(r)
829                    sr['visibility'] = 'export';
830                    req['service'].append(sr)
831
832            if attrs:
833                req['fedAttr'] = attrs
834
835            try:
836                self.log.debug("Calling StartSegment at %s " % uri)
837                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
838                        self.trusted_certs)
839                if r.has_key('StartSegmentResponseBody'):
840                    lval = r['StartSegmentResponseBody'].get('allocationLog',
841                            None)
842                    if lval and self.log_collector:
843                        for line in  lval.splitlines(True):
844                            self.log_collector.write(line)
845                    self.make_map(r['StartSegmentResponseBody'])
846                    if 'proof' in r: self.proof = r['proof']
847                    self.response = r
848                else:
849                    raise service_error(service_error.internal, 
850                            "Bad response!?: %s" %r)
851                return True
852            except service_error, e:
853                self.log.error("Start segment failed on %s: %s" % \
854                        (self.testbed, e))
855                return False
856
857
858
859    class terminate_segment:
860        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
861                cert_pwd=None, trusted_certs=None, caller=None):
862            self.log = log
863            self.debug = debug
864            self.cert_file = cert_file
865            self.cert_pwd = cert_pwd
866            self.trusted_certs = None
867            self.caller = caller
868            self.testbed = testbed
869
870        def __call__(self, uri, aid ):
871            req = {
872                    'allocID': {'fedid': aid }, 
873                }
874            self.log.info("Calling terminate segment")
875            try:
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                self.log.info("Terminate segment succeeded")
879                return True
880            except service_error, e:
881                self.log.error("Terminate segment failed on %s: %s" % \
882                        (self.testbed, e))
883                return False
884
885    class info_segment(start_segment):
886        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
887                cert_pwd=None, trusted_certs=None, caller=None,
888                log_collector=None):
889            experiment_control_local.start_segment.__init__(self, debug, 
890                    log, testbed, cert_file, cert_pwd, trusted_certs, 
891                    caller, log_collector)
892
893        def __call__(self, uri, aid):
894            req = { 'allocID': { 'fedid' : aid } }
895
896            try:
897                self.log.debug("Calling InfoSegment at %s " % uri)
898                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
899                        self.trusted_certs)
900                if r.has_key('InfoSegmentResponseBody'):
901                    self.make_map(r['InfoSegmentResponseBody'])
902                    if 'proof' in r: self.proof = r['proof']
903                    self.response = r
904                else:
905                    raise service_error(service_error.internal, 
906                            "Bad response!?: %s" %r)
907                return True
908            except service_error, e:
909                self.log.error("Info segment failed on %s: %s" % \
910                        (self.testbed, e))
911                return False
912
913    class operation_segment:
914        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
915                cert_pwd=None, trusted_certs=None, caller=None,
916                log_collector=None):
917            self.log = log
918            self.debug = debug
919            self.cert_file = cert_file
920            self.cert_pwd = cert_pwd
921            self.trusted_certs = None
922            self.caller = caller
923            self.testbed = testbed
924            self.status = None
925
926        def __call__(self, uri, aid, op, targets, params):
927            req = { 
928                    'allocID': { 'fedid' : aid },
929                    'operation': op,
930                    'target': targets,
931                    }
932            if params: req['parameter'] = params
933
934
935            try:
936                self.log.debug("Calling OperationSegment at %s " % uri)
937                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
938                        self.trusted_certs)
939                if 'OperationSegmentResponseBody' in r:
940                    r = r['OperationSegmentResponseBody']
941                    if 'status' in r:
942                        self.status = r['status']
943                else:
944                    raise service_error(service_error.internal, 
945                            "Bad response!?: %s" %r)
946                return True
947            except service_error, e:
948                self.log.error("Operation segment failed on %s: %s" % \
949                        (self.testbed, e))
950                return False
951
952    def annotate_topology(self, top, data):
953        # These routines do various parts of the annotation
954        def add_new_names(nl, l):
955            """ add any names in nl to the list in l """
956            for n in nl:
957                if n not in l: l.append(n)
958       
959        def merge_services(ne, e):
960            for ns in ne.service:
961                # NB: the else is on the for
962                for s in e.service:
963                    if ns.name == s.name:
964                        s.importer = ns.importer
965                        s.param = ns.param
966                        s.description = ns.description
967                        s.status = ns.status
968                        break
969                else:
970                    e.service.append(ns)
971       
972        def merge_oses(ne, e):
973            """
974            Merge the operating system entries of ne into e
975            """
976            for nos in ne.os:
977                # NB: the else is on the for
978                for os in e.os:
979                    if nos.name == os.name:
980                        os.version = nos.version
981                        os.version = nos.distribution
982                        os.version = nos.distributionversion
983                        for a in nos.attribute:
984                            if os.get_attribute(a.attribute):
985                                os.remove_attribute(a.attribute)
986                            os.set_attribute(a.attribute, a.value)
987                        break
988                else:
989                    # If both nodes have one OS, this is a replacement
990                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
991                    else: e.os.append(nos)
992
993        # Annotate the topology with embedding info
994        for e in top.elements:
995            if isinstance(e, topdl.Computer):
996                for s in data:
997                    ne = s.node.get(e.name, None)
998                    if ne is not None:
999                        add_new_names(ne.localname, e.localname)
1000                        e.status = ne.status
1001                        merge_services(ne, e)
1002                        add_new_names(ne.operation, e.operation)
1003                        if ne.os: merge_oses(ne, e)
1004                        break
1005            elif isinstance(e,topdl.Testbed):
1006                for s in data:
1007                    ne = s.tb.get(e.uri, None)
1008                    if ne is not None:
1009                        add_new_names(ne.localname, e.localname)
1010                        add_new_names(ne.operation, e.operation)
1011                        merge_services(ne, e)
1012                        for a in ne.attribute:
1013                            e.set_attribute(a.attribute, a.value)
1014        # Annotate substrates
1015        for s in top.substrates:
1016            for d in data:
1017                ss = d.subs.get(s.name, None)
1018                if ss is not None:
1019                    if ss.capacity is not None:
1020                        s.capacity = ss.capacity
1021                    if s.latency is not None:
1022                        s.latency = ss.latency
1023
1024
1025
1026    def allocate_resources(self, allocated, masters, eid, expid, 
1027            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1028            attrs=None, connInfo={}, tbmap=None, expcert=None):
1029
1030        started = { }           # Testbeds where a sub-experiment started
1031                                # successfully
1032
1033        # XXX
1034        fail_soft = False
1035
1036        if tbmap is None: tbmap = { }
1037
1038        log = alloc_log or self.log
1039
1040        tp = thread_pool(self.nthreads)
1041        threads = [ ]
1042        starters = [ ]
1043
1044        if expcert:
1045            cert = expcert
1046            pw = None
1047        else:
1048            cert = self.cert_file
1049            pw = self.cert_pwd
1050
1051        for tb in allocated.keys():
1052            # Create and start a thread to start the segment, and save it
1053            # to get the return value later
1054            tb_attrs = copy.copy(attrs)
1055            tp.wait_for_slot()
1056            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1057            base, suffix = split_testbed(tb)
1058            if suffix:
1059                tb_attrs.append({'attribute': 'experiment_name', 
1060                    'value': "%s-%s" % (eid, suffix)})
1061            else:
1062                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1063            if not uri:
1064                raise service_error(service_error.internal, 
1065                        "Unknown testbed %s !?" % tb)
1066
1067            aid = tbparams[tb].allocID
1068            if not aid:
1069                raise service_error(service_error.internal, 
1070                        "No alloc id for testbed %s !?" % tb)
1071
1072            s = self.start_segment(log=log, debug=self.debug,
1073                    testbed=tb, cert_file=cert,
1074                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1075                    caller=self.call_StartSegment,
1076                    log_collector=log_collector)
1077            starters.append(s)
1078            t  = pooled_thread(\
1079                    target=s, name=tb,
1080                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1081                    pdata=tp, trace_file=self.trace_file)
1082            threads.append(t)
1083            t.start()
1084
1085        # Wait until all finish (keep pinging the log, though)
1086        mins = 0
1087        revoked = False
1088        while not tp.wait_for_all_done(60.0):
1089            mins += 1
1090            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1091                    % mins)
1092            if not revoked and \
1093                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1094                # a testbed has failed.  Revoke this experiment's
1095                # synchronizarion values so that sub experiments will not
1096                # deadlock waiting for synchronization that will never happen
1097                self.log.info("A subexperiment has failed to swap in, " + \
1098                        "revoking synch keys")
1099                var_key = "fedid:%s" % expid
1100                for k in self.synch_store.all_keys():
1101                    if len(k) > 45 and k[0:46] == var_key:
1102                        self.synch_store.revoke_key(k)
1103                revoked = True
1104
1105        failed = [ t.getName() for t in threads if not t.rv ]
1106        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1107
1108        # If one failed clean up, unless fail_soft is set
1109        if failed:
1110            if not fail_soft:
1111                tp.clear()
1112                for tb in succeeded:
1113                    # Create and start a thread to stop the segment
1114                    tp.wait_for_slot()
1115                    uri = tbparams[tb].uri
1116                    t  = pooled_thread(\
1117                            target=self.terminate_segment(log=log,
1118                                testbed=tb,
1119                                cert_file=cert, 
1120                                cert_pwd=pw,
1121                                trusted_certs=self.trusted_certs,
1122                                caller=self.call_TerminateSegment),
1123                            args=(uri, tbparams[tb].allocID),
1124                            name=tb,
1125                            pdata=tp, trace_file=self.trace_file)
1126                    t.start()
1127                # Wait until all finish (if any are being stopped)
1128                if succeeded:
1129                    tp.wait_for_all_done()
1130
1131                # release the allocations
1132                for tb in tbparams.keys():
1133                    try:
1134                        self.release_access(tb, tbparams[tb].allocID, 
1135                                tbmap=tbmap, uri=tbparams[tb].uri,
1136                                cert_file=cert, cert_pwd=pw)
1137                    except service_error, e:
1138                        self.log.warn("Error releasing access: %s" % e.desc)
1139                # Remove the placeholder
1140                self.state_lock.acquire()
1141                self.state[eid].status = 'failed'
1142                self.state[eid].updated()
1143                if self.state_filename: self.write_state()
1144                self.state_lock.release()
1145                # Remove the repo dir
1146                self.remove_dirs("%s/%s" %(self.repodir, expid))
1147                # Walk up tmpdir, deleting as we go
1148                if self.cleanup:
1149                    self.remove_dirs(tmpdir)
1150                else:
1151                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1152
1153
1154                log.error("Swap in failed on %s" % ",".join(failed))
1155                return
1156        else:
1157            # Walk through the successes and gather the proofs
1158            proofs = { }
1159            for s in starters:
1160                if s.proof:
1161                    proofs[s.testbed] = s.proof
1162            self.annotate_topology(top, starters)
1163            log.info("[start_segment]: Experiment %s active" % eid)
1164
1165
1166        # Walk up tmpdir, deleting as we go
1167        if self.cleanup:
1168            self.remove_dirs(tmpdir)
1169        else:
1170            log.debug("[start_experiment]: not removing %s" % tmpdir)
1171
1172        # Insert the experiment into our state and update the disk copy.
1173        self.state_lock.acquire()
1174        self.state[expid].status = 'active'
1175        self.state[eid] = self.state[expid]
1176        self.state[eid].top = top
1177        self.state[eid].updated()
1178        # Append startup proofs
1179        for f in self.state[eid].get_all_allocations():
1180            if f.tb in proofs:
1181                f.proof.append(proofs[f.tb])
1182
1183        if self.state_filename: self.write_state()
1184        self.state_lock.release()
1185        return
1186
1187
1188    def add_kit(self, e, kit):
1189        """
1190        Add a Software object created from the list of (install, location)
1191        tuples passed as kit  to the software attribute of an object e.  We
1192        do this enough to break out the code, but it's kind of a hack to
1193        avoid changing the old tuple rep.
1194        """
1195
1196        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1197
1198        if isinstance(e.software, list): e.software.extend(s)
1199        else: e.software = s
1200
1201    def append_experiment_authorization(self, expid, attrs, 
1202            need_state_lock=True):
1203        """
1204        Append the authorization information to system state
1205        """
1206
1207        for p, a in attrs:
1208            if p and a:
1209                # Improperly configured overrides can add bad principals.
1210                self.auth.set_attribute(p, a)
1211            else:
1212                self.log.debug("Bad append experiment_authorization %s %s" \
1213                        % (p, a))
1214        self.auth.save()
1215
1216        if need_state_lock: self.state_lock.acquire()
1217        # XXX: really a no op?
1218        #self.state[expid]['auth'].update(attrs)
1219        if self.state_filename: self.write_state()
1220        if need_state_lock: self.state_lock.release()
1221
1222    def clear_experiment_authorization(self, expid, need_state_lock=True):
1223        """
1224        Attrs is a set of attribute principal pairs that need to be removed
1225        from the authenticator.  Remove them and save the authenticator.
1226        """
1227
1228        if need_state_lock: self.state_lock.acquire()
1229        # XXX: should be a no-op
1230        #if expid in self.state and 'auth' in self.state[expid]:
1231            #for p, a in self.state[expid]['auth']:
1232                #self.auth.unset_attribute(p, a)
1233            #self.state[expid]['auth'] = set()
1234        if self.state_filename: self.write_state()
1235        if need_state_lock: self.state_lock.release()
1236        self.auth.save()
1237
1238
1239    def create_experiment_state(self, fid, req, expid, expcert,
1240            state='starting'):
1241        """
1242        Create the initial entry in the experiment's state.  The expid and
1243        expcert are the experiment's fedid and certifacte that represents that
1244        ID, which are installed in the experiment state.  If the request
1245        includes a suggested local name that is used if possible.  If the local
1246        name is already taken by an experiment owned by this user that has
1247        failed, it is overwritten.  Otherwise new letters are added until a
1248        valid localname is found.  The generated local name is returned.
1249        """
1250
1251        if req.has_key('experimentID') and \
1252                req['experimentID'].has_key('localname'):
1253            overwrite = False
1254            eid = req['experimentID']['localname']
1255            # If there's an old failed experiment here with the same local name
1256            # and accessible by this user, we'll overwrite it, otherwise we'll
1257            # fall through and do the collision avoidance.
1258            old_expid = self.get_experiment_fedid(eid)
1259            if old_expid:
1260                users_experiment = True
1261                try:
1262                    self.check_experiment_access(fid, old_expid)
1263                except service_error, e:
1264                    if e.code == service_error.access: users_experiment = False
1265                    else: raise e
1266                if users_experiment:
1267                    self.state_lock.acquire()
1268                    status = self.state[eid].status
1269                    if status and status == 'failed':
1270                        # remove the old access attributes
1271                        self.clear_experiment_authorization(eid,
1272                                need_state_lock=False)
1273                        overwrite = True
1274                        del self.state[eid]
1275                        del self.state[old_expid]
1276                    self.state_lock.release()
1277                else:
1278                    self.log.info('Experiment %s exists, ' % eid + \
1279                            'but this user cannot access it')
1280            self.state_lock.acquire()
1281            while (self.state.has_key(eid) and not overwrite):
1282                eid += random.choice(string.ascii_letters)
1283            # Initial state
1284            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1285                    identity=expcert)
1286            self.state[expid] = self.state[eid]
1287            if self.state_filename: self.write_state()
1288            self.state_lock.release()
1289        else:
1290            eid = self.exp_stem
1291            for i in range(0,5):
1292                eid += random.choice(string.ascii_letters)
1293            self.state_lock.acquire()
1294            while (self.state.has_key(eid)):
1295                eid = self.exp_stem
1296                for i in range(0,5):
1297                    eid += random.choice(string.ascii_letters)
1298            # Initial state
1299            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1300                    identity=expcert)
1301            self.state[expid] = self.state[eid]
1302            if self.state_filename: self.write_state()
1303            self.state_lock.release()
1304
1305        # Let users touch the state.  Authorize this fid and the expid itself
1306        # to touch the experiment, as well as allowing th eoverrides.
1307        self.append_experiment_authorization(eid, 
1308                set([(fid, expid), (expid,expid)] + \
1309                        [ (o, expid) for o in self.overrides]))
1310
1311        return eid
1312
1313
1314    def allocate_ips_to_topo(self, top):
1315        """
1316        Add an ip4_address attribute to all the hosts in the topology that have
1317        not already been assigned one by the user, based on the shared
1318        substrates on which they sit.  An /etc/hosts file is also created and
1319        returned as a list of hostfiles entries.  We also return the allocator,
1320        because we may need to allocate IPs to portals
1321        (specifically DRAGON portals).
1322        """
1323        subs = sorted(top.substrates, 
1324                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1325                reverse=True)
1326        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1327        ifs = { }
1328        hosts = [ ]
1329
1330        assigned = { }
1331        assigned_subs = set()
1332        # Look for IP networks that were already assigned to the experiment by
1333        # the user.  We assume that users are smart in making that assignment,
1334        # but we check them later.
1335        for e in top.elements:
1336            if not isinstance(e, topdl.Computer): continue
1337            for inf in e.interface:
1338                a = inf.get_attribute('ip4_address')
1339                if not a: continue
1340                for s in inf.substrate:
1341                    assigned_subs.add(s)
1342                a = ip_addr(a)
1343                n = ip_addr(inf.get_attribute('ip4_netmask'))
1344
1345                sz = 0x100000000 - long(n)
1346                net = (long(a) & long(n)) & 0xffffffff
1347                if net in assigned:
1348                    if sz > assigned[net]: assigned[net] = sz
1349                assigned[net] = sz
1350
1351        # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER
1352        assigned[long(ip_addr('10.0.23.0'))] = 256
1353        # end of hack
1354
1355        # Walk all substrates, smallest to largest, assigning IP addresses to
1356        # the unassigned and checking the assigned.  Build an /etc/hosts file
1357        # as well.
1358        for idx, s in enumerate(subs):
1359            if s.name not in assigned_subs:
1360                net_size = len(s.interfaces)+2
1361
1362                # Get an ip allocation for this substrate.  Make sure the
1363                # allocation does not collide with any user allocations.
1364                ok = False
1365                while not ok:
1366                    a = ips.allocate(net_size)
1367                    if not a:
1368                        raise service_error(service_error.req, 
1369                                "Cannot allocate IP addresses")
1370                    base, num = a
1371                    if num < net_size: 
1372                        raise service_error(service_error.internal,
1373                                "Allocator returned wrong number of IPs??")
1374                    # Check for overlap.  An assigned network could contain an
1375                    # endpoint of the new allocation or the new allocation
1376                    # could contain an endpoint of an assigned network.
1377                    # NB the else is attached to the for - if the loop is not
1378                    # exited by a break, ok = True.
1379                    for n, sz in assigned.items():
1380                        if base >= n and base < n + sz:
1381                            ok = False
1382                            break
1383                        if base + num > n and base + num  < n + sz:
1384                            ok = False
1385                            break
1386                        if n >= base and n < base + num:
1387                            ok = False
1388                            break
1389                        if n+sz > base and n+sz < base + num:
1390                            ok = False
1391                            break
1392                    else:
1393                        ok = True
1394                           
1395                mask = ips.min_alloc
1396                while mask < net_size:
1397                    mask *= 2
1398
1399                netmask = ((2**32-1) ^ (mask-1))
1400            else:
1401                base = 0
1402
1403            # Add the attributes and build hosts
1404            base += 1
1405            for i in s.interfaces:
1406                # Add address and masks to interfaces on unassigned substrates
1407                if s.name not in assigned_subs:
1408                    i.attribute.append(
1409                            topdl.Attribute('ip4_address', 
1410                                "%s" % ip_addr(base)))
1411                    i.attribute.append(
1412                            topdl.Attribute('ip4_netmask', 
1413                                "%s" % ip_addr(int(netmask))))
1414
1415                # Make /etc/hosts entries
1416                addr = i.get_attribute('ip4_address')
1417                if addr is None:
1418                    raise service_error(service_error.req, 
1419                            "Partially assigned substrate %s" %s.name)
1420                hname = i.element.name
1421                if hname in ifs:
1422                    hosts.append("%s\t%s-%s %s-%d" % \
1423                            (addr, hname, s.name, hname, ifs[hname]))
1424                else:
1425                    # First IP allocated it the default ip according to hosts,
1426                    # so the extra alias is here.
1427                    ifs[hname] = 0
1428                    hosts.append("%s\t%s-%s %s-%d %s" % \
1429                            (addr, hname, s.name, hname, ifs[hname], hname))
1430
1431                ifs[hname] += 1
1432                base += 1
1433        return hosts, ips
1434
1435    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1436            tbparam, masters, tbmap, expid=None, expcert=None):
1437        for tb in testbeds:
1438            self.log.debug('Trying testbed %s' % tb)
1439            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1440                    expcert)
1441            allocated[tb] = 1
1442            self.log.debug('Got testbed %s' % tb)
1443
1444    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1445            expcert=None):
1446        """
1447        Get access to testbed through fedd and set the parameters for that tb
1448        """
1449        def get_export_project(svcs):
1450            """
1451            Look through for the list of federated_service for this testbed
1452            objects for a project_export service, and extract the project
1453            parameter.
1454            """
1455
1456            pe = [s for s in svcs if s.name=='project_export']
1457            if len(pe) == 1:
1458                return pe[0].params.get('project', None)
1459            elif len(pe) == 0:
1460                return None
1461            else:
1462                raise service_error(service_error.req,
1463                        "More than one project export is not supported")
1464
1465        def add_services(svcs, type, slist, keys):
1466            """
1467            Add the given services to slist.  type is import or export.  Also
1468            add a mapping entry from the assigned id to the original service
1469            record.
1470            """
1471            for i, s in enumerate(svcs):
1472                idx = '%s%d' % (type, i)
1473                keys[idx] = s
1474                sr = {'id': idx, 'name': s.name, 'visibility': type }
1475                if s.params:
1476                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1477                            for k, v in s.params.items()]
1478                slist.append(sr)
1479
1480        uri = tbmap.get(testbed_base(tb), None)
1481        if not uri:
1482            raise service_error(service_error.server_config, 
1483                    "Unknown testbed: %s" % tb)
1484
1485        export_svcs = masters.get(tb,[])
1486        import_svcs = [ s for m in masters.values() \
1487                for s in m \
1488                    if tb in s.importers ]
1489
1490        export_project = get_export_project(export_svcs)
1491        # Compose the credential list so that IDs come before attributes
1492        creds = set()
1493        keys = set()
1494        certs = self.auth.get_creds_for_principal(fid)
1495        # Append credenials about this experiment controller - e.g. that it is
1496        # trusted.
1497        certs.update(self.auth.get_creds_for_principal(
1498            fedid(file=self.cert_file)))
1499        if expid:
1500            certs.update(self.auth.get_creds_for_principal(expid))
1501        for c in certs:
1502            keys.add(c.issuer_cert())
1503            creds.add(c.attribute_cert())
1504        creds = list(keys) + list(creds)
1505
1506        if expcert: cert, pw = expcert, None
1507        else: cert, pw = self.cert_file, self.cert_pw
1508
1509        # Request credentials
1510        req = {
1511                'abac_credential': creds,
1512            }
1513        # Make the service request from the services we're importing and
1514        # exporting.  Keep track of the export request ids so we can
1515        # collect the resulting info from the access response.
1516        e_keys = { }
1517        if import_svcs or export_svcs:
1518            slist = []
1519            add_services(import_svcs, 'import', slist, e_keys)
1520            add_services(export_svcs, 'export', slist, e_keys)
1521            req['service'] = slist
1522
1523        if self.local_access.has_key(uri):
1524            # Local access call
1525            req = { 'RequestAccessRequestBody' : req }
1526            r = self.local_access[uri].RequestAccess(req, 
1527                    fedid(file=self.cert_file))
1528            r = { 'RequestAccessResponseBody' : r }
1529        else:
1530            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1531
1532        if r.has_key('RequestAccessResponseBody'):
1533            # Through to here we have a valid response, not a fault.
1534            # Access denied is a fault, so something better or worse than
1535            # access denied has happened.
1536            r = r['RequestAccessResponseBody']
1537            self.log.debug("[get_access] Access granted")
1538        else:
1539            raise service_error(service_error.protocol,
1540                        "Bad proxy response")
1541        if 'proof' not in r:
1542            raise service_error(service_error.protocol,
1543                        "Bad access response (no access proof)")
1544
1545        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1546                tb=tb, uri=uri, proof=[r['proof']], 
1547                services=masters.get(tb, None))
1548
1549        # Collect the responses corresponding to the services this testbed
1550        # exports.  These will be the service requests that we will include in
1551        # the start segment requests (with appropriate visibility values) to
1552        # import and export the segments.
1553        for s in r.get('service', []):
1554            id = s.get('id', None)
1555            # Note that this attaches the response to the object in the masters
1556            # data structure.  (The e_keys index disappears when this fcn
1557            # returns)
1558            if id and id in e_keys:
1559                e_keys[id].reqs.append(s)
1560
1561        # Add attributes to parameter space.  We don't allow attributes to
1562        # overlay any parameters already installed.
1563        for a in r.get('fedAttr', []):
1564            try:
1565                if a['attribute']:
1566                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1567            except KeyError:
1568                self.log.error("Bad attribute in response: %s" % a)
1569
1570
1571    def split_topology(self, top, topo, testbeds):
1572        """
1573        Create the sub-topologies that are needed for experiment instantiation.
1574        """
1575        for tb in testbeds:
1576            topo[tb] = top.clone()
1577            # copy in for loop allows deletions from the original
1578            for e in [ e for e in topo[tb].elements]:
1579                etb = e.get_attribute('testbed')
1580                # NB: elements without a testbed attribute won't appear in any
1581                # sub topologies. 
1582                if not etb or etb != tb:
1583                    for i in e.interface:
1584                        for s in i.subs:
1585                            try:
1586                                s.interfaces.remove(i)
1587                            except ValueError:
1588                                raise service_error(service_error.internal,
1589                                        "Can't remove interface??")
1590                    topo[tb].elements.remove(e)
1591            topo[tb].make_indices()
1592
1593    def confirm_software(self, top):
1594        """
1595        Make sure that the software to be loaded in the topo is all available
1596        before we begin making access requests, etc.  This is a subset of
1597        wrangle_software.
1598        """
1599        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1600        pkgs.update([x.location for e in top.elements for x in e.software])
1601
1602        for pkg in pkgs:
1603            loc = pkg
1604
1605            scheme, host, path = urlparse(loc)[0:3]
1606            dest = os.path.basename(path)
1607            if not scheme:
1608                if not loc.startswith('/'):
1609                    loc = "/%s" % loc
1610                loc = "file://%s" %loc
1611            # NB: if scheme was found, loc == pkg
1612            try:
1613                u = urlopen(loc)
1614                u.close()
1615            except Exception, e:
1616                raise service_error(service_error.req, 
1617                        "Cannot open %s: %s" % (loc, e))
1618        return True
1619
1620    def wrangle_software(self, expid, top, topo, tbparams):
1621        """
1622        Copy software out to the repository directory, allocate permissions and
1623        rewrite the segment topologies to look for the software in local
1624        places.
1625        """
1626
1627        # Copy the rpms and tarfiles to a distribution directory from
1628        # which the federants can retrieve them
1629        linkpath = "%s/software" %  expid
1630        softdir ="%s/%s" % ( self.repodir, linkpath)
1631        softmap = { }
1632
1633        # self.fedkit and self.gateway kit are lists of tuples of
1634        # (install_location, download_location) this extracts the download
1635        # locations.
1636        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1637        pkgs.update([x.location for e in top.elements for x in e.software])
1638        try:
1639            os.makedirs(softdir)
1640        except EnvironmentError, e:
1641            raise service_error(
1642                    "Cannot create software directory: %s" % e)
1643        # The actual copying.  Everything's converted into a url for copying.
1644        auth_attrs = set()
1645        for pkg in pkgs:
1646            loc = pkg
1647
1648            scheme, host, path = urlparse(loc)[0:3]
1649            dest = os.path.basename(path)
1650            if not scheme:
1651                if not loc.startswith('/'):
1652                    loc = "/%s" % loc
1653                loc = "file://%s" %loc
1654            # NB: if scheme was found, loc == pkg
1655            try:
1656                u = urlopen(loc)
1657            except Exception, e:
1658                raise service_error(service_error.req, 
1659                        "Cannot open %s: %s" % (loc, e))
1660            try:
1661                f = open("%s/%s" % (softdir, dest) , "w")
1662                self.log.debug("Writing %s/%s" % (softdir,dest) )
1663                data = u.read(4096)
1664                while data:
1665                    f.write(data)
1666                    data = u.read(4096)
1667                f.close()
1668                u.close()
1669            except Exception, e:
1670                raise service_error(service_error.internal,
1671                        "Could not copy %s: %s" % (loc, e))
1672            path = re.sub("/tmp", "", linkpath)
1673            # XXX
1674            softmap[pkg] = \
1675                    "%s/%s/%s" %\
1676                    ( self.repo_url, path, dest)
1677
1678            # Allow the individual segments to access the software by assigning
1679            # an attribute to each testbed allocation that encodes the data to
1680            # be released.  This expression collects the data for each run of
1681            # the loop.
1682            auth_attrs.update([
1683                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1684                        for tb in tbparams.keys()])
1685
1686        self.append_experiment_authorization(expid, auth_attrs)
1687
1688        # Convert the software locations in the segments into the local
1689        # copies on this host
1690        for soft in [ s for tb in topo.values() \
1691                for e in tb.elements \
1692                    if getattr(e, 'software', False) \
1693                        for s in e.software ]:
1694            if softmap.has_key(soft.location):
1695                soft.location = softmap[soft.location]
1696
1697
1698    def new_experiment(self, req, fid):
1699        """
1700        The external interface to empty initial experiment creation called from
1701        the dispatcher.
1702
1703        Creates a working directory, splits the incoming description using the
1704        splitter script and parses out the avrious subsections using the
1705        lcasses above.  Once each sub-experiment is created, use pooled threads
1706        to instantiate them and start it all up.
1707        """
1708        self.log.info("New experiment call started for %s" % fid)
1709        req = req.get('NewRequestBody', None)
1710        if not req:
1711            raise service_error(service_error.req,
1712                    "Bad request format (no NewRequestBody)")
1713
1714        # import may partially succeed so always save credentials and warn
1715        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1716            self.log.debug("Failed to import delegation credentials(!)")
1717        self.get_grouper_updates(fid)
1718        self.auth.update()
1719        self.auth.save()
1720
1721        try:
1722            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1723                    with_proof=True)
1724        except service_error, e:
1725            self.log.info("New experiment call for %s: access denied" % fid)
1726            raise e
1727
1728
1729        if not access_ok:
1730            self.log.info("New experiment call for %s: Access denied" % fid)
1731            raise service_error(service_error.access, "New access denied",
1732                    proof=[proof])
1733
1734        try:
1735            tmpdir = tempfile.mkdtemp(prefix="split-")
1736        except EnvironmentError:
1737            raise service_error(service_error.internal, "Cannot create tmp dir")
1738
1739        # Generate an ID for the experiment (slice) and a certificate that the
1740        # allocator can use to prove they own it.  We'll ship it back through
1741        # the encrypted connection.  If the requester supplied one, use it.
1742        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1743            expcert = req['experimentAccess']['X509']
1744            expid = fedid(certstr=expcert)
1745            self.state_lock.acquire()
1746            if expid in self.state:
1747                self.state_lock.release()
1748                raise service_error(service_error.req, 
1749                        'fedid %s identifies an existing experiment' % expid)
1750            self.state_lock.release()
1751        else:
1752            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1753
1754        #now we're done with the tmpdir, and it should be empty
1755        if self.cleanup:
1756            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1757            os.rmdir(tmpdir)
1758        else:
1759            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1760
1761        eid = self.create_experiment_state(fid, req, expid, expcert, 
1762                state='empty')
1763
1764        rv = {
1765                'experimentID': [
1766                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1767                ],
1768                'experimentStatus': 'empty',
1769                'experimentAccess': { 'X509' : expcert },
1770                'proof': proof.to_dict(),
1771            }
1772
1773        self.log.info("New experiment call succeeded for %s" % fid)
1774        return rv
1775
1776    # create_experiment sub-functions
1777
1778    @staticmethod
1779    def get_experiment_key(req, field='experimentID'):
1780        """
1781        Parse the experiment identifiers out of the request (the request body
1782        tag has been removed).  Specifically this pulls either the fedid or the
1783        localname out of the experimentID field.  A fedid is preferred.  If
1784        neither is present or the request does not contain the fields,
1785        service_errors are raised.
1786        """
1787        # Get the experiment access
1788        exp = req.get(field, None)
1789        if exp:
1790            if exp.has_key('fedid'):
1791                key = exp['fedid']
1792            elif exp.has_key('localname'):
1793                key = exp['localname']
1794            else:
1795                raise service_error(service_error.req, "Unknown lookup type")
1796        else:
1797            raise service_error(service_error.req, "No request?")
1798
1799        return key
1800
1801    def get_experiment_ids_and_start(self, key, tmpdir):
1802        """
1803        Get the experiment name, id and access certificate from the state, and
1804        set the experiment state to 'starting'.  returns a triple (fedid,
1805        localname, access_cert_file). The access_cert_file is a copy of the
1806        contents of the access certificate, created in the tempdir with
1807        restricted permissions.  If things are confused, raise an exception.
1808        """
1809
1810        expid = eid = None
1811        self.state_lock.acquire()
1812        if key in self.state:
1813            exp = self.state[key]
1814            exp.status = "starting"
1815            exp.updated()
1816            expid = exp.fedid
1817            eid = exp.localname
1818            expcert = exp.identity
1819        self.state_lock.release()
1820
1821        # make a protected copy of the access certificate so the experiment
1822        # controller can act as the experiment principal.
1823        if expcert:
1824            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1825            if not expcert_file:
1826                raise service_error(service_error.internal, 
1827                        "Cannot create temp cert file?")
1828        else:
1829            expcert_file = None
1830
1831        return (eid, expid, expcert_file)
1832
1833    def get_topology(self, req, tmpdir):
1834        """
1835        Get the ns2 content and put it into a file for parsing.  Call the local
1836        or remote parser and return the topdl.Topology.  Errors result in
1837        exceptions.  req is the request and tmpdir is a work directory.
1838        """
1839
1840        # The tcl parser needs to read a file so put the content into that file
1841        descr=req.get('experimentdescription', None)
1842        if descr:
1843            if 'ns2description' in descr:
1844                file_content=descr['ns2description']
1845            elif 'topdldescription' in descr:
1846                return topdl.Topology(**descr['topdldescription'])
1847            else:
1848                raise service_error(service_error.req, 
1849                        'Unknown experiment description type')
1850        else:
1851            raise service_error(service_error.req, "No experiment description")
1852
1853
1854        if self.splitter_url:
1855            self.log.debug("Calling remote topdl translator at %s" % \
1856                    self.splitter_url)
1857            top = self.remote_ns2topdl(self.splitter_url, file_content)
1858        else:
1859            tclfile = os.path.join(tmpdir, "experiment.tcl")
1860            if file_content:
1861                try:
1862                    f = open(tclfile, 'w')
1863                    f.write(file_content)
1864                    f.close()
1865                except EnvironmentError:
1866                    raise service_error(service_error.internal,
1867                            "Cannot write temp experiment description")
1868            else:
1869                raise service_error(service_error.req, 
1870                        "Only ns2descriptions supported")
1871            pid = "dummy"
1872            gid = "dummy"
1873            eid = "dummy"
1874
1875            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1876                str(self.muxmax), '-m', 'dummy']
1877
1878            tclcmd.extend([pid, gid, eid, tclfile])
1879
1880            self.log.debug("running local splitter %s", " ".join(tclcmd))
1881            # This is just fantastic.  As a side effect the parser copies
1882            # tb_compat.tcl into the current directory, so that directory
1883            # must be writable by the fedd user.  Doing this in the
1884            # temporary subdir ensures this is the case.
1885            tclparser = Popen(tclcmd, stdout=PIPE, stderr=PIPE, close_fds=True, 
1886                    cwd=tmpdir)
1887            split_data, err_data = tclparser.communicate()
1888
1889            if tclparser.returncode != 0:
1890                os.remove(tclfile)
1891                raise service_error(service_error.req, 
1892                        "Cannot parse input ns2: %s" % err_data)
1893
1894            top = topdl.topology_from_xml(string=split_data, top="experiment")
1895            os.remove(tclfile)
1896
1897        return top
1898
1899    def get_testbed_services(self, req, testbeds):
1900        """
1901        Parse the services section of the request into two dicts mapping
1902        testbed to lists of federated_service objects.  The first dict maps all
1903        exporters of services to those service objects, the second maps
1904        testbeds to service objects only for services requiring portals.
1905        """
1906
1907        # Sanity check the services.  Exports or imports from unknown testbeds
1908        # cause an exception.
1909        for s in req.get('service', []):
1910            for t in s.get('import', []):
1911                if t not in testbeds:
1912                    raise service_error(service_error.req, 
1913                            'Service import to unknown testbed: %s' %t)
1914            for t in s.get('export', []):
1915                if t not in testbeds:
1916                    raise service_error(service_error.req, 
1917                            'Service export from unknown testbed: %s' %t)
1918
1919
1920        # We construct both dicts here because deriving the second is more
1921        # complex than it looks - both the keys and lists can differ, and it's
1922        # much easier to generate both in one pass.
1923        masters = { }
1924        pmasters = { }
1925        for s in req.get('service', []):
1926            # If this is a service request with the importall field
1927            # set, fill it out.
1928
1929            if s.get('importall', False):
1930                s['import'] = [ tb for tb in testbeds \
1931                        if tb not in s.get('export',[])]
1932                del s['importall']
1933
1934            # Add the service to masters
1935            for tb in s.get('export', []):
1936                if s.get('name', None):
1937
1938                    params = { }
1939                    for a in s.get('fedAttr', []):
1940                        params[a.get('attribute', '')] = a.get('value','')
1941
1942                    fser = federated_service(name=s['name'],
1943                            exporter=tb, importers=s.get('import',[]),
1944                            params=params)
1945                    if fser.name == 'hide_hosts' \
1946                            and 'hosts' not in fser.params:
1947                        fser.params['hosts'] = \
1948                                ",".join(tb_hosts.get(fser.exporter, []))
1949                    if tb in masters: masters[tb].append(fser)
1950                    else: masters[tb] = [fser]
1951
1952                    if fser.portal:
1953                        if tb in pmasters: pmasters[tb].append(fser)
1954                        else: pmasters[tb] = [fser]
1955                else:
1956                    self.log.error('Testbed service does not have name " + \
1957                            "and importers')
1958        return masters, pmasters
1959
1960    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1961        """
1962        Create the ssh keys necessary for interconnecting the portal nodes and
1963        the global hosts file for letting each segment know about the IP
1964        addresses in play.  If we have computed static routes, copy them into
1965        the repo. Save these into the repo.  Add attributes to the autorizer
1966        allowing access controllers to download them and return a set of
1967        attributes that inform the segments where to find this stuff.  May
1968        raise service_errors in if there are problems.
1969        """
1970        gw_pubkey_base = "fedgw_%s.pub" % self.ssh_type
1971        gw_secretkey_base = "fedgw_%s" % self.ssh_type
1972        keydir = os.path.join(tmpdir, 'keys')
1973        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1974        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1975        route = os.path.join(tmpdir, 'route.tgz')
1976
1977        try:
1978            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1979        except ValueError:
1980            raise service_error(service_error.server_config, 
1981                    "Bad key type (%s)" % self.ssh_type)
1982
1983        self.generate_seer_certs(keydir)
1984
1985        # Copy configuration files into the remote file store
1986        # The config urlpath
1987        configpath = "/%s/config" % expid
1988        # The config file system location
1989        configdir ="%s%s" % ( self.repodir, configpath)
1990        route_conf = os.path.join(configdir, 'route.tgz')
1991        try:
1992            os.makedirs(configdir)
1993        except EnvironmentError, e:
1994            raise service_error(service_error.internal,
1995                    "Cannot create config directory: %s" % e)
1996        try:
1997            f = open("%s/hosts" % configdir, "w")
1998            print >> f, string.join(hosts, '\n')
1999            f.close()
2000        except EnvironmentError, e:
2001            raise service_error(service_error.internal, 
2002                    "Cannot write hosts file: %s" % e)
2003        try:
2004            if os.path.exists(route):
2005                copy_file(route, route_conf)
2006
2007            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
2008            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
2009            copy_file(os.path.join(keydir, 'ca.pem'), 
2010                    os.path.join(configdir, 'ca.pem'))
2011            copy_file(os.path.join(keydir, 'node.pem'), 
2012                    os.path.join(configdir, 'node.pem'))
2013        except EnvironmentError, e:
2014            raise service_error(service_error.internal, 
2015                    "Cannot copy keyfiles: %s" % e)
2016
2017        # Allow the individual testbeds to access the configuration files,
2018        # again by setting an attribute for the relevant pathnames on each
2019        # allocation principal.  Yeah, that's a long list comprehension.
2020        self.append_experiment_authorization(expid, set([
2021            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2022                    for tb in tbparams.keys() \
2023                        for f in ("hosts", 'ca.pem', 'node.pem', 'route.tgz',
2024                            gw_secretkey_base, gw_pubkey_base)]))
2025
2026        attrs = [ 
2027                {
2028                    'attribute': 'ssh_pubkey', 
2029                    'value': '%s/%s/config/%s' % \
2030                            (self.repo_url, expid, gw_pubkey_base)
2031                },
2032                {
2033                    'attribute': 'ssh_secretkey', 
2034                    'value': '%s/%s/config/%s' % \
2035                            (self.repo_url, expid, gw_secretkey_base)
2036                },
2037                {
2038                    'attribute': 'hosts', 
2039                    'value': '%s/%s/config/hosts' % \
2040                            (self.repo_url, expid)
2041                },
2042                {
2043                    'attribute': 'seer_ca_pem', 
2044                    'value': '%s/%s/config/%s' % \
2045                            (self.repo_url, expid, 'ca.pem')
2046                },
2047                {
2048                    'attribute': 'seer_node_pem', 
2049                    'value': '%s/%s/config/%s' % \
2050                            (self.repo_url, expid, 'node.pem')
2051                },
2052            ]
2053        # Add info about static routes if we have some
2054        if os.path.exists(route_conf):
2055            attrs.append(
2056                {
2057                    'attribute': 'route.tgz', 
2058                    'value': '%s/%s/config/%s' % \
2059                            (self.repo_url, expid, 'route.tgz')
2060                }
2061            )
2062        return attrs
2063
2064
2065    def get_vtopo(self, req, fid):
2066        """
2067        Return the stored virtual topology for this experiment
2068        """
2069        rv = None
2070        state = None
2071        self.log.info("vtopo call started for %s" %  fid)
2072
2073        req = req.get('VtopoRequestBody', None)
2074        if not req:
2075            raise service_error(service_error.req,
2076                    "Bad request format (no VtopoRequestBody)")
2077        exp = req.get('experiment', None)
2078        if exp:
2079            if exp.has_key('fedid'):
2080                key = exp['fedid']
2081                keytype = "fedid"
2082            elif exp.has_key('localname'):
2083                key = exp['localname']
2084                keytype = "localname"
2085            else:
2086                raise service_error(service_error.req, "Unknown lookup type")
2087        else:
2088            raise service_error(service_error.req, "No request?")
2089
2090        try:
2091            proof = self.check_experiment_access(fid, key)
2092        except service_error, e:
2093            self.log.info("vtopo call failed for %s: access denied" %  fid)
2094            raise e
2095
2096        self.state_lock.acquire()
2097        # XXX: this needs to be recalculated
2098        if key in self.state:
2099            if self.state[key].top is not None:
2100                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2101                rv = { 'experiment' : {keytype: key },
2102                        'vtopo': vtopo,
2103                        'proof': proof.to_dict(), 
2104                    }
2105            else:
2106                state = self.state[key].status
2107        self.state_lock.release()
2108
2109        if rv: 
2110            self.log.info("vtopo call completed for %s %s " % \
2111                (key, fid))
2112            return rv
2113        else: 
2114            if state:
2115                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2116                    (key, fid))
2117                raise service_error(service_error.partial, 
2118                        "Not ready: %s" % state)
2119            else:
2120                self.log.info("vtopo call completed for %s %s (No experiment)"\
2121                        % (key, fid))
2122                raise service_error(service_error.req, "No such experiment")
2123
2124    def get_vis(self, req, fid):
2125        """
2126        Return the stored visualization for this experiment
2127        """
2128        rv = None
2129        state = None
2130
2131        self.log.info("vis call started for %s" %  fid)
2132        req = req.get('VisRequestBody', None)
2133        if not req:
2134            raise service_error(service_error.req,
2135                    "Bad request format (no VisRequestBody)")
2136        exp = req.get('experiment', None)
2137        if exp:
2138            if exp.has_key('fedid'):
2139                key = exp['fedid']
2140                keytype = "fedid"
2141            elif exp.has_key('localname'):
2142                key = exp['localname']
2143                keytype = "localname"
2144            else:
2145                raise service_error(service_error.req, "Unknown lookup type")
2146        else:
2147            raise service_error(service_error.req, "No request?")
2148
2149        try:
2150            proof = self.check_experiment_access(fid, key)
2151        except service_error, e:
2152            self.log.info("vis call failed for %s: access denied" %  fid)
2153            raise e
2154
2155        self.state_lock.acquire()
2156        # Generate the visualization
2157        if key in self.state:
2158            if self.state[key].top is not None:
2159                try:
2160                    vis = self.genviz(
2161                            topdl.topology_to_vtopo(self.state[key].top))
2162                except service_error, e:
2163                    self.state_lock.release()
2164                    raise e
2165                rv =  { 'experiment' : {keytype: key },
2166                        'vis': vis,
2167                        'proof': proof.to_dict(), 
2168                        }
2169            else:
2170                state = self.state[key].status
2171        self.state_lock.release()
2172
2173        if rv: 
2174            self.log.info("vis call completed for %s %s " % \
2175                (key, fid))
2176            return rv
2177        else:
2178            if state:
2179                self.log.info("vis call completed for %s %s (not ready)" % \
2180                    (key, fid))
2181                raise service_error(service_error.partial, 
2182                        "Not ready: %s" % state)
2183            else:
2184                self.log.info("vis call completed for %s %s (no experiment)" % \
2185                    (key, fid))
2186                raise service_error(service_error.req, "No such experiment")
2187
2188    @staticmethod
2189    def needs_route_computation(req):
2190        '''
2191        Walk the request services looking for a static routing request
2192        '''
2193        for s in req.get('service', []):
2194            if s.get('name', '') == 'static_routing':
2195                return True
2196        return False
2197
2198    def compute_static_routes(self, tmpdir, top):
2199        '''
2200        Compute a set of static routes for the topology.  The result is a file
2201        called route.tgz in tmpdir that contains a dinrectory, route, with a
2202        file per node in the topology that lists the prefix and router for all
2203        the routes in that node's routing table.  Exceptions from the route
2204        calculation program and the tar creation call are not caught.
2205        '''
2206        if self.routing is None:
2207            raise service_error(service_error.server, 
2208                    'Cannot provide staticroutes, no routing program specified')
2209        rg = tempfile.NamedTemporaryFile()
2210        outdir = os.path.join(tmpdir, 'route')
2211        tarfile = os.path.join(tmpdir, 'route.tgz')
2212        topology_to_route_file(top, file=rg)
2213        os.mkdir(outdir)
2214        try:
2215            for cmd in (
2216                    [self.routing, '--input', rg.name,'--output', outdir],
2217                    ['tar', '-C', tmpdir, '-czf', tarfile, 'route']):
2218                subprocess.check_call(cmd)
2219        except subprocess.CalledProcessError, e:
2220            raise service_error(service_error.internal, 
2221                    'Cannot call %s: %s' % (' '.join(cmd), e.returncode))
2222        rg.close()
2223        shutil.rmtree(outdir)
2224
2225
2226   
2227    def save_federant_information(self, allocated, tbparams, eid, top):
2228        """
2229        Store the various data that have changed in the experiment state
2230        between when it was started and the beginning of resource allocation.
2231        This is basically the information about each local allocation.  This
2232        fills in the values of the placeholder allocation in the state.  It
2233        also collects the access proofs and returns them as dicts for a
2234        response message.
2235        """
2236        self.state_lock.acquire()
2237        exp = self.state[eid]
2238        exp.top = top.clone()
2239        # save federant information
2240        for k in allocated.keys():
2241            exp.add_allocation(tbparams[k])
2242            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2243                type="testbed", localname=[k], 
2244                service=[ s.to_topdl() for s in tbparams[k].services]))
2245
2246        # Access proofs for the response message
2247        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2248                    for p in tbparams[k].proof]
2249        exp.updated()
2250        if self.state_filename: 
2251            self.write_state()
2252        self.state_lock.release()
2253        return proofs
2254
2255    def clear_placeholder(self, eid, expid, tmpdir):
2256        """
2257        Clear the placeholder and remove any allocated temporary dir.
2258        """
2259
2260        self.state_lock.acquire()
2261        del self.state[eid]
2262        del self.state[expid]
2263        if self.state_filename: self.write_state()
2264        self.state_lock.release()
2265        if tmpdir and self.cleanup:
2266            self.remove_dirs(tmpdir)
2267
2268    # end of create_experiment sub-functions
2269
2270    def create_experiment(self, req, fid):
2271        """
2272        The external interface to experiment creation called from the
2273        dispatcher.
2274
2275        Creates a working directory, splits the incoming description using the
2276        splitter script and parses out the various subsections using the
2277        classes above.  Once each sub-experiment is created, use pooled threads
2278        to instantiate them and start it all up.
2279        """
2280
2281        self.log.info("Create experiment call started for %s" % fid)
2282        req = req.get('CreateRequestBody', None)
2283        if req:
2284            key = self.get_experiment_key(req)
2285        else:
2286            raise service_error(service_error.req,
2287                    "Bad request format (no CreateRequestBody)")
2288
2289        # Import information from the requester
2290        # import may partially succeed so always save credentials and warn
2291        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2292            self.log.debug("Failed to import delegation credentials(!)")
2293        self.get_grouper_updates(fid)
2294        self.auth.update()
2295        self.auth.save()
2296
2297        try:
2298            # Make sure that the caller can talk to us
2299            proof = self.check_experiment_access(fid, key)
2300        except service_error, e:
2301            self.log.info("Create experiment call failed for %s: access denied"\
2302                    % fid)
2303            raise e
2304
2305
2306        # Install the testbed map entries supplied with the request into a copy
2307        # of the testbed map.
2308        tbmap = dict(self.tbmap)
2309        for m in req.get('testbedmap', []):
2310            if 'testbed' in m and 'uri' in m:
2311                tbmap[m['testbed']] = m['uri']
2312
2313        # a place to work
2314        try:
2315            tmpdir = tempfile.mkdtemp(prefix="split-")
2316            os.mkdir(tmpdir+"/keys")
2317        except EnvironmentError:
2318            raise service_error(service_error.internal, "Cannot create tmp dir")
2319
2320        tbparams = { }
2321
2322        eid, expid, expcert_file = \
2323                self.get_experiment_ids_and_start(key, tmpdir)
2324
2325        # This catches exceptions to clear the placeholder if necessary
2326        try: 
2327            if not (eid and expid):
2328                raise service_error(service_error.internal, 
2329                        "Cannot find local experiment info!?")
2330
2331            top = self.get_topology(req, tmpdir)
2332            self.confirm_software(top)
2333            # Assign the IPs
2334            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2335            if self.needs_route_computation(req):
2336                self.compute_static_routes(tmpdir, top)
2337            # Find the testbeds to look up
2338            tb_hosts = { }
2339            testbeds = [ ]
2340            for e in top.elements:
2341                if isinstance(e, topdl.Computer):
2342                    tb = e.get_attribute('testbed')
2343                    # Put nodes not in a testbed into the default testbed if
2344                    # there is one.
2345                    if tb is None:
2346                        if self.default_tb is None:
2347                            raise service_error(service_error.req, 
2348                                '%s not in a testbed (and no default)' % e.name)
2349                        tb = self.default_tb
2350                        e.set_attribute('testbed', tb)
2351                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2352                    else: 
2353                        tb_hosts[tb] = [ e.name ]
2354                        testbeds.append(tb)
2355
2356            masters, pmasters = self.get_testbed_services(req, testbeds)
2357            allocated = { }         # Testbeds we can access
2358            topo ={ }               # Sub topologies
2359            connInfo = { }          # Connection information
2360
2361            self.get_access_to_testbeds(testbeds, fid, allocated, 
2362                    tbparams, masters, tbmap, expid, expcert_file)
2363
2364            # tbactive is the set of testbeds that have NATs in front of their
2365            # portals. They need to initiate connections.
2366            tbactive = set([k for k, v in tbparams.items() \
2367                    if v.get_attribute('nat_portals')])
2368
2369            self.split_topology(top, topo, testbeds)
2370
2371            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2372
2373            part = experiment_partition(self.auth, self.store_url, tbmap,
2374                    self.muxmax, self.direct_transit, tbactive)
2375            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2376                    connInfo, expid)
2377
2378            auth_attrs = set()
2379            # Now get access to the dynamic testbeds (those added above)
2380            for tb in [ t for t in topo if t not in allocated]:
2381                self.get_access(tb, tbparams, fid, masters, tbmap, 
2382                        expid, expcert_file)
2383                allocated[tb] = 1
2384                store_keys = topo[tb].get_attribute('store_keys')
2385                # Give the testbed access to keys it exports or imports
2386                if store_keys:
2387                    auth_attrs.update(set([
2388                        (tbparams[tb].allocID, sk) \
2389                                for sk in store_keys.split(" ")]))
2390
2391            if auth_attrs:
2392                self.append_experiment_authorization(expid, auth_attrs)
2393
2394            # transit and disconnected testbeds may not have a connInfo entry.
2395            # Fill in the blanks.
2396            for t in allocated.keys():
2397                if not connInfo.has_key(t):
2398                    connInfo[t] = { }
2399
2400            self.wrangle_software(expid, top, topo, tbparams)
2401
2402            proofs = self.save_federant_information(allocated, tbparams, 
2403                    eid, top)
2404        except service_error, e:
2405            # If something goes wrong in the parse (usually an access error)
2406            # clear the placeholder state.  From here on out the code delays
2407            # exceptions.  Failing at this point returns a fault to the remote
2408            # caller.
2409
2410            self.log.info("Create experiment call failed for %s %s: %s" % 
2411                    (eid, fid, e))
2412            self.clear_placeholder(eid, expid, tmpdir)
2413            raise e
2414
2415        # Start the background swapper and return the starting state.  From
2416        # here on out, the state will stick around a while.
2417
2418        # Create a logger that logs to the experiment's state object as well as
2419        # to the main log file.
2420        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2421        alloc_collector = self.list_log(self.state[eid].log)
2422        h = logging.StreamHandler(alloc_collector)
2423        # XXX: there should be a global one of these rather than repeating the
2424        # code.
2425        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2426                    '%d %b %y %H:%M:%S'))
2427        alloc_log.addHandler(h)
2428
2429        # Start a thread to do the resource allocation
2430        t  = Thread(target=self.allocate_resources,
2431                args=(allocated, masters, eid, expid, tbparams, 
2432                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2433                    connInfo, tbmap, expcert_file),
2434                name=eid)
2435        t.start()
2436
2437        rv = {
2438                'experimentID': [
2439                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2440                ],
2441                'experimentStatus': 'starting',
2442                'proof': [ proof.to_dict() ] + proofs,
2443            }
2444        self.log.info("Create experiment call succeeded for %s %s" % \
2445                (eid, fid))
2446
2447        return rv
2448   
2449    def get_experiment_fedid(self, key):
2450        """
2451        find the fedid associated with the localname key in the state database.
2452        """
2453
2454        rv = None
2455        self.state_lock.acquire()
2456        if key in self.state:
2457            rv = self.state[key].fedid
2458        self.state_lock.release()
2459        return rv
2460
2461    def check_experiment_access(self, fid, key):
2462        """
2463        Confirm that the fid has access to the experiment.  Though a request
2464        may be made in terms of a local name, the access attribute is always
2465        the experiment's fedid.
2466        """
2467        if not isinstance(key, fedid):
2468            key = self.get_experiment_fedid(key)
2469
2470        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2471
2472        if access_ok:
2473            return proof
2474        else:
2475            raise service_error(service_error.access, "Access Denied",
2476                proof)
2477
2478
2479    def get_handler(self, path, fid):
2480        """
2481        Perhaps surprisingly named, this function handles HTTP GET requests to
2482        this server (SOAP requests are POSTs).
2483        """
2484        self.log.info("Get handler %s %s" % (path, fid))
2485        if len("%s" % fid) == 0:
2486            return (None, None)
2487        # XXX: log proofs?
2488        if self.auth.check_attribute(fid, path):
2489            return ("%s/%s" % (self.repodir, path), "application/binary")
2490        else:
2491            return (None, None)
2492
2493    def update_info(self, key, force=False):
2494        top = None
2495        self.state_lock.acquire()
2496        if key in self.state:
2497            if force or self.state[key].older_than(self.info_cache_limit):
2498                top = self.state[key].top
2499                if top is not None: top = top.clone()
2500                d1, info_params, cert, d2 = \
2501                        self.get_segment_info(self.state[key], need_lock=False)
2502        self.state_lock.release()
2503
2504        if top is None: return
2505
2506        try:
2507            tmpdir = tempfile.mkdtemp(prefix="info-")
2508        except EnvironmentError:
2509            raise service_error(service_error.internal, 
2510                    "Cannot create tmp dir")
2511        cert_file = self.make_temp_certfile(cert, tmpdir)
2512
2513        data = []
2514        try:
2515            for k, (uri, aid) in info_params.items():
2516                info=self.info_segment(log=self.log, testbed=uri,
2517                            cert_file=cert_file, cert_pwd=None,
2518                            trusted_certs=self.trusted_certs,
2519                            caller=self.call_InfoSegment)
2520                info(uri, aid)
2521                data.append(info)
2522        # Clean up the tmpdir no matter what
2523        finally:
2524            if tmpdir: self.remove_dirs(tmpdir)
2525
2526        self.annotate_topology(top, data)
2527        self.state_lock.acquire()
2528        if key in self.state:
2529            self.state[key].top = top
2530            self.state[key].updated()
2531            if self.state_filename: self.write_state()
2532        self.state_lock.release()
2533
2534   
2535    def get_info(self, req, fid):
2536        """
2537        Return all the stored info about this experiment
2538        """
2539        rv = None
2540
2541        self.log.info("Info call started for %s" %  fid)
2542        req = req.get('InfoRequestBody', None)
2543        if not req:
2544            raise service_error(service_error.req,
2545                    "Bad request format (no InfoRequestBody)")
2546        exp = req.get('experiment', None)
2547        legacy = req.get('legacy', False)
2548        fresh = req.get('fresh', False)
2549        if exp:
2550            if exp.has_key('fedid'):
2551                key = exp['fedid']
2552                keytype = "fedid"
2553            elif exp.has_key('localname'):
2554                key = exp['localname']
2555                keytype = "localname"
2556            else:
2557                raise service_error(service_error.req, "Unknown lookup type")
2558        else:
2559            raise service_error(service_error.req, "No request?")
2560
2561        try:
2562            proof = self.check_experiment_access(fid, key)
2563        except service_error, e:
2564            self.log.info("Info call failed for %s: access denied" %  fid)
2565
2566
2567        self.update_info(key, fresh)
2568
2569        self.state_lock.acquire()
2570        if self.state.has_key(key):
2571            rv = self.state[key].get_info()
2572            # Copy the topo if we need legacy annotations
2573            if legacy:
2574                top = self.state[key].top
2575                if top is not None: top = top.clone()
2576        self.state_lock.release()
2577        self.log.info("Gathered Info for %s %s" % (key, fid))
2578
2579        # If the legacy visualization and topology representations are
2580        # requested, calculate them and add them to the return.
2581        if legacy and rv is not None:
2582            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2583            if top is not None:
2584                vtopo = topdl.topology_to_vtopo(top)
2585                if vtopo is not None:
2586                    rv['vtopo'] = vtopo
2587                    try:
2588                        vis = self.genviz(vtopo)
2589                    except service_error, e:
2590                        self.log.debug('Problem generating visualization: %s' \
2591                                % e)
2592                        vis = None
2593                    if vis is not None:
2594                        rv['vis'] = vis
2595        if rv:
2596            self.log.info("Info succeded for %s %s" % (key, fid))
2597            rv['proof'] = proof.to_dict()
2598            return rv
2599        else: 
2600            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2601            raise service_error(service_error.req, "No such experiment")
2602
2603    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2604            results):
2605        """
2606        Call OperateSegment on multiple testbeds and gather the results.
2607        op_params contains the parameters needed to contact that testbed, cert
2608        is a certificate containing the fedid to use, op is the operation,
2609        testbeds is a dict mapping testbed name to targets in that testbed,
2610        params are the parameters to include a,d results is a growing list of
2611        the results of the calls.
2612        """
2613        try:
2614            tmpdir = tempfile.mkdtemp(prefix="info-")
2615        except EnvironmentError:
2616            raise service_error(service_error.internal, 
2617                    "Cannot create tmp dir")
2618        cert_file = self.make_temp_certfile(cert, tmpdir)
2619
2620        try:
2621            for tb, targets in testbeds.items():
2622                if tb in op_params:
2623                    uri, aid = op_params[tb]
2624                    operate=self.operation_segment(log=self.log, testbed=uri,
2625                                cert_file=cert_file, cert_pwd=None,
2626                                trusted_certs=self.trusted_certs,
2627                                caller=self.call_OperationSegment)
2628                    if operate(uri, aid, op, targets, params):
2629                        if operate.status is not None:
2630                            results.extend(operate.status)
2631                            continue
2632                # Something went wrong in a weird way.  Add statuses
2633                # that reflect that to results
2634                for t in targets:
2635                    results.append(operation_status(t, 
2636                        operation_status.federant,
2637                        'Unexpected error on %s' % tb))
2638        # Clean up the tmpdir no matter what
2639        finally:
2640            if tmpdir: self.remove_dirs(tmpdir)
2641
2642    def do_operation(self, req, fid):
2643        """
2644        Find the testbeds holding each target and ask them to carry out the
2645        operation.  Return the statuses.
2646        """
2647        # Map an element to the testbed containing it
2648        def element_to_tb(e):
2649            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2650            elif isinstance(e, topdl.Testbed): return e.name
2651            else: return None
2652        # If d is an operation_status object, make it a dict
2653        def make_dict(d):
2654            if isinstance(d, dict): return d
2655            elif isinstance(d, operation_status): return d.to_dict()
2656            else: return { }
2657
2658        def element_name(e):
2659            if isinstance(e, topdl.Computer): return e.name
2660            elif isinstance(e, topdl.Testbed): 
2661                if e.localname: return e.localname[0]
2662                else: return None
2663            else: return None
2664
2665        self.log.info("Operation call started for %s" %  fid)
2666        req = req.get('OperationRequestBody', None)
2667        if not req:
2668            raise service_error(service_error.req,
2669                    "Bad request format (no OperationRequestBody)")
2670        exp = req.get('experiment', None)
2671        op = req.get('operation', None)
2672        targets = set(req.get('target', []))
2673        params = req.get('parameter', None)
2674
2675        if exp:
2676            if 'fedid' in exp:
2677                key = exp['fedid']
2678                keytype = "fedid"
2679            elif 'localname' in exp:
2680                key = exp['localname']
2681                keytype = "localname"
2682            else:
2683                raise service_error(service_error.req, "Unknown lookup type")
2684        else:
2685            raise service_error(service_error.req, "No request?")
2686
2687        if op is None or not targets:
2688            raise service_error(service_error.req, "No request?")
2689
2690        try:
2691            proof = self.check_experiment_access(fid, key)
2692        except service_error, e:
2693            self.log.info("Operation call failed for %s: access denied" %  fid)
2694            raise e
2695
2696        self.state_lock.acquire()
2697        if key in self.state:
2698            d1, op_params, cert, d2 = \
2699                    self.get_segment_info(self.state[key], need_lock=False,
2700                            key='tb')
2701            top = self.state[key].top
2702            if top is not None:
2703                top = top.clone()
2704        self.state_lock.release()
2705
2706        if top is None:
2707            self.log.info("Operation call failed for %s: not active" %  fid)
2708            raise service_error(service_error.partial, "No topology yet", 
2709                    proof=proof)
2710
2711        testbeds = { }
2712        results = []
2713        for e in top.elements:
2714            ename = element_name(e)
2715            if ename in targets:
2716                tb = element_to_tb(e)
2717                targets.remove(ename)
2718                if tb is not None:
2719                    if tb in testbeds: testbeds[tb].append(ename)
2720                    else: testbeds[tb] = [ ename ]
2721                else:
2722                    results.append(operation_status(e.name, 
2723                        code=operation_status.no_target, 
2724                        description='Cannot map target to testbed'))
2725
2726        for t in targets:
2727            results.append(operation_status(t, operation_status.no_target))
2728
2729        self.operate_on_segments(op_params, cert, op, testbeds, params,
2730                results)
2731
2732        self.log.info("Operation call succeeded for %s" %  fid)
2733        return { 
2734                'experiment': exp, 
2735                'status': [make_dict(r) for r in results],
2736                'proof': proof.to_dict()
2737                }
2738
2739
2740    def get_multi_info(self, req, fid):
2741        """
2742        Return all the stored info that this fedid can access
2743        """
2744        rv = { 'info': [ ], 'proof': [ ] }
2745
2746        self.log.info("Multi Info call started for %s" %  fid)
2747        self.get_grouper_updates(fid)
2748        self.auth.update()
2749        self.auth.save()
2750        self.state_lock.acquire()
2751        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2752            try:
2753                proof = self.check_experiment_access(fid, key)
2754            except service_error, e:
2755                if e.code == service_error.access:
2756                    continue
2757                else:
2758                    self.log.info("Multi Info call failed for %s: %s" %  \
2759                            (e,fid))
2760                    self.state_lock.release()
2761                    raise e
2762
2763            if self.state.has_key(key):
2764                e = self.state[key].get_info()
2765                e['proof'] = proof.to_dict()
2766                rv['info'].append(e)
2767                rv['proof'].append(proof.to_dict())
2768        self.state_lock.release()
2769        self.log.info("Multi Info call succeeded for %s" %  fid)
2770        return rv
2771
2772    def check_termination_status(self, fed_exp, force):
2773        """
2774        Confirm that the experiment is sin a valid state to stop (or force it)
2775        return the state - invalid states for deletion and force settings cause
2776        exceptions.
2777        """
2778        self.state_lock.acquire()
2779        status = fed_exp.status
2780
2781        if status:
2782            if status in ('starting', 'terminating'):
2783                if not force:
2784                    self.state_lock.release()
2785                    raise service_error(service_error.partial, 
2786                            'Experiment still being created or destroyed')
2787                else:
2788                    self.log.warning('Experiment in %s state ' % status + \
2789                            'being terminated by force.')
2790            self.state_lock.release()
2791            return status
2792        else:
2793            # No status??? trouble
2794            self.state_lock.release()
2795            raise service_error(service_error.internal,
2796                    "Experiment has no status!?")
2797
2798    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2799        ids = []
2800        term_params = { }
2801        if need_lock: self.state_lock.acquire()
2802        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2803        expcert = fed_exp.identity
2804        repo = "%s" % fed_exp.fedid
2805
2806        # Collect the allocation/segment ids into a dict keyed by the fedid
2807        # of the allocation that contains a tuple of uri, aid
2808        for i, fed in enumerate(fed_exp.get_all_allocations()):
2809            uri = fed.uri
2810            aid = fed.allocID
2811            if key == 'aid': term_params[aid] = (uri, aid)
2812            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2813
2814        if need_lock: self.state_lock.release()
2815        return ids, term_params, expcert, repo
2816
2817
2818    def get_termination_info(self, fed_exp):
2819        self.state_lock.acquire()
2820        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2821        # Change the experiment state
2822        fed_exp.status = 'terminating'
2823        fed_exp.updated()
2824        if self.state_filename: self.write_state()
2825        self.state_lock.release()
2826
2827        return ids, term_params, expcert, repo
2828
2829
2830    def deallocate_resources(self, term_params, expcert, status, force, 
2831            dealloc_log):
2832        tmpdir = None
2833        # This try block makes sure the tempdir is cleared
2834        try:
2835            # If no expcert, try the deallocation as the experiment
2836            # controller instance.
2837            if expcert and self.auth_type != 'legacy': 
2838                try:
2839                    tmpdir = tempfile.mkdtemp(prefix="term-")
2840                except EnvironmentError:
2841                    raise service_error(service_error.internal, 
2842                            "Cannot create tmp dir")
2843                cert_file = self.make_temp_certfile(expcert, tmpdir)
2844                pw = None
2845            else: 
2846                cert_file = self.cert_file
2847                pw = self.cert_pwd
2848
2849            # Stop everyone.  NB, wait_for_all waits until a thread starts
2850            # and then completes, so we can't wait if nothing starts.  So,
2851            # no tbparams, no start.
2852            if len(term_params) > 0:
2853                tp = thread_pool(self.nthreads)
2854                for k, (uri, aid) in term_params.items():
2855                    # Create and start a thread to stop the segment
2856                    tp.wait_for_slot()
2857                    t  = pooled_thread(\
2858                            target=self.terminate_segment(log=dealloc_log,
2859                                testbed=uri,
2860                                cert_file=cert_file, 
2861                                cert_pwd=pw,
2862                                trusted_certs=self.trusted_certs,
2863                                caller=self.call_TerminateSegment),
2864                            args=(uri, aid), name=k,
2865                            pdata=tp, trace_file=self.trace_file)
2866                    t.start()
2867                # Wait for completions
2868                tp.wait_for_all_done()
2869
2870            # release the allocations (failed experiments have done this
2871            # already, and starting experiments may be in odd states, so we
2872            # ignore errors releasing those allocations
2873            try: 
2874                for k, (uri, aid)  in term_params.items():
2875                    self.release_access(None, aid, uri=uri,
2876                            cert_file=cert_file, cert_pwd=pw)
2877            except service_error, e:
2878                if status != 'failed' and not force:
2879                    raise e
2880
2881        # Clean up the tmpdir no matter what
2882        finally:
2883            if tmpdir: self.remove_dirs(tmpdir)
2884
2885    def terminate_experiment(self, req, fid):
2886        """
2887        Swap this experiment out on the federants and delete the shared
2888        information
2889        """
2890        self.log.info("Terminate experiment call started for %s" % fid)
2891        tbparams = { }
2892        req = req.get('TerminateRequestBody', None)
2893        if not req:
2894            raise service_error(service_error.req,
2895                    "Bad request format (no TerminateRequestBody)")
2896
2897        key = self.get_experiment_key(req, 'experiment')
2898        try:
2899            proof = self.check_experiment_access(fid, key)
2900        except service_error, e:
2901            self.log.info(
2902                    "Terminate experiment call failed for %s: access denied" \
2903                            % fid)
2904            raise e
2905        exp = req.get('experiment', False)
2906        force = req.get('force', False)
2907
2908        dealloc_list = [ ]
2909
2910
2911        # Create a logger that logs to the dealloc_list as well as to the main
2912        # log file.
2913        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2914        dealloc_log.info("Terminating %s " %key)
2915        h = logging.StreamHandler(self.list_log(dealloc_list))
2916        # XXX: there should be a global one of these rather than repeating the
2917        # code.
2918        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2919                    '%d %b %y %H:%M:%S'))
2920        dealloc_log.addHandler(h)
2921
2922        self.state_lock.acquire()
2923        fed_exp = self.state.get(key, None)
2924        self.state_lock.release()
2925        repo = None
2926
2927        if fed_exp:
2928            status = self.check_termination_status(fed_exp, force)
2929            # get_termination_info updates the experiment state
2930            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2931            self.deallocate_resources(term_params, expcert, status, force, 
2932                    dealloc_log)
2933
2934            # Remove the terminated experiment
2935            self.state_lock.acquire()
2936            for id in ids:
2937                self.clear_experiment_authorization(id, need_state_lock=False)
2938                if id in self.state: del self.state[id]
2939
2940            if self.state_filename: self.write_state()
2941            self.state_lock.release()
2942
2943            # Delete any synch points associated with this experiment.  All
2944            # synch points begin with the fedid of the experiment.
2945            fedid_keys = set(["fedid:%s" % f for f in ids \
2946                    if isinstance(f, fedid)])
2947            for k in self.synch_store.all_keys():
2948                try:
2949                    if len(k) > 45 and k[0:46] in fedid_keys:
2950                        self.synch_store.del_value(k)
2951                except synch_store.BadDeletionError:
2952                    pass
2953            self.write_store()
2954
2955            # Remove software and other cached stuff from the filesystem.
2956            if repo:
2957                self.remove_dirs("%s/%s" % (self.repodir, repo))
2958       
2959            self.log.info("Terminate experiment succeeded for %s %s" % \
2960                    (key, fid))
2961            return { 
2962                    'experiment': exp , 
2963                    'deallocationLog': string.join(dealloc_list, ''),
2964                    'proof': [proof.to_dict()],
2965                    }
2966        else:
2967            self.log.info("Terminate experiment failed for %s %s: no state" % \
2968                    (key, fid))
2969            raise service_error(service_error.req, "No saved state")
2970
2971
2972    def GetValue(self, req, fid):
2973        """
2974        Get a value from the synchronized store
2975        """
2976        req = req.get('GetValueRequestBody', None)
2977        if not req:
2978            raise service_error(service_error.req,
2979                    "Bad request format (no GetValueRequestBody)")
2980       
2981        name = req.get('name', None)
2982        wait = req.get('wait', False)
2983        rv = { 'name': name }
2984
2985        if not name:
2986            raise service_error(service_error.req, "No name?")
2987
2988        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2989
2990        if access_ok:
2991            self.log.debug("[GetValue] asking for %s " % name)
2992            try:
2993                v = self.synch_store.get_value(name, wait)
2994            except synch_store.RevokedKeyError:
2995                # No more synch on this key
2996                raise service_error(service_error.federant, 
2997                        "Synch key %s revoked" % name)
2998            if v is not None:
2999                rv['value'] = v
3000            rv['proof'] = proof.to_dict()
3001            self.log.debug("[GetValue] got %s from %s" % (v, name))
3002            return rv
3003        else:
3004            raise service_error(service_error.access, "Access Denied",
3005                    proof=proof)
3006       
3007
3008    def SetValue(self, req, fid):
3009        """
3010        Set a value in the synchronized store
3011        """
3012        req = req.get('SetValueRequestBody', None)
3013        if not req:
3014            raise service_error(service_error.req,
3015                    "Bad request format (no SetValueRequestBody)")
3016       
3017        name = req.get('name', None)
3018        v = req.get('value', '')
3019
3020        if not name:
3021            raise service_error(service_error.req, "No name?")
3022
3023        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
3024
3025        if access_ok:
3026            try:
3027                self.synch_store.set_value(name, v)
3028                self.write_store()
3029                self.log.debug("[SetValue] set %s to %s" % (name, v))
3030            except synch_store.CollisionError:
3031                # Translate into a service_error
3032                raise service_error(service_error.req,
3033                        "Value already set: %s" %name)
3034            except synch_store.RevokedKeyError:
3035                # No more synch on this key
3036                raise service_error(service_error.federant, 
3037                        "Synch key %s revoked" % name)
3038                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
3039        else:
3040            raise service_error(service_error.access, "Access Denied",
3041                    proof=proof)
Note: See TracBrowser for help on using the repository browser.