source: fedd/federation/experiment_control.py @ 51d3aa0

Last change on this file since 51d3aa0 was 6b439fa, checked in by Ted Faber <faber@…>, 11 years ago

Change the name of the ssh key to avoid confusing some ssh implementations.

  • Property mode set to 100644
File size: 99.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15import shutil
16import zipfile
17
18import os.path
19
20import traceback
21# For parsing visualization output and splitter output
22import xml.parsers.expat
23
24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
26from string import join
27
28from urlparse import urlparse
29from urllib2 import urlopen, URLError
30
31from util import *
32from deter import fedid, generate_fedid
33from remote_service import xmlrpc_handler, soap_handler, service_caller
34from service_error import service_error
35from synch_store import synch_store
36from experiment_partition import experiment_partition
37from experiment_control_legacy import experiment_control_legacy
38from authorizer import abac_authorizer
39from thread_pool import thread_pool, pooled_thread
40from experiment_info import experiment_info, allocation_info, federated_service
41from operation_status import operation_status
42
43from deter import topdl
44from deter import ip_allocator
45from deter import ip_addr
46from deter import topology_to_route_file
47import list_log
48
49
50class nullHandler(logging.Handler):
51    def emit(self, record): pass
52
53fl = logging.getLogger("fedd.experiment_control")
54fl.addHandler(nullHandler())
55
56class experiment_control_local(experiment_control_legacy):
57    """
58    Control of experiments that this system can directly access.
59
60    Includes experiment creation, termination and information dissemination.
61    Thred safe.
62    """
63
64    class ssh_cmd_timeout(RuntimeError): pass
65   
66    call_RequestAccess = service_caller('RequestAccess')
67    call_ReleaseAccess = service_caller('ReleaseAccess')
68    call_StartSegment = service_caller('StartSegment')
69    call_TerminateSegment = service_caller('TerminateSegment')
70    call_InfoSegment = service_caller('InfoSegment')
71    call_OperationSegment = service_caller('OperationSegment')
72    call_Ns2Topdl = service_caller('Ns2Topdl')
73
74    def __init__(self, config=None, auth=None):
75        """
76        Intialize the various attributes, most from the config object
77        """
78
79        def parse_tarfile_list(tf):
80            """
81            Parse a tarfile list from the configuration.  This is a set of
82            paths and tarfiles separated by spaces.
83            """
84            rv = [ ]
85            if tf is not None:
86                tl = tf.split()
87                while len(tl) > 1:
88                    p, t = tl[0:2]
89                    del tl[0:2]
90                    rv.append((p, t))
91            return rv
92
93        self.list_log = list_log.list_log
94
95        self.cert_file = config.get("experiment_control", "cert_file")
96        if self.cert_file:
97            self.cert_pwd = config.get("experiment_control", "cert_pwd")
98        else:
99            self.cert_file = config.get("globals", "cert_file")
100            self.cert_pwd = config.get("globals", "cert_pwd")
101
102        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
103                or config.get("globals", "trusted_certs")
104
105        self.repodir = config.get("experiment_control", "repodir")
106        self.repo_url = config.get("experiment_control", "repo_url", 
107                "https://users.isi.deterlab.net:23235");
108
109        self.exp_stem = "fed-stem"
110        self.log = logging.getLogger("fedd.experiment_control")
111        set_log_level(config, "experiment_control", self.log)
112        self.muxmax = 2
113        self.nthreads = 10
114        self.randomize_experiments = False
115
116        self.splitter = None
117        self.ssh_keygen = "/usr/bin/ssh-keygen"
118        self.ssh_identity_file = None
119
120
121        self.debug = config.getboolean("experiment_control", "create_debug")
122        self.cleanup = not config.getboolean("experiment_control", 
123                "leave_tmpfiles")
124        self.state_filename = config.get("experiment_control", 
125                "experiment_state")
126        self.store_filename = config.get("experiment_control", 
127                "synch_store")
128        self.store_url = config.get("experiment_control", "store_url")
129        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
130        self.grouper_url = config.get("experiment_control", "grouper_url")
131        self.fedkit = parse_tarfile_list(\
132                config.get("experiment_control", "fedkit"))
133        self.gatewaykit = parse_tarfile_list(\
134                config.get("experiment_control", "gatewaykit"))
135
136        dt = config.get("experiment_control", "direct_transit")
137        self.auth_type = config.get('experiment_control', 'auth_type') \
138                or 'legacy'
139        self.auth_dir = config.get('experiment_control', 'auth_dir')
140        self.routing = config.get('experiment_control', 'routing')
141        self.default_tb = config.get('experiment_control', 'default_testbed')
142        # XXX: document this!
143        self.info_cache_limit = \
144                config.getint('experiment_control', 'info_cache', 600)
145        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
146        else: self.direct_transit = [ ]
147        # NB for internal master/slave ops, not experiment setup
148        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
149
150        self.overrides = set([])
151        ovr = config.get('experiment_control', 'overrides')
152        if ovr:
153            for o in ovr.split(","):
154                o = o.strip()
155                if o.startswith('fedid:'): o = o[len('fedid:'):]
156                self.overrides.add(fedid(hexstr=o))
157
158        self.state = { }
159        self.state_lock = Lock()
160        self.tclsh = "/usr/local/bin/otclsh"
161        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
162                config.get("experiment_control", "tcl_splitter",
163                        "/usr/testbed/lib/ns2ir/parse.tcl")
164        mapdb_file = config.get("experiment_control", "mapdb")
165        self.trace_file = sys.stderr
166
167        self.def_expstart = \
168                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
169                "/tmp/federate";
170        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
171                "FEDDIR/hosts";
172        self.def_gwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
174                "/tmp/bridge.log";
175        self.def_mgwstart = \
176                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
177                "/tmp/bridge.log";
178        self.def_gwimage = "FBSD61-TUNNEL2";
179        self.def_gwtype = "pc";
180        self.local_access = { }
181
182        if self.auth_type == 'legacy':
183            if auth:
184                self.auth = auth
185            else:
186                self.log.error( "[access]: No authorizer initialized, " +\
187                        "creating local one.")
188                auth = authorizer()
189            self.get_access = self.legacy_get_access
190        elif self.auth_type == 'abac':
191            self.auth = abac_authorizer(load=self.auth_dir, 
192                    update=os.path.join(self.auth_dir,'update'))
193        else:
194            raise service_error(service_error.internal, 
195                    "Unknown auth_type: %s" % self.auth_type)
196
197        if mapdb_file:
198            self.read_mapdb(mapdb_file)
199        else:
200            self.log.warn("[experiment_control] No testbed map, using defaults")
201            self.tbmap = { 
202                    'deter':'https://users.isi.deterlab.net:23235',
203                    'emulab':'https://users.isi.deterlab.net:23236',
204                    'ucb':'https://users.isi.deterlab.net:23237',
205                    }
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
219        # Dispatch tables
220        self.soap_services = {\
221                'New': soap_handler('New', self.new_experiment),
222                'Create': soap_handler('Create', self.create_experiment),
223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
227                'Operation': soap_handler('Operation', self.do_operation),
228                'Terminate': soap_handler('Terminate', 
229                    self.terminate_experiment),
230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
232        }
233
234        self.xmlrpc_services = {\
235                'New': xmlrpc_handler('New', self.new_experiment),
236                'Create': xmlrpc_handler('Create', self.create_experiment),
237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
241                'Terminate': xmlrpc_handler('Terminate',
242                    self.terminate_experiment),
243                'Operation': xmlrpc_handler('Operation', self.do_operation),
244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
246        }
247
248    # Call while holding self.state_lock
249    def write_state(self):
250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
262        except EnvironmentError, e:
263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
269
270    @staticmethod
271    def get_alloc_ids(exp):
272        """
273        Used by read_store and read state.  This used to be worse.
274        """
275
276        return [ a.allocID for a in exp.get_all_allocations() ]
277
278
279    # Call while holding self.state_lock
280    def read_state(self):
281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
286       
287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
292        except EnvironmentError, e:
293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
299        for s in self.state.values():
300            try:
301
302                eid = s.fedid
303                if eid : 
304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
307                        #self.auth.set_attribute(s['owner'], eid)
308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
317                else:
318                    raise KeyError("No experiment id")
319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
322
323    def read_mapdb(self, file):
324        """
325        Read a simple colon separated list of mappings for the
326        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
327        also adds testbeds to active if they include , active after
328        their name.
329        """
330
331        self.tbmap = { }
332        lineno =0
333        try:
334            f = open(file, "r")
335            for line in f:
336                lineno += 1
337                line = line.strip()
338                if line.startswith('#') or len(line) == 0:
339                    continue
340                try:
341                    label, url = line.split(':', 1)
342                    self.tbmap[label] = url
343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
346        except EnvironmentError, e:
347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
349        else:
350            f.close()
351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
358        except EnvironmentError, e:
359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
391        except EnvironmentError, e:
392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
397    # XXX this may belong somewhere else
398
399    def get_grouper_updates(self, fid):
400        if self.grouper_url is None: return
401        d = tempfile.mkdtemp()
402        try:
403            fstr = "%s" % fid
404            # XXX locking
405            zipname = os.path.join(d, 'grouper.zip')
406            dest = os.path.join(self.auth_dir, 'update')
407            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
408            f = open(zipname, 'w')
409            f.write(resp.read())
410            f.close()
411            zf = zipfile.ZipFile(zipname, 'r')
412            zf.extractall(dest)
413            zf.close()
414        except URLError, e:
415            self.log.error("Cannot connect to grouper: %s" % e)
416            pass
417        finally:
418            shutil.rmtree(d)
419
420
421    def remove_dirs(self, dir):
422        """
423        Remove the directory tree and all files rooted at dir.  Log any errors,
424        but continue.
425        """
426        self.log.debug("[removedirs]: removing %s" % dir)
427        try:
428            for path, dirs, files in os.walk(dir, topdown=False):
429                for f in files:
430                    os.remove(os.path.join(path, f))
431                for d in dirs:
432                    os.rmdir(os.path.join(path, d))
433            os.rmdir(dir)
434        except EnvironmentError, e:
435            self.log.error("Error deleting directory tree in %s" % e);
436
437    @staticmethod
438    def make_temp_certfile(expcert, tmpdir):
439        """
440        make a protected copy of the access certificate so the experiment
441        controller can act as the experiment principal.  mkstemp is the most
442        secure way to do that. The directory should be created by
443        mkdtemp.  Return the filename.
444        """
445        if expcert and tmpdir:
446            try:
447                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
448                f = os.fdopen(certf, 'w')
449                print >> f, expcert
450                f.close()
451            except EnvironmentError, e:
452                raise service_error(service_error.internal, 
453                        "Cannot create temp cert file?")
454            return certfn
455        else:
456            return None
457
458       
459    def generate_ssh_keys(self, dest, type="rsa" ):
460        """
461        Generate a set of keys for the gateways to use to talk.
462
463        Keys are of type type and are stored in the required dest file.
464        """
465        valid_types = ("rsa", "dsa")
466        t = type.lower();
467        if t not in valid_types: raise ValueError
468        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
469
470        try:
471            trace = open("/dev/null", "w")
472        except EnvironmentError:
473            raise service_error(service_error.internal,
474                    "Cannot open /dev/null??");
475
476        # May raise CalledProcessError
477        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
478        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
479        if rv != 0:
480            raise service_error(service_error.internal, 
481                    "Cannot generate nonce ssh keys.  %s return code %d" \
482                            % (self.ssh_keygen, rv))
483
484    def generate_seer_certs(self, destdir):
485        '''
486        Create a SEER ca cert and a node cert in destdir/ca.pem and
487        destdir/node.pem respectively.  These will be distributed throughout
488        the federated experiment.  This routine reports errors via
489        service_errors.
490        '''
491        openssl = '/usr/bin/openssl'
492        # All the filenames and parameters we need for openssl calls below
493        ca_key =os.path.join(destdir, 'ca.key') 
494        ca_pem = os.path.join(destdir, 'ca.pem')
495        node_key =os.path.join(destdir, 'node.key') 
496        node_pem = os.path.join(destdir, 'node.pem')
497        node_req = os.path.join(destdir, 'node.req')
498        node_signed = os.path.join(destdir, 'node.signed')
499        days = '%s' % (365 * 10)
500        serial = '%s' % random.randint(0, 1<<16)
501
502        try:
503            # Sequence of calls to create a CA key, create a ca cert, create a
504            # node key, node signing request, and finally a signed node
505            # certificate.
506            sequence = (
507                    (openssl, 'genrsa', '-out', ca_key, '1024'),
508                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
509                        ca_pem, '-days', days, '-subj', 
510                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
511                    (openssl, 'genrsa', '-out', node_key, '1024'),
512                    (openssl, 'req', '-new', '-key', node_key, '-out', 
513                        node_req, '-days', days, '-subj', 
514                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
515                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
516                        '-set_serial', serial, '-req', '-in', node_req, 
517                        '-out', node_signed, '-days', days),
518                )
519            # Do all that stuff; bail if there's an error, and push all the
520            # output to dev/null.
521            for cmd in sequence:
522                trace = open("/dev/null", "w")
523                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
524                if rv != 0:
525                    raise service_error(service_error.internal, 
526                            "Cannot generate SEER certs.  %s return code %d" \
527                                    % (' '.join(cmd), rv))
528            # Concatinate the node key and signed certificate into node.pem
529            f = open(node_pem, 'w')
530            for comp in (node_signed, node_key):
531                g = open(comp, 'r')
532                f.write(g.read())
533                g.close()
534            f.close()
535
536            # Throw out intermediaries.
537            for fn in (ca_key, node_key, node_req, node_signed):
538                os.unlink(fn)
539
540        except EnvironmentError, e:
541            # Any difficulties with the file system wind up here
542            raise service_error(service_error.internal,
543                    "File error on  %s while creating SEER certs: %s" % \
544                            (e.filename, e.strerror))
545
546
547
548    def gentopo(self, str):
549        """
550        Generate the topology data structure from the splitter's XML
551        representation of it.
552
553        The topology XML looks like:
554            <experiment>
555                <nodes>
556                    <node><vname></vname><ips>ip1:ip2</ips></node>
557                </nodes>
558                <lans>
559                    <lan>
560                        <vname></vname><vnode></vnode><ip></ip>
561                        <bandwidth></bandwidth><member>node:port</member>
562                    </lan>
563                </lans>
564        """
565        class topo_parse:
566            """
567            Parse the topology XML and create the dats structure.
568            """
569            def __init__(self):
570                # Typing of the subelements for data conversion
571                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
572                self.int_subelements = ( 'bandwidth',)
573                self.float_subelements = ( 'delay',)
574                # The final data structure
575                self.nodes = [ ]
576                self.lans =  [ ]
577                self.topo = { \
578                        'node': self.nodes,\
579                        'lan' : self.lans,\
580                    }
581                self.element = { }  # Current element being created
582                self.chars = ""     # Last text seen
583
584            def end_element(self, name):
585                # After each sub element the contents is added to the current
586                # element or to the appropriate list.
587                if name == 'node':
588                    self.nodes.append(self.element)
589                    self.element = { }
590                elif name == 'lan':
591                    self.lans.append(self.element)
592                    self.element = { }
593                elif name in self.str_subelements:
594                    self.element[name] = self.chars
595                    self.chars = ""
596                elif name in self.int_subelements:
597                    self.element[name] = int(self.chars)
598                    self.chars = ""
599                elif name in self.float_subelements:
600                    self.element[name] = float(self.chars)
601                    self.chars = ""
602
603            def found_chars(self, data):
604                self.chars += data.rstrip()
605
606
607        tp = topo_parse();
608        parser = xml.parsers.expat.ParserCreate()
609        parser.EndElementHandler = tp.end_element
610        parser.CharacterDataHandler = tp.found_chars
611
612        parser.Parse(str)
613
614        return tp.topo
615       
616
617    def genviz(self, topo):
618        """
619        Generate the visualization the virtual topology
620        """
621
622        neato = "/usr/local/bin/neato"
623        # These are used to parse neato output and to create the visualization
624        # file.
625        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
626        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
627                "%s</type></node>"
628
629        try:
630            # Node names
631            nodes = [ n['vname'] for n in topo['node'] ]
632            topo_lans = topo['lan']
633        except KeyError, e:
634            raise service_error(service_error.internal, "Bad topology: %s" %e)
635
636        lans = { }
637        links = { }
638
639        # Walk through the virtual topology, organizing the connections into
640        # 2-node connections (links) and more-than-2-node connections (lans).
641        # When a lan is created, it's added to the list of nodes (there's a
642        # node in the visualization for the lan).
643        for l in topo_lans:
644            if links.has_key(l['vname']):
645                if len(links[l['vname']]) < 2:
646                    links[l['vname']].append(l['vnode'])
647                else:
648                    nodes.append(l['vname'])
649                    lans[l['vname']] = links[l['vname']]
650                    del links[l['vname']]
651                    lans[l['vname']].append(l['vnode'])
652            elif lans.has_key(l['vname']):
653                lans[l['vname']].append(l['vnode'])
654            else:
655                links[l['vname']] = [ l['vnode'] ]
656
657
658        # Open up a temporary file for dot to turn into a visualization
659        try:
660            df, dotname = tempfile.mkstemp()
661            dotfile = os.fdopen(df, 'w')
662        except EnvironmentError:
663            raise service_error(service_error.internal,
664                    "Failed to open file in genviz")
665
666        try:
667            dnull = open('/dev/null', 'w')
668        except EnvironmentError:
669            service_error(service_error.internal,
670                    "Failed to open /dev/null in genviz")
671
672        # Generate a dot/neato input file from the links, nodes and lans
673        try:
674            print >>dotfile, "graph G {"
675            for n in nodes:
676                print >>dotfile, '\t"%s"' % n
677            for l in links.keys():
678                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
679            for l in lans.keys():
680                for n in lans[l]:
681                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
682            print >>dotfile, "}"
683            dotfile.close()
684        except TypeError:
685            raise service_error(service_error.internal,
686                    "Single endpoint link in vtopo")
687        except EnvironmentError:
688            raise service_error(service_error.internal, "Cannot write dot file")
689
690        # Use dot to create a visualization
691        try:
692            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
693                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
694                stderr=dnull, close_fds=True)
695        except EnvironmentError:
696            raise service_error(service_error.internal, 
697                    "Cannot generate visualization: is graphviz available?")
698        dnull.close()
699
700        # Translate dot to vis format
701        vis_nodes = [ ]
702        vis = { 'node': vis_nodes }
703        for line in dot.stdout:
704            m = vis_re.match(line)
705            if m:
706                vn = m.group(1)
707                vis_node = {'name': vn, \
708                        'x': float(m.group(2)),\
709                        'y' : float(m.group(3)),\
710                    }
711                if vn in links.keys() or vn in lans.keys():
712                    vis_node['type'] = 'lan'
713                else:
714                    vis_node['type'] = 'node'
715                vis_nodes.append(vis_node)
716        rv = dot.wait()
717
718        os.remove(dotname)
719        # XXX: graphviz seems to use low return codes for warnings, like
720        # "couldn't find font"
721        if rv < 2 : return vis
722        else: return None
723
724
725    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
726            cert_pwd=None):
727        """
728        Release access to testbed through fedd
729        """
730
731        if not uri and tbmap:
732            uri = tbmap.get(tb, None)
733        if not uri:
734            raise service_error(service_error.server_config, 
735                    "Unknown testbed: %s" % tb)
736
737        if self.local_access.has_key(uri):
738            resp = self.local_access[uri].ReleaseAccess(\
739                    { 'ReleaseAccessRequestBody' : 
740                        {'allocID': {'fedid': aid}},}, 
741                    fedid(file=cert_file))
742            resp = { 'ReleaseAccessResponseBody': resp } 
743        else:
744            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
745                    cert_file, cert_pwd, self.trusted_certs)
746
747        # better error coding
748
749    def remote_ns2topdl(self, uri, desc):
750
751        req = {
752                'description' : { 'ns2description': desc },
753            }
754
755        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
756                self.trusted_certs)
757
758        if r.has_key('Ns2TopdlResponseBody'):
759            r = r['Ns2TopdlResponseBody']
760            ed = r.get('experimentdescription', None)
761            if ed.has_key('topdldescription'):
762                return topdl.Topology(**ed['topdldescription'])
763            else:
764                raise service_error(service_error.protocol, 
765                        "Bad splitter response (no output)")
766        else:
767            raise service_error(service_error.protocol, "Bad splitter response")
768
769    class start_segment:
770        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
771                cert_pwd=None, trusted_certs=None, caller=None,
772                log_collector=None):
773            self.log = log
774            self.debug = debug
775            self.cert_file = cert_file
776            self.cert_pwd = cert_pwd
777            self.trusted_certs = None
778            self.caller = caller
779            self.testbed = testbed
780            self.log_collector = log_collector
781            self.response = None
782            self.node = { }
783            self.subs = { }
784            self.tb = { }
785            self.proof = None
786
787        def make_map(self, resp):
788            if 'segmentdescription' not in resp  or \
789                    'topdldescription' not in resp['segmentdescription']:
790                self.log.warn('No topology returned from startsegment')
791                return 
792
793            top = topdl.Topology(
794                    **resp['segmentdescription']['topdldescription'])
795
796            for e in top.elements:
797                if isinstance(e, topdl.Computer):
798                    self.node[e.name] = e
799                elif isinstance(e, topdl.Testbed):
800                    self.tb[e.uri] = e
801            for s in top.substrates:
802                self.subs[s.name] = s
803
804        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
805            req = {
806                    'allocID': { 'fedid' : aid }, 
807                    'segmentdescription': { 
808                        'topdldescription': topo.to_dict(),
809                    },
810                }
811
812            if connInfo:
813                req['connection'] = connInfo
814
815            import_svcs = [ s for m in masters.values() \
816                    for s in m if self.testbed in s.importers]
817
818            if import_svcs or self.testbed in masters:
819                req['service'] = []
820
821            for s in import_svcs:
822                for r in s.reqs:
823                    sr = copy.deepcopy(r)
824                    sr['visibility'] = 'import';
825                    req['service'].append(sr)
826
827            for s in masters.get(self.testbed, []):
828                for r in s.reqs:
829                    sr = copy.deepcopy(r)
830                    sr['visibility'] = 'export';
831                    req['service'].append(sr)
832
833            if attrs:
834                req['fedAttr'] = attrs
835
836            try:
837                self.log.debug("Calling StartSegment at %s " % uri)
838                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
839                        self.trusted_certs)
840                if r.has_key('StartSegmentResponseBody'):
841                    lval = r['StartSegmentResponseBody'].get('allocationLog',
842                            None)
843                    if lval and self.log_collector:
844                        for line in  lval.splitlines(True):
845                            self.log_collector.write(line)
846                    self.make_map(r['StartSegmentResponseBody'])
847                    if 'proof' in r: self.proof = r['proof']
848                    self.response = r
849                else:
850                    raise service_error(service_error.internal, 
851                            "Bad response!?: %s" %r)
852                return True
853            except service_error, e:
854                self.log.error("Start segment failed on %s: %s" % \
855                        (self.testbed, e))
856                return False
857
858
859
860    class terminate_segment:
861        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
862                cert_pwd=None, trusted_certs=None, caller=None):
863            self.log = log
864            self.debug = debug
865            self.cert_file = cert_file
866            self.cert_pwd = cert_pwd
867            self.trusted_certs = None
868            self.caller = caller
869            self.testbed = testbed
870
871        def __call__(self, uri, aid ):
872            req = {
873                    'allocID': {'fedid': aid }, 
874                }
875            self.log.info("Calling terminate segment")
876            try:
877                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
878                        self.trusted_certs)
879                self.log.info("Terminate segment succeeded")
880                return True
881            except service_error, e:
882                self.log.error("Terminate segment failed on %s: %s" % \
883                        (self.testbed, e))
884                return False
885
886    class info_segment(start_segment):
887        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
888                cert_pwd=None, trusted_certs=None, caller=None,
889                log_collector=None):
890            experiment_control_local.start_segment.__init__(self, debug, 
891                    log, testbed, cert_file, cert_pwd, trusted_certs, 
892                    caller, log_collector)
893
894        def __call__(self, uri, aid):
895            req = { 'allocID': { 'fedid' : aid } }
896
897            try:
898                self.log.debug("Calling InfoSegment at %s " % uri)
899                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
900                        self.trusted_certs)
901                if r.has_key('InfoSegmentResponseBody'):
902                    self.make_map(r['InfoSegmentResponseBody'])
903                    if 'proof' in r: self.proof = r['proof']
904                    self.response = r
905                else:
906                    raise service_error(service_error.internal, 
907                            "Bad response!?: %s" %r)
908                return True
909            except service_error, e:
910                self.log.error("Info segment failed on %s: %s" % \
911                        (self.testbed, e))
912                return False
913
914    class operation_segment:
915        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
916                cert_pwd=None, trusted_certs=None, caller=None,
917                log_collector=None):
918            self.log = log
919            self.debug = debug
920            self.cert_file = cert_file
921            self.cert_pwd = cert_pwd
922            self.trusted_certs = None
923            self.caller = caller
924            self.testbed = testbed
925            self.status = None
926
927        def __call__(self, uri, aid, op, targets, params):
928            req = { 
929                    'allocID': { 'fedid' : aid },
930                    'operation': op,
931                    'target': targets,
932                    }
933            if params: req['parameter'] = params
934
935
936            try:
937                self.log.debug("Calling OperationSegment at %s " % uri)
938                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
939                        self.trusted_certs)
940                if 'OperationSegmentResponseBody' in r:
941                    r = r['OperationSegmentResponseBody']
942                    if 'status' in r:
943                        self.status = r['status']
944                else:
945                    raise service_error(service_error.internal, 
946                            "Bad response!?: %s" %r)
947                return True
948            except service_error, e:
949                self.log.error("Operation segment failed on %s: %s" % \
950                        (self.testbed, e))
951                return False
952
953    def annotate_topology(self, top, data):
954        # These routines do various parts of the annotation
955        def add_new_names(nl, l):
956            """ add any names in nl to the list in l """
957            for n in nl:
958                if n not in l: l.append(n)
959       
960        def merge_services(ne, e):
961            for ns in ne.service:
962                # NB: the else is on the for
963                for s in e.service:
964                    if ns.name == s.name:
965                        s.importer = ns.importer
966                        s.param = ns.param
967                        s.description = ns.description
968                        s.status = ns.status
969                        break
970                else:
971                    e.service.append(ns)
972       
973        def merge_oses(ne, e):
974            """
975            Merge the operating system entries of ne into e
976            """
977            for nos in ne.os:
978                # NB: the else is on the for
979                for os in e.os:
980                    if nos.name == os.name:
981                        os.version = nos.version
982                        os.version = nos.distribution
983                        os.version = nos.distributionversion
984                        for a in nos.attribute:
985                            if os.get_attribute(a.attribute):
986                                os.remove_attribute(a.attribute)
987                            os.set_attribute(a.attribute, a.value)
988                        break
989                else:
990                    # If both nodes have one OS, this is a replacement
991                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
992                    else: e.os.append(nos)
993
994        # Annotate the topology with embedding info
995        for e in top.elements:
996            if isinstance(e, topdl.Computer):
997                for s in data:
998                    ne = s.node.get(e.name, None)
999                    if ne is not None:
1000                        add_new_names(ne.localname, e.localname)
1001                        e.status = ne.status
1002                        merge_services(ne, e)
1003                        add_new_names(ne.operation, e.operation)
1004                        if ne.os: merge_oses(ne, e)
1005                        break
1006            elif isinstance(e,topdl.Testbed):
1007                for s in data:
1008                    ne = s.tb.get(e.uri, None)
1009                    if ne is not None:
1010                        add_new_names(ne.localname, e.localname)
1011                        add_new_names(ne.operation, e.operation)
1012                        merge_services(ne, e)
1013                        for a in ne.attribute:
1014                            e.set_attribute(a.attribute, a.value)
1015        # Annotate substrates
1016        for s in top.substrates:
1017            for d in data:
1018                ss = d.subs.get(s.name, None)
1019                if ss is not None:
1020                    if ss.capacity is not None:
1021                        s.capacity = ss.capacity
1022                    if s.latency is not None:
1023                        s.latency = ss.latency
1024
1025
1026
1027    def allocate_resources(self, allocated, masters, eid, expid, 
1028            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1029            attrs=None, connInfo={}, tbmap=None, expcert=None):
1030
1031        started = { }           # Testbeds where a sub-experiment started
1032                                # successfully
1033
1034        # XXX
1035        fail_soft = False
1036
1037        if tbmap is None: tbmap = { }
1038
1039        log = alloc_log or self.log
1040
1041        tp = thread_pool(self.nthreads)
1042        threads = [ ]
1043        starters = [ ]
1044
1045        if expcert:
1046            cert = expcert
1047            pw = None
1048        else:
1049            cert = self.cert_file
1050            pw = self.cert_pwd
1051
1052        for tb in allocated.keys():
1053            # Create and start a thread to start the segment, and save it
1054            # to get the return value later
1055            tb_attrs = copy.copy(attrs)
1056            tp.wait_for_slot()
1057            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
1058            base, suffix = split_testbed(tb)
1059            if suffix:
1060                tb_attrs.append({'attribute': 'experiment_name', 
1061                    'value': "%s-%s" % (eid, suffix)})
1062            else:
1063                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1064            if not uri:
1065                raise service_error(service_error.internal, 
1066                        "Unknown testbed %s !?" % tb)
1067
1068            aid = tbparams[tb].allocID
1069            if not aid:
1070                raise service_error(service_error.internal, 
1071                        "No alloc id for testbed %s !?" % tb)
1072
1073            s = self.start_segment(log=log, debug=self.debug,
1074                    testbed=tb, cert_file=cert,
1075                    cert_pwd=pw, trusted_certs=self.trusted_certs,
1076                    caller=self.call_StartSegment,
1077                    log_collector=log_collector)
1078            starters.append(s)
1079            t  = pooled_thread(\
1080                    target=s, name=tb,
1081                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1082                    pdata=tp, trace_file=self.trace_file)
1083            threads.append(t)
1084            t.start()
1085
1086        # Wait until all finish (keep pinging the log, though)
1087        mins = 0
1088        revoked = False
1089        while not tp.wait_for_all_done(60.0):
1090            mins += 1
1091            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1092                    % mins)
1093            if not revoked and \
1094                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1095                # a testbed has failed.  Revoke this experiment's
1096                # synchronizarion values so that sub experiments will not
1097                # deadlock waiting for synchronization that will never happen
1098                self.log.info("A subexperiment has failed to swap in, " + \
1099                        "revoking synch keys")
1100                var_key = "fedid:%s" % expid
1101                for k in self.synch_store.all_keys():
1102                    if len(k) > 45 and k[0:46] == var_key:
1103                        self.synch_store.revoke_key(k)
1104                revoked = True
1105
1106        failed = [ t.getName() for t in threads if not t.rv ]
1107        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1108
1109        # If one failed clean up, unless fail_soft is set
1110        if failed:
1111            if not fail_soft:
1112                tp.clear()
1113                for tb in succeeded:
1114                    # Create and start a thread to stop the segment
1115                    tp.wait_for_slot()
1116                    uri = tbparams[tb].uri
1117                    t  = pooled_thread(\
1118                            target=self.terminate_segment(log=log,
1119                                testbed=tb,
1120                                cert_file=cert, 
1121                                cert_pwd=pw,
1122                                trusted_certs=self.trusted_certs,
1123                                caller=self.call_TerminateSegment),
1124                            args=(uri, tbparams[tb].allocID),
1125                            name=tb,
1126                            pdata=tp, trace_file=self.trace_file)
1127                    t.start()
1128                # Wait until all finish (if any are being stopped)
1129                if succeeded:
1130                    tp.wait_for_all_done()
1131
1132                # release the allocations
1133                for tb in tbparams.keys():
1134                    try:
1135                        self.release_access(tb, tbparams[tb].allocID, 
1136                                tbmap=tbmap, uri=tbparams[tb].uri,
1137                                cert_file=cert, cert_pwd=pw)
1138                    except service_error, e:
1139                        self.log.warn("Error releasing access: %s" % e.desc)
1140                # Remove the placeholder
1141                self.state_lock.acquire()
1142                self.state[eid].status = 'failed'
1143                self.state[eid].updated()
1144                if self.state_filename: self.write_state()
1145                self.state_lock.release()
1146                # Remove the repo dir
1147                self.remove_dirs("%s/%s" %(self.repodir, expid))
1148                # Walk up tmpdir, deleting as we go
1149                if self.cleanup:
1150                    self.remove_dirs(tmpdir)
1151                else:
1152                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1153
1154
1155                log.error("Swap in failed on %s" % ",".join(failed))
1156                return
1157        else:
1158            # Walk through the successes and gather the proofs
1159            proofs = { }
1160            for s in starters:
1161                if s.proof:
1162                    proofs[s.testbed] = s.proof
1163            self.annotate_topology(top, starters)
1164            log.info("[start_segment]: Experiment %s active" % eid)
1165
1166
1167        # Walk up tmpdir, deleting as we go
1168        if self.cleanup:
1169            self.remove_dirs(tmpdir)
1170        else:
1171            log.debug("[start_experiment]: not removing %s" % tmpdir)
1172
1173        # Insert the experiment into our state and update the disk copy.
1174        self.state_lock.acquire()
1175        self.state[expid].status = 'active'
1176        self.state[eid] = self.state[expid]
1177        self.state[eid].top = top
1178        self.state[eid].updated()
1179        # Append startup proofs
1180        for f in self.state[eid].get_all_allocations():
1181            if f.tb in proofs:
1182                f.proof.append(proofs[f.tb])
1183
1184        if self.state_filename: self.write_state()
1185        self.state_lock.release()
1186        return
1187
1188
1189    def add_kit(self, e, kit):
1190        """
1191        Add a Software object created from the list of (install, location)
1192        tuples passed as kit  to the software attribute of an object e.  We
1193        do this enough to break out the code, but it's kind of a hack to
1194        avoid changing the old tuple rep.
1195        """
1196
1197        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1198
1199        if isinstance(e.software, list): e.software.extend(s)
1200        else: e.software = s
1201
1202    def append_experiment_authorization(self, expid, attrs, 
1203            need_state_lock=True):
1204        """
1205        Append the authorization information to system state
1206        """
1207
1208        for p, a in attrs:
1209            if p and a:
1210                # Improperly configured overrides can add bad principals.
1211                self.auth.set_attribute(p, a)
1212            else:
1213                self.log.debug("Bad append experiment_authorization %s %s" \
1214                        % (p, a))
1215        self.auth.save()
1216
1217        if need_state_lock: self.state_lock.acquire()
1218        # XXX: really a no op?
1219        #self.state[expid]['auth'].update(attrs)
1220        if self.state_filename: self.write_state()
1221        if need_state_lock: self.state_lock.release()
1222
1223    def clear_experiment_authorization(self, expid, need_state_lock=True):
1224        """
1225        Attrs is a set of attribute principal pairs that need to be removed
1226        from the authenticator.  Remove them and save the authenticator.
1227        """
1228
1229        if need_state_lock: self.state_lock.acquire()
1230        # XXX: should be a no-op
1231        #if expid in self.state and 'auth' in self.state[expid]:
1232            #for p, a in self.state[expid]['auth']:
1233                #self.auth.unset_attribute(p, a)
1234            #self.state[expid]['auth'] = set()
1235        if self.state_filename: self.write_state()
1236        if need_state_lock: self.state_lock.release()
1237        self.auth.save()
1238
1239
1240    def create_experiment_state(self, fid, req, expid, expcert,
1241            state='starting'):
1242        """
1243        Create the initial entry in the experiment's state.  The expid and
1244        expcert are the experiment's fedid and certifacte that represents that
1245        ID, which are installed in the experiment state.  If the request
1246        includes a suggested local name that is used if possible.  If the local
1247        name is already taken by an experiment owned by this user that has
1248        failed, it is overwritten.  Otherwise new letters are added until a
1249        valid localname is found.  The generated local name is returned.
1250        """
1251
1252        if req.has_key('experimentID') and \
1253                req['experimentID'].has_key('localname'):
1254            overwrite = False
1255            eid = req['experimentID']['localname']
1256            # If there's an old failed experiment here with the same local name
1257            # and accessible by this user, we'll overwrite it, otherwise we'll
1258            # fall through and do the collision avoidance.
1259            old_expid = self.get_experiment_fedid(eid)
1260            if old_expid:
1261                users_experiment = True
1262                try:
1263                    self.check_experiment_access(fid, old_expid)
1264                except service_error, e:
1265                    if e.code == service_error.access: users_experiment = False
1266                    else: raise e
1267                if users_experiment:
1268                    self.state_lock.acquire()
1269                    status = self.state[eid].status
1270                    if status and status == 'failed':
1271                        # remove the old access attributes
1272                        self.clear_experiment_authorization(eid,
1273                                need_state_lock=False)
1274                        overwrite = True
1275                        del self.state[eid]
1276                        del self.state[old_expid]
1277                    self.state_lock.release()
1278                else:
1279                    self.log.info('Experiment %s exists, ' % eid + \
1280                            'but this user cannot access it')
1281            self.state_lock.acquire()
1282            while (self.state.has_key(eid) and not overwrite):
1283                eid += random.choice(string.ascii_letters)
1284            # Initial state
1285            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1286                    identity=expcert)
1287            self.state[expid] = self.state[eid]
1288            if self.state_filename: self.write_state()
1289            self.state_lock.release()
1290        else:
1291            eid = self.exp_stem
1292            for i in range(0,5):
1293                eid += random.choice(string.ascii_letters)
1294            self.state_lock.acquire()
1295            while (self.state.has_key(eid)):
1296                eid = self.exp_stem
1297                for i in range(0,5):
1298                    eid += random.choice(string.ascii_letters)
1299            # Initial state
1300            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1301                    identity=expcert)
1302            self.state[expid] = self.state[eid]
1303            if self.state_filename: self.write_state()
1304            self.state_lock.release()
1305
1306        # Let users touch the state.  Authorize this fid and the expid itself
1307        # to touch the experiment, as well as allowing th eoverrides.
1308        self.append_experiment_authorization(eid, 
1309                set([(fid, expid), (expid,expid)] + \
1310                        [ (o, expid) for o in self.overrides]))
1311
1312        return eid
1313
1314
1315    def allocate_ips_to_topo(self, top):
1316        """
1317        Add an ip4_address attribute to all the hosts in the topology that have
1318        not already been assigned one by the user, based on the shared
1319        substrates on which they sit.  An /etc/hosts file is also created and
1320        returned as a list of hostfiles entries.  We also return the allocator,
1321        because we may need to allocate IPs to portals
1322        (specifically DRAGON portals).
1323        """
1324        subs = sorted(top.substrates, 
1325                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1326                reverse=True)
1327        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1328        ifs = { }
1329        hosts = [ ]
1330
1331        assigned = { }
1332        assigned_subs = set()
1333        # Look for IP networks that were already assigned to the experiment by
1334        # the user.  We assume that users are smart in making that assignment,
1335        # but we check them later.
1336        for e in top.elements:
1337            if not isinstance(e, topdl.Computer): continue
1338            for inf in e.interface:
1339                a = inf.get_attribute('ip4_address')
1340                if not a: continue
1341                for s in inf.substrate:
1342                    assigned_subs.add(s)
1343                a = ip_addr(a)
1344                n = ip_addr(inf.get_attribute('ip4_netmask'))
1345
1346                sz = 0x100000000 - long(n)
1347                net = (long(a) & long(n)) & 0xffffffff
1348                if net in assigned:
1349                    if sz > assigned[net]: assigned[net] = sz
1350                assigned[net] = sz
1351
1352        # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER
1353        assigned[long(ip_addr('10.0.23.0'))] = 256
1354        # end of hack
1355
1356        # Walk all substrates, smallest to largest, assigning IP addresses to
1357        # the unassigned and checking the assigned.  Build an /etc/hosts file
1358        # as well.
1359        for idx, s in enumerate(subs):
1360            if s.name not in assigned_subs:
1361                net_size = len(s.interfaces)+2
1362
1363                # Get an ip allocation for this substrate.  Make sure the
1364                # allocation does not collide with any user allocations.
1365                ok = False
1366                while not ok:
1367                    a = ips.allocate(net_size)
1368                    if not a:
1369                        raise service_error(service_error.req, 
1370                                "Cannot allocate IP addresses")
1371                    base, num = a
1372                    if num < net_size: 
1373                        raise service_error(service_error.internal,
1374                                "Allocator returned wrong number of IPs??")
1375                    # Check for overlap.  An assigned network could contain an
1376                    # endpoint of the new allocation or the new allocation
1377                    # could contain an endpoint of an assigned network.
1378                    # NB the else is attached to the for - if the loop is not
1379                    # exited by a break, ok = True.
1380                    for n, sz in assigned.items():
1381                        if base >= n and base < n + sz:
1382                            ok = False
1383                            break
1384                        if base + num > n and base + num  < n + sz:
1385                            ok = False
1386                            break
1387                        if n >= base and n < base + num:
1388                            ok = False
1389                            break
1390                        if n+sz > base and n+sz < base + num:
1391                            ok = False
1392                            break
1393                    else:
1394                        ok = True
1395                           
1396                mask = ips.min_alloc
1397                while mask < net_size:
1398                    mask *= 2
1399
1400                netmask = ((2**32-1) ^ (mask-1))
1401            else:
1402                base = 0
1403
1404            # Add the attributes and build hosts
1405            base += 1
1406            for i in s.interfaces:
1407                # Add address and masks to interfaces on unassigned substrates
1408                if s.name not in assigned_subs:
1409                    i.attribute.append(
1410                            topdl.Attribute('ip4_address', 
1411                                "%s" % ip_addr(base)))
1412                    i.attribute.append(
1413                            topdl.Attribute('ip4_netmask', 
1414                                "%s" % ip_addr(int(netmask))))
1415
1416                # Make /etc/hosts entries
1417                addr = i.get_attribute('ip4_address')
1418                if addr is None:
1419                    raise service_error(service_error.req, 
1420                            "Partially assigned substrate %s" %s.name)
1421                hname = i.element.name
1422                if hname in ifs:
1423                    hosts.append("%s\t%s-%s %s-%d" % \
1424                            (addr, hname, s.name, hname, ifs[hname]))
1425                else:
1426                    # First IP allocated it the default ip according to hosts,
1427                    # so the extra alias is here.
1428                    ifs[hname] = 0
1429                    hosts.append("%s\t%s-%s %s-%d %s" % \
1430                            (addr, hname, s.name, hname, ifs[hname], hname))
1431
1432                ifs[hname] += 1
1433                base += 1
1434        return hosts, ips
1435
1436    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1437            tbparam, masters, tbmap, expid=None, expcert=None):
1438        for tb in testbeds:
1439            self.log.debug('Trying testbed %s' % tb)
1440            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1441                    expcert)
1442            allocated[tb] = 1
1443            self.log.debug('Got testbed %s' % tb)
1444
1445    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1446            expcert=None):
1447        """
1448        Get access to testbed through fedd and set the parameters for that tb
1449        """
1450        def get_export_project(svcs):
1451            """
1452            Look through for the list of federated_service for this testbed
1453            objects for a project_export service, and extract the project
1454            parameter.
1455            """
1456
1457            pe = [s for s in svcs if s.name=='project_export']
1458            if len(pe) == 1:
1459                return pe[0].params.get('project', None)
1460            elif len(pe) == 0:
1461                return None
1462            else:
1463                raise service_error(service_error.req,
1464                        "More than one project export is not supported")
1465
1466        def add_services(svcs, type, slist, keys):
1467            """
1468            Add the given services to slist.  type is import or export.  Also
1469            add a mapping entry from the assigned id to the original service
1470            record.
1471            """
1472            for i, s in enumerate(svcs):
1473                idx = '%s%d' % (type, i)
1474                keys[idx] = s
1475                sr = {'id': idx, 'name': s.name, 'visibility': type }
1476                if s.params:
1477                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1478                            for k, v in s.params.items()]
1479                slist.append(sr)
1480
1481        uri = tbmap.get(testbed_base(tb), None)
1482        if not uri:
1483            raise service_error(service_error.server_config, 
1484                    "Unknown testbed: %s" % tb)
1485
1486        export_svcs = masters.get(tb,[])
1487        import_svcs = [ s for m in masters.values() \
1488                for s in m \
1489                    if tb in s.importers ]
1490
1491        export_project = get_export_project(export_svcs)
1492        # Compose the credential list so that IDs come before attributes
1493        creds = set()
1494        keys = set()
1495        certs = self.auth.get_creds_for_principal(fid)
1496        # Append credenials about this experiment controller - e.g. that it is
1497        # trusted.
1498        certs.update(self.auth.get_creds_for_principal(
1499            fedid(file=self.cert_file)))
1500        if expid:
1501            certs.update(self.auth.get_creds_for_principal(expid))
1502        for c in certs:
1503            keys.add(c.issuer_cert())
1504            creds.add(c.attribute_cert())
1505        creds = list(keys) + list(creds)
1506
1507        if expcert: cert, pw = expcert, None
1508        else: cert, pw = self.cert_file, self.cert_pw
1509
1510        # Request credentials
1511        req = {
1512                'abac_credential': creds,
1513            }
1514        # Make the service request from the services we're importing and
1515        # exporting.  Keep track of the export request ids so we can
1516        # collect the resulting info from the access response.
1517        e_keys = { }
1518        if import_svcs or export_svcs:
1519            slist = []
1520            add_services(import_svcs, 'import', slist, e_keys)
1521            add_services(export_svcs, 'export', slist, e_keys)
1522            req['service'] = slist
1523
1524        if self.local_access.has_key(uri):
1525            # Local access call
1526            req = { 'RequestAccessRequestBody' : req }
1527            r = self.local_access[uri].RequestAccess(req, 
1528                    fedid(file=self.cert_file))
1529            r = { 'RequestAccessResponseBody' : r }
1530        else:
1531            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1532
1533        if r.has_key('RequestAccessResponseBody'):
1534            # Through to here we have a valid response, not a fault.
1535            # Access denied is a fault, so something better or worse than
1536            # access denied has happened.
1537            r = r['RequestAccessResponseBody']
1538            self.log.debug("[get_access] Access granted")
1539        else:
1540            raise service_error(service_error.protocol,
1541                        "Bad proxy response")
1542        if 'proof' not in r:
1543            raise service_error(service_error.protocol,
1544                        "Bad access response (no access proof)")
1545
1546        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
1547                tb=tb, uri=uri, proof=[r['proof']], 
1548                services=masters.get(tb, None))
1549
1550        # Collect the responses corresponding to the services this testbed
1551        # exports.  These will be the service requests that we will include in
1552        # the start segment requests (with appropriate visibility values) to
1553        # import and export the segments.
1554        for s in r.get('service', []):
1555            id = s.get('id', None)
1556            # Note that this attaches the response to the object in the masters
1557            # data structure.  (The e_keys index disappears when this fcn
1558            # returns)
1559            if id and id in e_keys:
1560                e_keys[id].reqs.append(s)
1561
1562        # Add attributes to parameter space.  We don't allow attributes to
1563        # overlay any parameters already installed.
1564        for a in r.get('fedAttr', []):
1565            try:
1566                if a['attribute']:
1567                    tbparam[tb].set_attribute(a['attribute'], a['value'])
1568            except KeyError:
1569                self.log.error("Bad attribute in response: %s" % a)
1570
1571
1572    def split_topology(self, top, topo, testbeds):
1573        """
1574        Create the sub-topologies that are needed for experiment instantiation.
1575        """
1576        for tb in testbeds:
1577            topo[tb] = top.clone()
1578            # copy in for loop allows deletions from the original
1579            for e in [ e for e in topo[tb].elements]:
1580                etb = e.get_attribute('testbed')
1581                # NB: elements without a testbed attribute won't appear in any
1582                # sub topologies. 
1583                if not etb or etb != tb:
1584                    for i in e.interface:
1585                        for s in i.subs:
1586                            try:
1587                                s.interfaces.remove(i)
1588                            except ValueError:
1589                                raise service_error(service_error.internal,
1590                                        "Can't remove interface??")
1591                    topo[tb].elements.remove(e)
1592            topo[tb].make_indices()
1593
1594    def confirm_software(self, top):
1595        """
1596        Make sure that the software to be loaded in the topo is all available
1597        before we begin making access requests, etc.  This is a subset of
1598        wrangle_software.
1599        """
1600        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1601        pkgs.update([x.location for e in top.elements for x in e.software])
1602
1603        for pkg in pkgs:
1604            loc = pkg
1605
1606            scheme, host, path = urlparse(loc)[0:3]
1607            dest = os.path.basename(path)
1608            if not scheme:
1609                if not loc.startswith('/'):
1610                    loc = "/%s" % loc
1611                loc = "file://%s" %loc
1612            # NB: if scheme was found, loc == pkg
1613            try:
1614                u = urlopen(loc)
1615                u.close()
1616            except Exception, e:
1617                raise service_error(service_error.req, 
1618                        "Cannot open %s: %s" % (loc, e))
1619        return True
1620
1621    def wrangle_software(self, expid, top, topo, tbparams):
1622        """
1623        Copy software out to the repository directory, allocate permissions and
1624        rewrite the segment topologies to look for the software in local
1625        places.
1626        """
1627
1628        # Copy the rpms and tarfiles to a distribution directory from
1629        # which the federants can retrieve them
1630        linkpath = "%s/software" %  expid
1631        softdir ="%s/%s" % ( self.repodir, linkpath)
1632        softmap = { }
1633
1634        # self.fedkit and self.gateway kit are lists of tuples of
1635        # (install_location, download_location) this extracts the download
1636        # locations.
1637        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1638        pkgs.update([x.location for e in top.elements for x in e.software])
1639        try:
1640            os.makedirs(softdir)
1641        except EnvironmentError, e:
1642            raise service_error(
1643                    "Cannot create software directory: %s" % e)
1644        # The actual copying.  Everything's converted into a url for copying.
1645        auth_attrs = set()
1646        for pkg in pkgs:
1647            loc = pkg
1648
1649            scheme, host, path = urlparse(loc)[0:3]
1650            dest = os.path.basename(path)
1651            if not scheme:
1652                if not loc.startswith('/'):
1653                    loc = "/%s" % loc
1654                loc = "file://%s" %loc
1655            # NB: if scheme was found, loc == pkg
1656            try:
1657                u = urlopen(loc)
1658            except Exception, e:
1659                raise service_error(service_error.req, 
1660                        "Cannot open %s: %s" % (loc, e))
1661            try:
1662                f = open("%s/%s" % (softdir, dest) , "w")
1663                self.log.debug("Writing %s/%s" % (softdir,dest) )
1664                data = u.read(4096)
1665                while data:
1666                    f.write(data)
1667                    data = u.read(4096)
1668                f.close()
1669                u.close()
1670            except Exception, e:
1671                raise service_error(service_error.internal,
1672                        "Could not copy %s: %s" % (loc, e))
1673            path = re.sub("/tmp", "", linkpath)
1674            # XXX
1675            softmap[pkg] = \
1676                    "%s/%s/%s" %\
1677                    ( self.repo_url, path, dest)
1678
1679            # Allow the individual segments to access the software by assigning
1680            # an attribute to each testbed allocation that encodes the data to
1681            # be released.  This expression collects the data for each run of
1682            # the loop.
1683            auth_attrs.update([
1684                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
1685                        for tb in tbparams.keys()])
1686
1687        self.append_experiment_authorization(expid, auth_attrs)
1688
1689        # Convert the software locations in the segments into the local
1690        # copies on this host
1691        for soft in [ s for tb in topo.values() \
1692                for e in tb.elements \
1693                    if getattr(e, 'software', False) \
1694                        for s in e.software ]:
1695            if softmap.has_key(soft.location):
1696                soft.location = softmap[soft.location]
1697
1698
1699    def new_experiment(self, req, fid):
1700        """
1701        The external interface to empty initial experiment creation called from
1702        the dispatcher.
1703
1704        Creates a working directory, splits the incoming description using the
1705        splitter script and parses out the avrious subsections using the
1706        lcasses above.  Once each sub-experiment is created, use pooled threads
1707        to instantiate them and start it all up.
1708        """
1709        self.log.info("New experiment call started for %s" % fid)
1710        req = req.get('NewRequestBody', None)
1711        if not req:
1712            raise service_error(service_error.req,
1713                    "Bad request format (no NewRequestBody)")
1714
1715        # import may partially succeed so always save credentials and warn
1716        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1717            self.log.debug("Failed to import delegation credentials(!)")
1718        self.get_grouper_updates(fid)
1719        self.auth.update()
1720        self.auth.save()
1721
1722        try:
1723            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1724                    with_proof=True)
1725        except service_error, e:
1726            self.log.info("New experiment call for %s: access denied" % fid)
1727            raise e
1728
1729
1730        if not access_ok:
1731            self.log.info("New experiment call for %s: Access denied" % fid)
1732            raise service_error(service_error.access, "New access denied",
1733                    proof=[proof])
1734
1735        try:
1736            tmpdir = tempfile.mkdtemp(prefix="split-")
1737        except EnvironmentError:
1738            raise service_error(service_error.internal, "Cannot create tmp dir")
1739
1740        # Generate an ID for the experiment (slice) and a certificate that the
1741        # allocator can use to prove they own it.  We'll ship it back through
1742        # the encrypted connection.  If the requester supplied one, use it.
1743        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1744            expcert = req['experimentAccess']['X509']
1745            expid = fedid(certstr=expcert)
1746            self.state_lock.acquire()
1747            if expid in self.state:
1748                self.state_lock.release()
1749                raise service_error(service_error.req, 
1750                        'fedid %s identifies an existing experiment' % expid)
1751            self.state_lock.release()
1752        else:
1753            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1754
1755        #now we're done with the tmpdir, and it should be empty
1756        if self.cleanup:
1757            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1758            os.rmdir(tmpdir)
1759        else:
1760            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1761
1762        eid = self.create_experiment_state(fid, req, expid, expcert, 
1763                state='empty')
1764
1765        rv = {
1766                'experimentID': [
1767                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1768                ],
1769                'experimentStatus': 'empty',
1770                'experimentAccess': { 'X509' : expcert },
1771                'proof': proof.to_dict(),
1772            }
1773
1774        self.log.info("New experiment call succeeded for %s" % fid)
1775        return rv
1776
1777    # create_experiment sub-functions
1778
1779    @staticmethod
1780    def get_experiment_key(req, field='experimentID'):
1781        """
1782        Parse the experiment identifiers out of the request (the request body
1783        tag has been removed).  Specifically this pulls either the fedid or the
1784        localname out of the experimentID field.  A fedid is preferred.  If
1785        neither is present or the request does not contain the fields,
1786        service_errors are raised.
1787        """
1788        # Get the experiment access
1789        exp = req.get(field, None)
1790        if exp:
1791            if exp.has_key('fedid'):
1792                key = exp['fedid']
1793            elif exp.has_key('localname'):
1794                key = exp['localname']
1795            else:
1796                raise service_error(service_error.req, "Unknown lookup type")
1797        else:
1798            raise service_error(service_error.req, "No request?")
1799
1800        return key
1801
1802    def get_experiment_ids_and_start(self, key, tmpdir):
1803        """
1804        Get the experiment name, id and access certificate from the state, and
1805        set the experiment state to 'starting'.  returns a triple (fedid,
1806        localname, access_cert_file). The access_cert_file is a copy of the
1807        contents of the access certificate, created in the tempdir with
1808        restricted permissions.  If things are confused, raise an exception.
1809        """
1810
1811        expid = eid = None
1812        self.state_lock.acquire()
1813        if key in self.state:
1814            exp = self.state[key]
1815            exp.status = "starting"
1816            exp.updated()
1817            expid = exp.fedid
1818            eid = exp.localname
1819            expcert = exp.identity
1820        self.state_lock.release()
1821
1822        # make a protected copy of the access certificate so the experiment
1823        # controller can act as the experiment principal.
1824        if expcert:
1825            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1826            if not expcert_file:
1827                raise service_error(service_error.internal, 
1828                        "Cannot create temp cert file?")
1829        else:
1830            expcert_file = None
1831
1832        return (eid, expid, expcert_file)
1833
1834    def get_topology(self, req, tmpdir):
1835        """
1836        Get the ns2 content and put it into a file for parsing.  Call the local
1837        or remote parser and return the topdl.Topology.  Errors result in
1838        exceptions.  req is the request and tmpdir is a work directory.
1839        """
1840
1841        # The tcl parser needs to read a file so put the content into that file
1842        descr=req.get('experimentdescription', None)
1843        if descr:
1844            if 'ns2description' in descr:
1845                file_content=descr['ns2description']
1846            elif 'topdldescription' in descr:
1847                return topdl.Topology(**descr['topdldescription'])
1848            else:
1849                raise service_error(service_error.req, 
1850                        'Unknown experiment description type')
1851        else:
1852            raise service_error(service_error.req, "No experiment description")
1853
1854
1855        if self.splitter_url:
1856            self.log.debug("Calling remote topdl translator at %s" % \
1857                    self.splitter_url)
1858            top = self.remote_ns2topdl(self.splitter_url, file_content)
1859        else:
1860            tclfile = os.path.join(tmpdir, "experiment.tcl")
1861            if file_content:
1862                try:
1863                    f = open(tclfile, 'w')
1864                    f.write(file_content)
1865                    f.close()
1866                except EnvironmentError:
1867                    raise service_error(service_error.internal,
1868                            "Cannot write temp experiment description")
1869            else:
1870                raise service_error(service_error.req, 
1871                        "Only ns2descriptions supported")
1872            pid = "dummy"
1873            gid = "dummy"
1874            eid = "dummy"
1875
1876            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1877                str(self.muxmax), '-m', 'dummy']
1878
1879            tclcmd.extend([pid, gid, eid, tclfile])
1880
1881            self.log.debug("running local splitter %s", " ".join(tclcmd))
1882            # This is just fantastic.  As a side effect the parser copies
1883            # tb_compat.tcl into the current directory, so that directory
1884            # must be writable by the fedd user.  Doing this in the
1885            # temporary subdir ensures this is the case.
1886            tclparser = Popen(tclcmd, stdout=PIPE, stderr=PIPE, close_fds=True, 
1887                    cwd=tmpdir)
1888            split_data, err_data = tclparser.communicate()
1889
1890            if tclparser.returncode != 0:
1891                os.remove(tclfile)
1892                raise service_error(service_error.req, 
1893                        "Cannot parse input ns2: %s" % err_data)
1894
1895            top = topdl.topology_from_xml(string=split_data, top="experiment")
1896            os.remove(tclfile)
1897
1898        return top
1899
1900    def get_testbed_services(self, req, testbeds):
1901        """
1902        Parse the services section of the request into two dicts mapping
1903        testbed to lists of federated_service objects.  The first dict maps all
1904        exporters of services to those service objects, the second maps
1905        testbeds to service objects only for services requiring portals.
1906        """
1907
1908        # Sanity check the services.  Exports or imports from unknown testbeds
1909        # cause an exception.
1910        for s in req.get('service', []):
1911            for t in s.get('import', []):
1912                if t not in testbeds:
1913                    raise service_error(service_error.req, 
1914                            'Service import to unknown testbed: %s' %t)
1915            for t in s.get('export', []):
1916                if t not in testbeds:
1917                    raise service_error(service_error.req, 
1918                            'Service export from unknown testbed: %s' %t)
1919
1920
1921        # We construct both dicts here because deriving the second is more
1922        # complex than it looks - both the keys and lists can differ, and it's
1923        # much easier to generate both in one pass.
1924        masters = { }
1925        pmasters = { }
1926        for s in req.get('service', []):
1927            # If this is a service request with the importall field
1928            # set, fill it out.
1929
1930            if s.get('importall', False):
1931                s['import'] = [ tb for tb in testbeds \
1932                        if tb not in s.get('export',[])]
1933                del s['importall']
1934
1935            # Add the service to masters
1936            for tb in s.get('export', []):
1937                if s.get('name', None):
1938
1939                    params = { }
1940                    for a in s.get('fedAttr', []):
1941                        params[a.get('attribute', '')] = a.get('value','')
1942
1943                    fser = federated_service(name=s['name'],
1944                            exporter=tb, importers=s.get('import',[]),
1945                            params=params)
1946                    if fser.name == 'hide_hosts' \
1947                            and 'hosts' not in fser.params:
1948                        fser.params['hosts'] = \
1949                                ",".join(tb_hosts.get(fser.exporter, []))
1950                    if tb in masters: masters[tb].append(fser)
1951                    else: masters[tb] = [fser]
1952
1953                    if fser.portal:
1954                        if tb in pmasters: pmasters[tb].append(fser)
1955                        else: pmasters[tb] = [fser]
1956                else:
1957                    self.log.error('Testbed service does not have name " + \
1958                            "and importers')
1959        return masters, pmasters
1960
1961    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1962        """
1963        Create the ssh keys necessary for interconnecting the portal nodes and
1964        the global hosts file for letting each segment know about the IP
1965        addresses in play.  If we have computed static routes, copy them into
1966        the repo. Save these into the repo.  Add attributes to the autorizer
1967        allowing access controllers to download them and return a set of
1968        attributes that inform the segments where to find this stuff.  May
1969        raise service_errors in if there are problems.
1970        """
1971        gw_pubkey_base = "fedgw_%s.pub" % self.ssh_type
1972        gw_secretkey_base = "fedgw_%s" % self.ssh_type
1973        keydir = os.path.join(tmpdir, 'keys')
1974        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1975        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1976        route = os.path.join(tmpdir, 'route.tgz')
1977
1978        try:
1979            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1980        except ValueError:
1981            raise service_error(service_error.server_config, 
1982                    "Bad key type (%s)" % self.ssh_type)
1983
1984        self.generate_seer_certs(keydir)
1985
1986        # Copy configuration files into the remote file store
1987        # The config urlpath
1988        configpath = "/%s/config" % expid
1989        # The config file system location
1990        configdir ="%s%s" % ( self.repodir, configpath)
1991        route_conf = os.path.join(configdir, 'route.tgz')
1992        try:
1993            os.makedirs(configdir)
1994        except EnvironmentError, e:
1995            raise service_error(service_error.internal,
1996                    "Cannot create config directory: %s" % e)
1997        try:
1998            f = open("%s/hosts" % configdir, "w")
1999            print >> f, string.join(hosts, '\n')
2000            f.close()
2001        except EnvironmentError, e:
2002            raise service_error(service_error.internal, 
2003                    "Cannot write hosts file: %s" % e)
2004        try:
2005            if os.path.exists(route):
2006                copy_file(route, route_conf)
2007
2008            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
2009            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
2010            copy_file(os.path.join(keydir, 'ca.pem'), 
2011                    os.path.join(configdir, 'ca.pem'))
2012            copy_file(os.path.join(keydir, 'node.pem'), 
2013                    os.path.join(configdir, 'node.pem'))
2014        except EnvironmentError, e:
2015            raise service_error(service_error.internal, 
2016                    "Cannot copy keyfiles: %s" % e)
2017
2018        # Allow the individual testbeds to access the configuration files,
2019        # again by setting an attribute for the relevant pathnames on each
2020        # allocation principal.  Yeah, that's a long list comprehension.
2021        self.append_experiment_authorization(expid, set([
2022            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
2023                    for tb in tbparams.keys() \
2024                        for f in ("hosts", 'ca.pem', 'node.pem', 'route.tgz',
2025                            gw_secretkey_base, gw_pubkey_base)]))
2026
2027        attrs = [ 
2028                {
2029                    'attribute': 'ssh_pubkey', 
2030                    'value': '%s/%s/config/%s' % \
2031                            (self.repo_url, expid, gw_pubkey_base)
2032                },
2033                {
2034                    'attribute': 'ssh_secretkey', 
2035                    'value': '%s/%s/config/%s' % \
2036                            (self.repo_url, expid, gw_secretkey_base)
2037                },
2038                {
2039                    'attribute': 'hosts', 
2040                    'value': '%s/%s/config/hosts' % \
2041                            (self.repo_url, expid)
2042                },
2043                {
2044                    'attribute': 'seer_ca_pem', 
2045                    'value': '%s/%s/config/%s' % \
2046                            (self.repo_url, expid, 'ca.pem')
2047                },
2048                {
2049                    'attribute': 'seer_node_pem', 
2050                    'value': '%s/%s/config/%s' % \
2051                            (self.repo_url, expid, 'node.pem')
2052                },
2053            ]
2054        # Add info about static routes if we have some
2055        if os.path.exists(route_conf):
2056            attrs.append(
2057                {
2058                    'attribute': 'route.tgz', 
2059                    'value': '%s/%s/config/%s' % \
2060                            (self.repo_url, expid, 'route.tgz')
2061                }
2062            )
2063        return attrs
2064
2065
2066    def get_vtopo(self, req, fid):
2067        """
2068        Return the stored virtual topology for this experiment
2069        """
2070        rv = None
2071        state = None
2072        self.log.info("vtopo call started for %s" %  fid)
2073
2074        req = req.get('VtopoRequestBody', None)
2075        if not req:
2076            raise service_error(service_error.req,
2077                    "Bad request format (no VtopoRequestBody)")
2078        exp = req.get('experiment', None)
2079        if exp:
2080            if exp.has_key('fedid'):
2081                key = exp['fedid']
2082                keytype = "fedid"
2083            elif exp.has_key('localname'):
2084                key = exp['localname']
2085                keytype = "localname"
2086            else:
2087                raise service_error(service_error.req, "Unknown lookup type")
2088        else:
2089            raise service_error(service_error.req, "No request?")
2090
2091        try:
2092            proof = self.check_experiment_access(fid, key)
2093        except service_error, e:
2094            self.log.info("vtopo call failed for %s: access denied" %  fid)
2095            raise e
2096
2097        self.state_lock.acquire()
2098        # XXX: this needs to be recalculated
2099        if key in self.state:
2100            if self.state[key].top is not None:
2101                vtopo = topdl.topology_to_vtopo(self.state[key].top)
2102                rv = { 'experiment' : {keytype: key },
2103                        'vtopo': vtopo,
2104                        'proof': proof.to_dict(), 
2105                    }
2106            else:
2107                state = self.state[key].status
2108        self.state_lock.release()
2109
2110        if rv: 
2111            self.log.info("vtopo call completed for %s %s " % \
2112                (key, fid))
2113            return rv
2114        else: 
2115            if state:
2116                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2117                    (key, fid))
2118                raise service_error(service_error.partial, 
2119                        "Not ready: %s" % state)
2120            else:
2121                self.log.info("vtopo call completed for %s %s (No experiment)"\
2122                        % (key, fid))
2123                raise service_error(service_error.req, "No such experiment")
2124
2125    def get_vis(self, req, fid):
2126        """
2127        Return the stored visualization for this experiment
2128        """
2129        rv = None
2130        state = None
2131
2132        self.log.info("vis call started for %s" %  fid)
2133        req = req.get('VisRequestBody', None)
2134        if not req:
2135            raise service_error(service_error.req,
2136                    "Bad request format (no VisRequestBody)")
2137        exp = req.get('experiment', None)
2138        if exp:
2139            if exp.has_key('fedid'):
2140                key = exp['fedid']
2141                keytype = "fedid"
2142            elif exp.has_key('localname'):
2143                key = exp['localname']
2144                keytype = "localname"
2145            else:
2146                raise service_error(service_error.req, "Unknown lookup type")
2147        else:
2148            raise service_error(service_error.req, "No request?")
2149
2150        try:
2151            proof = self.check_experiment_access(fid, key)
2152        except service_error, e:
2153            self.log.info("vis call failed for %s: access denied" %  fid)
2154            raise e
2155
2156        self.state_lock.acquire()
2157        # Generate the visualization
2158        if key in self.state:
2159            if self.state[key].top is not None:
2160                try:
2161                    vis = self.genviz(
2162                            topdl.topology_to_vtopo(self.state[key].top))
2163                except service_error, e:
2164                    self.state_lock.release()
2165                    raise e
2166                rv =  { 'experiment' : {keytype: key },
2167                        'vis': vis,
2168                        'proof': proof.to_dict(), 
2169                        }
2170            else:
2171                state = self.state[key].status
2172        self.state_lock.release()
2173
2174        if rv: 
2175            self.log.info("vis call completed for %s %s " % \
2176                (key, fid))
2177            return rv
2178        else:
2179            if state:
2180                self.log.info("vis call completed for %s %s (not ready)" % \
2181                    (key, fid))
2182                raise service_error(service_error.partial, 
2183                        "Not ready: %s" % state)
2184            else:
2185                self.log.info("vis call completed for %s %s (no experiment)" % \
2186                    (key, fid))
2187                raise service_error(service_error.req, "No such experiment")
2188
2189    @staticmethod
2190    def needs_route_computation(req):
2191        '''
2192        Walk the request services looking for a static routing request
2193        '''
2194        for s in req.get('service', []):
2195            if s.get('name', '') == 'static_routing':
2196                return True
2197        return False
2198
2199    def compute_static_routes(self, tmpdir, top):
2200        '''
2201        Compute a set of static routes for the topology.  The result is a file
2202        called route.tgz in tmpdir that contains a dinrectory, route, with a
2203        file per node in the topology that lists the prefix and router for all
2204        the routes in that node's routing table.  Exceptions from the route
2205        calculation program and the tar creation call are not caught.
2206        '''
2207        if self.routing is None:
2208            raise service_error(service_error.server, 
2209                    'Cannot provide staticroutes, no routing program specified')
2210        rg = tempfile.NamedTemporaryFile()
2211        outdir = os.path.join(tmpdir, 'route')
2212        tarfile = os.path.join(tmpdir, 'route.tgz')
2213        topology_to_route_file(top, file=rg)
2214        os.mkdir(outdir)
2215        try:
2216            for cmd in (
2217                    [self.routing, '--input', rg.name,'--output', outdir],
2218                    ['tar', '-C', tmpdir, '-czf', tarfile, 'route']):
2219                subprocess.check_call(cmd)
2220        except subprocess.CalledProcessError, e:
2221            raise service_error(service_error.internal, 
2222                    'Cannot call %s: %s' % (' '.join(cmd), e.returncode))
2223        rg.close()
2224        shutil.rmtree(outdir)
2225
2226
2227   
2228    def save_federant_information(self, allocated, tbparams, eid, top):
2229        """
2230        Store the various data that have changed in the experiment state
2231        between when it was started and the beginning of resource allocation.
2232        This is basically the information about each local allocation.  This
2233        fills in the values of the placeholder allocation in the state.  It
2234        also collects the access proofs and returns them as dicts for a
2235        response message.
2236        """
2237        self.state_lock.acquire()
2238        exp = self.state[eid]
2239        exp.top = top.clone()
2240        # save federant information
2241        for k in allocated.keys():
2242            exp.add_allocation(tbparams[k])
2243            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
2244                type="testbed", localname=[k], 
2245                service=[ s.to_topdl() for s in tbparams[k].services]))
2246
2247        # Access proofs for the response message
2248        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
2249                    for p in tbparams[k].proof]
2250        exp.updated()
2251        if self.state_filename: 
2252            self.write_state()
2253        self.state_lock.release()
2254        return proofs
2255
2256    def clear_placeholder(self, eid, expid, tmpdir):
2257        """
2258        Clear the placeholder and remove any allocated temporary dir.
2259        """
2260
2261        self.state_lock.acquire()
2262        del self.state[eid]
2263        del self.state[expid]
2264        if self.state_filename: self.write_state()
2265        self.state_lock.release()
2266        if tmpdir and self.cleanup:
2267            self.remove_dirs(tmpdir)
2268
2269    # end of create_experiment sub-functions
2270
2271    def create_experiment(self, req, fid):
2272        """
2273        The external interface to experiment creation called from the
2274        dispatcher.
2275
2276        Creates a working directory, splits the incoming description using the
2277        splitter script and parses out the various subsections using the
2278        classes above.  Once each sub-experiment is created, use pooled threads
2279        to instantiate them and start it all up.
2280        """
2281
2282        self.log.info("Create experiment call started for %s" % fid)
2283        req = req.get('CreateRequestBody', None)
2284        if req:
2285            key = self.get_experiment_key(req)
2286        else:
2287            raise service_error(service_error.req,
2288                    "Bad request format (no CreateRequestBody)")
2289
2290        # Import information from the requester
2291        # import may partially succeed so always save credentials and warn
2292        if not self.auth.import_credentials(data_list=req.get('credential',[])):
2293            self.log.debug("Failed to import delegation credentials(!)")
2294        self.get_grouper_updates(fid)
2295        self.auth.update()
2296        self.auth.save()
2297
2298        try:
2299            # Make sure that the caller can talk to us
2300            proof = self.check_experiment_access(fid, key)
2301        except service_error, e:
2302            self.log.info("Create experiment call failed for %s: access denied"\
2303                    % fid)
2304            raise e
2305
2306
2307        # Install the testbed map entries supplied with the request into a copy
2308        # of the testbed map.
2309        tbmap = dict(self.tbmap)
2310        for m in req.get('testbedmap', []):
2311            if 'testbed' in m and 'uri' in m:
2312                tbmap[m['testbed']] = m['uri']
2313
2314        # a place to work
2315        try:
2316            tmpdir = tempfile.mkdtemp(prefix="split-")
2317            os.mkdir(tmpdir+"/keys")
2318        except EnvironmentError:
2319            raise service_error(service_error.internal, "Cannot create tmp dir")
2320
2321        tbparams = { }
2322
2323        eid, expid, expcert_file = \
2324                self.get_experiment_ids_and_start(key, tmpdir)
2325
2326        # This catches exceptions to clear the placeholder if necessary
2327        try: 
2328            if not (eid and expid):
2329                raise service_error(service_error.internal, 
2330                        "Cannot find local experiment info!?")
2331
2332            top = self.get_topology(req, tmpdir)
2333            self.confirm_software(top)
2334            # Assign the IPs
2335            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2336            if self.needs_route_computation(req):
2337                self.compute_static_routes(tmpdir, top)
2338            # Find the testbeds to look up
2339            tb_hosts = { }
2340            testbeds = [ ]
2341            for e in top.elements:
2342                if isinstance(e, topdl.Computer):
2343                    tb = e.get_attribute('testbed')
2344                    # Put nodes not in a testbed into the default testbed if
2345                    # there is one.
2346                    if tb is None:
2347                        if self.default_tb is None:
2348                            raise service_error(service_error.req, 
2349                                '%s not in a testbed (and no default)' % e.name)
2350                        tb = self.default_tb
2351                        e.set_attribute('testbed', tb)
2352                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2353                    else: 
2354                        tb_hosts[tb] = [ e.name ]
2355                        testbeds.append(tb)
2356
2357            masters, pmasters = self.get_testbed_services(req, testbeds)
2358            allocated = { }         # Testbeds we can access
2359            topo ={ }               # Sub topologies
2360            connInfo = { }          # Connection information
2361
2362            self.get_access_to_testbeds(testbeds, fid, allocated, 
2363                    tbparams, masters, tbmap, expid, expcert_file)
2364
2365            # tbactive is the set of testbeds that have NATs in front of their
2366            # portals. They need to initiate connections.
2367            tbactive = set([k for k, v in tbparams.items() \
2368                    if v.get_attribute('nat_portals')])
2369
2370            self.split_topology(top, topo, testbeds)
2371
2372            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2373
2374            part = experiment_partition(self.auth, self.store_url, tbmap,
2375                    self.muxmax, self.direct_transit, tbactive)
2376            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2377                    connInfo, expid)
2378
2379            auth_attrs = set()
2380            # Now get access to the dynamic testbeds (those added above)
2381            for tb in [ t for t in topo if t not in allocated]:
2382                self.get_access(tb, tbparams, fid, masters, tbmap, 
2383                        expid, expcert_file)
2384                allocated[tb] = 1
2385                store_keys = topo[tb].get_attribute('store_keys')
2386                # Give the testbed access to keys it exports or imports
2387                if store_keys:
2388                    auth_attrs.update(set([
2389                        (tbparams[tb].allocID, sk) \
2390                                for sk in store_keys.split(" ")]))
2391
2392            if auth_attrs:
2393                self.append_experiment_authorization(expid, auth_attrs)
2394
2395            # transit and disconnected testbeds may not have a connInfo entry.
2396            # Fill in the blanks.
2397            for t in allocated.keys():
2398                if not connInfo.has_key(t):
2399                    connInfo[t] = { }
2400
2401            self.wrangle_software(expid, top, topo, tbparams)
2402
2403            proofs = self.save_federant_information(allocated, tbparams, 
2404                    eid, top)
2405        except service_error, e:
2406            # If something goes wrong in the parse (usually an access error)
2407            # clear the placeholder state.  From here on out the code delays
2408            # exceptions.  Failing at this point returns a fault to the remote
2409            # caller.
2410
2411            self.log.info("Create experiment call failed for %s %s: %s" % 
2412                    (eid, fid, e))
2413            self.clear_placeholder(eid, expid, tmpdir)
2414            raise e
2415
2416        # Start the background swapper and return the starting state.  From
2417        # here on out, the state will stick around a while.
2418
2419        # Create a logger that logs to the experiment's state object as well as
2420        # to the main log file.
2421        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2422        alloc_collector = self.list_log(self.state[eid].log)
2423        h = logging.StreamHandler(alloc_collector)
2424        # XXX: there should be a global one of these rather than repeating the
2425        # code.
2426        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2427                    '%d %b %y %H:%M:%S'))
2428        alloc_log.addHandler(h)
2429
2430        # Start a thread to do the resource allocation
2431        t  = Thread(target=self.allocate_resources,
2432                args=(allocated, masters, eid, expid, tbparams, 
2433                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2434                    connInfo, tbmap, expcert_file),
2435                name=eid)
2436        t.start()
2437
2438        rv = {
2439                'experimentID': [
2440                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2441                ],
2442                'experimentStatus': 'starting',
2443                'proof': [ proof.to_dict() ] + proofs,
2444            }
2445        self.log.info("Create experiment call succeeded for %s %s" % \
2446                (eid, fid))
2447
2448        return rv
2449   
2450    def get_experiment_fedid(self, key):
2451        """
2452        find the fedid associated with the localname key in the state database.
2453        """
2454
2455        rv = None
2456        self.state_lock.acquire()
2457        if key in self.state:
2458            rv = self.state[key].fedid
2459        self.state_lock.release()
2460        return rv
2461
2462    def check_experiment_access(self, fid, key):
2463        """
2464        Confirm that the fid has access to the experiment.  Though a request
2465        may be made in terms of a local name, the access attribute is always
2466        the experiment's fedid.
2467        """
2468        if not isinstance(key, fedid):
2469            key = self.get_experiment_fedid(key)
2470
2471        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2472
2473        if access_ok:
2474            return proof
2475        else:
2476            raise service_error(service_error.access, "Access Denied",
2477                proof)
2478
2479
2480    def get_handler(self, path, fid):
2481        """
2482        Perhaps surprisingly named, this function handles HTTP GET requests to
2483        this server (SOAP requests are POSTs).
2484        """
2485        self.log.info("Get handler %s %s" % (path, fid))
2486        if len("%s" % fid) == 0:
2487            return (None, None)
2488        # XXX: log proofs?
2489        if self.auth.check_attribute(fid, path):
2490            return ("%s/%s" % (self.repodir, path), "application/binary")
2491        else:
2492            return (None, None)
2493
2494    def update_info(self, key, force=False):
2495        top = None
2496        self.state_lock.acquire()
2497        if key in self.state:
2498            if force or self.state[key].older_than(self.info_cache_limit):
2499                top = self.state[key].top
2500                if top is not None: top = top.clone()
2501                d1, info_params, cert, d2 = \
2502                        self.get_segment_info(self.state[key], need_lock=False)
2503        self.state_lock.release()
2504
2505        if top is None: return
2506
2507        try:
2508            tmpdir = tempfile.mkdtemp(prefix="info-")
2509        except EnvironmentError:
2510            raise service_error(service_error.internal, 
2511                    "Cannot create tmp dir")
2512        cert_file = self.make_temp_certfile(cert, tmpdir)
2513
2514        data = []
2515        try:
2516            for k, (uri, aid) in info_params.items():
2517                info=self.info_segment(log=self.log, testbed=uri,
2518                            cert_file=cert_file, cert_pwd=None,
2519                            trusted_certs=self.trusted_certs,
2520                            caller=self.call_InfoSegment)
2521                info(uri, aid)
2522                data.append(info)
2523        # Clean up the tmpdir no matter what
2524        finally:
2525            if tmpdir: self.remove_dirs(tmpdir)
2526
2527        self.annotate_topology(top, data)
2528        self.state_lock.acquire()
2529        if key in self.state:
2530            self.state[key].top = top
2531            self.state[key].updated()
2532            if self.state_filename: self.write_state()
2533        self.state_lock.release()
2534
2535   
2536    def get_info(self, req, fid):
2537        """
2538        Return all the stored info about this experiment
2539        """
2540        rv = None
2541
2542        self.log.info("Info call started for %s" %  fid)
2543        req = req.get('InfoRequestBody', None)
2544        if not req:
2545            raise service_error(service_error.req,
2546                    "Bad request format (no InfoRequestBody)")
2547        exp = req.get('experiment', None)
2548        legacy = req.get('legacy', False)
2549        fresh = req.get('fresh', False)
2550        if exp:
2551            if exp.has_key('fedid'):
2552                key = exp['fedid']
2553                keytype = "fedid"
2554            elif exp.has_key('localname'):
2555                key = exp['localname']
2556                keytype = "localname"
2557            else:
2558                raise service_error(service_error.req, "Unknown lookup type")
2559        else:
2560            raise service_error(service_error.req, "No request?")
2561
2562        try:
2563            proof = self.check_experiment_access(fid, key)
2564        except service_error, e:
2565            self.log.info("Info call failed for %s: access denied" %  fid)
2566
2567
2568        self.update_info(key, fresh)
2569
2570        self.state_lock.acquire()
2571        if self.state.has_key(key):
2572            rv = self.state[key].get_info()
2573            # Copy the topo if we need legacy annotations
2574            if legacy:
2575                top = self.state[key].top
2576                if top is not None: top = top.clone()
2577        self.state_lock.release()
2578        self.log.info("Gathered Info for %s %s" % (key, fid))
2579
2580        # If the legacy visualization and topology representations are
2581        # requested, calculate them and add them to the return.
2582        if legacy and rv is not None:
2583            self.log.info("Generating legacy Info for %s %s" % (key, fid))
2584            if top is not None:
2585                vtopo = topdl.topology_to_vtopo(top)
2586                if vtopo is not None:
2587                    rv['vtopo'] = vtopo
2588                    try:
2589                        vis = self.genviz(vtopo)
2590                    except service_error, e:
2591                        self.log.debug('Problem generating visualization: %s' \
2592                                % e)
2593                        vis = None
2594                    if vis is not None:
2595                        rv['vis'] = vis
2596        if rv:
2597            self.log.info("Info succeded for %s %s" % (key, fid))
2598            rv['proof'] = proof.to_dict()
2599            return rv
2600        else: 
2601            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
2602            raise service_error(service_error.req, "No such experiment")
2603
2604    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2605            results):
2606        """
2607        Call OperateSegment on multiple testbeds and gather the results.
2608        op_params contains the parameters needed to contact that testbed, cert
2609        is a certificate containing the fedid to use, op is the operation,
2610        testbeds is a dict mapping testbed name to targets in that testbed,
2611        params are the parameters to include a,d results is a growing list of
2612        the results of the calls.
2613        """
2614        try:
2615            tmpdir = tempfile.mkdtemp(prefix="info-")
2616        except EnvironmentError:
2617            raise service_error(service_error.internal, 
2618                    "Cannot create tmp dir")
2619        cert_file = self.make_temp_certfile(cert, tmpdir)
2620
2621        try:
2622            for tb, targets in testbeds.items():
2623                if tb in op_params:
2624                    uri, aid = op_params[tb]
2625                    operate=self.operation_segment(log=self.log, testbed=uri,
2626                                cert_file=cert_file, cert_pwd=None,
2627                                trusted_certs=self.trusted_certs,
2628                                caller=self.call_OperationSegment)
2629                    if operate(uri, aid, op, targets, params):
2630                        if operate.status is not None:
2631                            results.extend(operate.status)
2632                            continue
2633                # Something went wrong in a weird way.  Add statuses
2634                # that reflect that to results
2635                for t in targets:
2636                    results.append(operation_status(t, 
2637                        operation_status.federant,
2638                        'Unexpected error on %s' % tb))
2639        # Clean up the tmpdir no matter what
2640        finally:
2641            if tmpdir: self.remove_dirs(tmpdir)
2642
2643    def do_operation(self, req, fid):
2644        """
2645        Find the testbeds holding each target and ask them to carry out the
2646        operation.  Return the statuses.
2647        """
2648        # Map an element to the testbed containing it
2649        def element_to_tb(e):
2650            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2651            elif isinstance(e, topdl.Testbed): return e.name
2652            else: return None
2653        # If d is an operation_status object, make it a dict
2654        def make_dict(d):
2655            if isinstance(d, dict): return d
2656            elif isinstance(d, operation_status): return d.to_dict()
2657            else: return { }
2658
2659        def element_name(e):
2660            if isinstance(e, topdl.Computer): return e.name
2661            elif isinstance(e, topdl.Testbed): 
2662                if e.localname: return e.localname[0]
2663                else: return None
2664            else: return None
2665
2666        self.log.info("Operation call started for %s" %  fid)
2667        req = req.get('OperationRequestBody', None)
2668        if not req:
2669            raise service_error(service_error.req,
2670                    "Bad request format (no OperationRequestBody)")
2671        exp = req.get('experiment', None)
2672        op = req.get('operation', None)
2673        targets = set(req.get('target', []))
2674        params = req.get('parameter', None)
2675
2676        if exp:
2677            if 'fedid' in exp:
2678                key = exp['fedid']
2679                keytype = "fedid"
2680            elif 'localname' in exp:
2681                key = exp['localname']
2682                keytype = "localname"
2683            else:
2684                raise service_error(service_error.req, "Unknown lookup type")
2685        else:
2686            raise service_error(service_error.req, "No request?")
2687
2688        if op is None or not targets:
2689            raise service_error(service_error.req, "No request?")
2690
2691        try:
2692            proof = self.check_experiment_access(fid, key)
2693        except service_error, e:
2694            self.log.info("Operation call failed for %s: access denied" %  fid)
2695            raise e
2696
2697        self.state_lock.acquire()
2698        if key in self.state:
2699            d1, op_params, cert, d2 = \
2700                    self.get_segment_info(self.state[key], need_lock=False,
2701                            key='tb')
2702            top = self.state[key].top
2703            if top is not None:
2704                top = top.clone()
2705        self.state_lock.release()
2706
2707        if top is None:
2708            self.log.info("Operation call failed for %s: not active" %  fid)
2709            raise service_error(service_error.partial, "No topology yet", 
2710                    proof=proof)
2711
2712        testbeds = { }
2713        results = []
2714        for e in top.elements:
2715            ename = element_name(e)
2716            if ename in targets:
2717                tb = element_to_tb(e)
2718                targets.remove(ename)
2719                if tb is not None:
2720                    if tb in testbeds: testbeds[tb].append(ename)
2721                    else: testbeds[tb] = [ ename ]
2722                else:
2723                    results.append(operation_status(e.name, 
2724                        code=operation_status.no_target, 
2725                        description='Cannot map target to testbed'))
2726
2727        for t in targets:
2728            results.append(operation_status(t, operation_status.no_target))
2729
2730        self.operate_on_segments(op_params, cert, op, testbeds, params,
2731                results)
2732
2733        self.log.info("Operation call succeeded for %s" %  fid)
2734        return { 
2735                'experiment': exp, 
2736                'status': [make_dict(r) for r in results],
2737                'proof': proof.to_dict()
2738                }
2739
2740
2741    def get_multi_info(self, req, fid):
2742        """
2743        Return all the stored info that this fedid can access
2744        """
2745        rv = { 'info': [ ], 'proof': [ ] }
2746
2747        self.log.info("Multi Info call started for %s" %  fid)
2748        self.get_grouper_updates(fid)
2749        self.auth.update()
2750        self.auth.save()
2751        self.state_lock.acquire()
2752        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2753            try:
2754                proof = self.check_experiment_access(fid, key)
2755            except service_error, e:
2756                if e.code == service_error.access:
2757                    continue
2758                else:
2759                    self.log.info("Multi Info call failed for %s: %s" %  \
2760                            (e,fid))
2761                    self.state_lock.release()
2762                    raise e
2763
2764            if self.state.has_key(key):
2765                e = self.state[key].get_info()
2766                e['proof'] = proof.to_dict()
2767                rv['info'].append(e)
2768                rv['proof'].append(proof.to_dict())
2769        self.state_lock.release()
2770        self.log.info("Multi Info call succeeded for %s" %  fid)
2771        return rv
2772
2773    def check_termination_status(self, fed_exp, force):
2774        """
2775        Confirm that the experiment is sin a valid state to stop (or force it)
2776        return the state - invalid states for deletion and force settings cause
2777        exceptions.
2778        """
2779        self.state_lock.acquire()
2780        status = fed_exp.status
2781
2782        if status:
2783            if status in ('starting', 'terminating'):
2784                if not force:
2785                    self.state_lock.release()
2786                    raise service_error(service_error.partial, 
2787                            'Experiment still being created or destroyed')
2788                else:
2789                    self.log.warning('Experiment in %s state ' % status + \
2790                            'being terminated by force.')
2791            self.state_lock.release()
2792            return status
2793        else:
2794            # No status??? trouble
2795            self.state_lock.release()
2796            raise service_error(service_error.internal,
2797                    "Experiment has no status!?")
2798
2799    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
2800        ids = []
2801        term_params = { }
2802        if need_lock: self.state_lock.acquire()
2803        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2804        expcert = fed_exp.identity
2805        repo = "%s" % fed_exp.fedid
2806
2807        # Collect the allocation/segment ids into a dict keyed by the fedid
2808        # of the allocation that contains a tuple of uri, aid
2809        for i, fed in enumerate(fed_exp.get_all_allocations()):
2810            uri = fed.uri
2811            aid = fed.allocID
2812            if key == 'aid': term_params[aid] = (uri, aid)
2813            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2814
2815        if need_lock: self.state_lock.release()
2816        return ids, term_params, expcert, repo
2817
2818
2819    def get_termination_info(self, fed_exp):
2820        self.state_lock.acquire()
2821        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
2822        # Change the experiment state
2823        fed_exp.status = 'terminating'
2824        fed_exp.updated()
2825        if self.state_filename: self.write_state()
2826        self.state_lock.release()
2827
2828        return ids, term_params, expcert, repo
2829
2830
2831    def deallocate_resources(self, term_params, expcert, status, force, 
2832            dealloc_log):
2833        tmpdir = None
2834        # This try block makes sure the tempdir is cleared
2835        try:
2836            # If no expcert, try the deallocation as the experiment
2837            # controller instance.
2838            if expcert and self.auth_type != 'legacy': 
2839                try:
2840                    tmpdir = tempfile.mkdtemp(prefix="term-")
2841                except EnvironmentError:
2842                    raise service_error(service_error.internal, 
2843                            "Cannot create tmp dir")
2844                cert_file = self.make_temp_certfile(expcert, tmpdir)
2845                pw = None
2846            else: 
2847                cert_file = self.cert_file
2848                pw = self.cert_pwd
2849
2850            # Stop everyone.  NB, wait_for_all waits until a thread starts
2851            # and then completes, so we can't wait if nothing starts.  So,
2852            # no tbparams, no start.
2853            if len(term_params) > 0:
2854                tp = thread_pool(self.nthreads)
2855                for k, (uri, aid) in term_params.items():
2856                    # Create and start a thread to stop the segment
2857                    tp.wait_for_slot()
2858                    t  = pooled_thread(\
2859                            target=self.terminate_segment(log=dealloc_log,
2860                                testbed=uri,
2861                                cert_file=cert_file, 
2862                                cert_pwd=pw,
2863                                trusted_certs=self.trusted_certs,
2864                                caller=self.call_TerminateSegment),
2865                            args=(uri, aid), name=k,
2866                            pdata=tp, trace_file=self.trace_file)
2867                    t.start()
2868                # Wait for completions
2869                tp.wait_for_all_done()
2870
2871            # release the allocations (failed experiments have done this
2872            # already, and starting experiments may be in odd states, so we
2873            # ignore errors releasing those allocations
2874            try: 
2875                for k, (uri, aid)  in term_params.items():
2876                    self.release_access(None, aid, uri=uri,
2877                            cert_file=cert_file, cert_pwd=pw)
2878            except service_error, e:
2879                if status != 'failed' and not force:
2880                    raise e
2881
2882        # Clean up the tmpdir no matter what
2883        finally:
2884            if tmpdir: self.remove_dirs(tmpdir)
2885
2886    def terminate_experiment(self, req, fid):
2887        """
2888        Swap this experiment out on the federants and delete the shared
2889        information
2890        """
2891        self.log.info("Terminate experiment call started for %s" % fid)
2892        tbparams = { }
2893        req = req.get('TerminateRequestBody', None)
2894        if not req:
2895            raise service_error(service_error.req,
2896                    "Bad request format (no TerminateRequestBody)")
2897
2898        key = self.get_experiment_key(req, 'experiment')
2899        try:
2900            proof = self.check_experiment_access(fid, key)
2901        except service_error, e:
2902            self.log.info(
2903                    "Terminate experiment call failed for %s: access denied" \
2904                            % fid)
2905            raise e
2906        exp = req.get('experiment', False)
2907        force = req.get('force', False)
2908
2909        dealloc_list = [ ]
2910
2911
2912        # Create a logger that logs to the dealloc_list as well as to the main
2913        # log file.
2914        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2915        dealloc_log.info("Terminating %s " %key)
2916        h = logging.StreamHandler(self.list_log(dealloc_list))
2917        # XXX: there should be a global one of these rather than repeating the
2918        # code.
2919        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2920                    '%d %b %y %H:%M:%S'))
2921        dealloc_log.addHandler(h)
2922
2923        self.state_lock.acquire()
2924        fed_exp = self.state.get(key, None)
2925        self.state_lock.release()
2926        repo = None
2927
2928        if fed_exp:
2929            status = self.check_termination_status(fed_exp, force)
2930            # get_termination_info updates the experiment state
2931            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2932            self.deallocate_resources(term_params, expcert, status, force, 
2933                    dealloc_log)
2934
2935            # Remove the terminated experiment
2936            self.state_lock.acquire()
2937            for id in ids:
2938                self.clear_experiment_authorization(id, need_state_lock=False)
2939                if id in self.state: del self.state[id]
2940
2941            if self.state_filename: self.write_state()
2942            self.state_lock.release()
2943
2944            # Delete any synch points associated with this experiment.  All
2945            # synch points begin with the fedid of the experiment.
2946            fedid_keys = set(["fedid:%s" % f for f in ids \
2947                    if isinstance(f, fedid)])
2948            for k in self.synch_store.all_keys():
2949                try:
2950                    if len(k) > 45 and k[0:46] in fedid_keys:
2951                        self.synch_store.del_value(k)
2952                except synch_store.BadDeletionError:
2953                    pass
2954            self.write_store()
2955
2956            # Remove software and other cached stuff from the filesystem.
2957            if repo:
2958                self.remove_dirs("%s/%s" % (self.repodir, repo))
2959       
2960            self.log.info("Terminate experiment succeeded for %s %s" % \
2961                    (key, fid))
2962            return { 
2963                    'experiment': exp , 
2964                    'deallocationLog': string.join(dealloc_list, ''),
2965                    'proof': [proof.to_dict()],
2966                    }
2967        else:
2968            self.log.info("Terminate experiment failed for %s %s: no state" % \
2969                    (key, fid))
2970            raise service_error(service_error.req, "No saved state")
2971
2972
2973    def GetValue(self, req, fid):
2974        """
2975        Get a value from the synchronized store
2976        """
2977        req = req.get('GetValueRequestBody', None)
2978        if not req:
2979            raise service_error(service_error.req,
2980                    "Bad request format (no GetValueRequestBody)")
2981       
2982        name = req.get('name', None)
2983        wait = req.get('wait', False)
2984        rv = { 'name': name }
2985
2986        if not name:
2987            raise service_error(service_error.req, "No name?")
2988
2989        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2990
2991        if access_ok:
2992            self.log.debug("[GetValue] asking for %s " % name)
2993            try:
2994                v = self.synch_store.get_value(name, wait)
2995            except synch_store.RevokedKeyError:
2996                # No more synch on this key
2997                raise service_error(service_error.federant, 
2998                        "Synch key %s revoked" % name)
2999            if v is not None:
3000                rv['value'] = v
3001            rv['proof'] = proof.to_dict()
3002            self.log.debug("[GetValue] got %s from %s" % (v, name))
3003            return rv
3004        else:
3005            raise service_error(service_error.access, "Access Denied",
3006                    proof=proof)
3007       
3008
3009    def SetValue(self, req, fid):
3010        """
3011        Set a value in the synchronized store
3012        """
3013        req = req.get('SetValueRequestBody', None)
3014        if not req:
3015            raise service_error(service_error.req,
3016                    "Bad request format (no SetValueRequestBody)")
3017       
3018        name = req.get('name', None)
3019        v = req.get('value', '')
3020
3021        if not name:
3022            raise service_error(service_error.req, "No name?")
3023
3024        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
3025
3026        if access_ok:
3027            try:
3028                self.synch_store.set_value(name, v)
3029                self.write_store()
3030                self.log.debug("[SetValue] set %s to %s" % (name, v))
3031            except synch_store.CollisionError:
3032                # Translate into a service_error
3033                raise service_error(service_error.req,
3034                        "Value already set: %s" %name)
3035            except synch_store.RevokedKeyError:
3036                # No more synch on this key
3037                raise service_error(service_error.federant, 
3038                        "Synch key %s revoked" % name)
3039                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
3040        else:
3041            raise service_error(service_error.access, "Access Denied",
3042                    proof=proof)
Note: See TracBrowser for help on using the repository browser.