source: fedd/federation/experiment_control.py @ 5ae9d94

compt_changes
Last change on this file since 5ae9d94 was 934dd99, checked in by Ted Faber <faber@…>, 13 years ago

Merge information from testbed objects into experiment descriptiuon

  • Property mode set to 100644
File size: 93.3 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3df9b33]16import os.path
17
[3441fe3]18import traceback
[c971895]19# For parsing visualization output and splitter output
20import xml.parsers.expat
[3441fe3]21
[6c57fe9]22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
[c573278]24from string import join
[6679c122]25
[db6b092]26from urlparse import urlparse
27from urllib2 import urlopen
28
[ec4fb42]29from util import *
[6bedbdba]30from deter import fedid, generate_fedid
[9460b1e]31from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]32from service_error import service_error
[2761484]33from synch_store import synch_store
[73e7f5c]34from experiment_partition import experiment_partition
[1d73342]35from experiment_control_legacy import experiment_control_legacy
[7206e5a]36from authorizer import abac_authorizer
[faea607]37from thread_pool import thread_pool, pooled_thread
[ab3d6c5]38from experiment_info import experiment_info, allocation_info, federated_service
[22a1a77]39from operation_status import operation_status
[6679c122]40
[6bedbdba]41from deter import topdl
[044dd20]42from deter import ip_allocator
43from deter import ip_addr
[f07fa49]44import list_log
[db6b092]45
[11a08b0]46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
[1d73342]53class experiment_control_local(experiment_control_legacy):
[0ea11af]54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
[79b6596]60
61    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]62   
[f069052]63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]65    call_StartSegment = service_caller('StartSegment')
[db974ed]66    call_TerminateSegment = service_caller('TerminateSegment')
[6e33086]67    call_InfoSegment = service_caller('InfoSegment')
[22a1a77]68    call_OperationSegment = service_caller('OperationSegment')
[5f6929a]69    call_Ns2Topdl = service_caller('Ns2Topdl')
[058f58e]70
[3f6bc5f]71    def __init__(self, config=None, auth=None):
[866c983]72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
[f07fa49]90        self.list_log = list_log.list_log
[866c983]91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
[6c57fe9]102        self.repodir = config.get("experiment_control", "repodir")
[7183b48]103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
[cc8d8e9]105
[866c983]106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
[35a4c01]110        self.nthreads = 10
[866c983]111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
[866c983]121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
[2761484]123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
[5f6929a]126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
[866c983]127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131
[175b444]132        dt = config.get("experiment_control", "direct_transit")
[7206e5a]133        self.auth_type = config.get('experiment_control', 'auth_type') \
134                or 'legacy'
135        self.auth_dir = config.get('experiment_control', 'auth_dir')
[6e33086]136        # XXX: document this!
137        self.info_cache_limit = \
138                config.getint('experiment_control', 'info_cache', 600)
[139e2e2]139        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
140        else: self.direct_transit = [ ]
[866c983]141        # NB for internal master/slave ops, not experiment setup
142        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]143
[db6b092]144        self.overrides = set([])
145        ovr = config.get('experiment_control', 'overrides')
146        if ovr:
147            for o in ovr.split(","):
148                o = o.strip()
149                if o.startswith('fedid:'): o = o[len('fedid:'):]
150                self.overrides.add(fedid(hexstr=o))
[ca489e8]151
[866c983]152        self.state = { }
153        self.state_lock = Lock()
154        self.tclsh = "/usr/local/bin/otclsh"
[5f6929a]155        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
[866c983]156                config.get("experiment_control", "tcl_splitter",
157                        "/usr/testbed/lib/ns2ir/parse.tcl")
158        mapdb_file = config.get("experiment_control", "mapdb")
159        self.trace_file = sys.stderr
160
161        self.def_expstart = \
162                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
163                "/tmp/federate";
164        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
165                "FEDDIR/hosts";
166        self.def_gwstart = \
167                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
168                "/tmp/bridge.log";
169        self.def_mgwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
171                "/tmp/bridge.log";
172        self.def_gwimage = "FBSD61-TUNNEL2";
173        self.def_gwtype = "pc";
174        self.local_access = { }
175
[7206e5a]176        if self.auth_type == 'legacy':
177            if auth:
178                self.auth = auth
179            else:
180                self.log.error( "[access]: No authorizer initialized, " +\
181                        "creating local one.")
182                auth = authorizer()
[5ecb9a3]183            self.get_access = self.legacy_get_access
[7206e5a]184        elif self.auth_type == 'abac':
185            self.auth = abac_authorizer(load=self.auth_dir)
186        else:
187            raise service_error(service_error.internal, 
188                    "Unknown auth_type: %s" % self.auth_type)
[866c983]189
190        if mapdb_file:
191            self.read_mapdb(mapdb_file)
192        else:
193            self.log.warn("[experiment_control] No testbed map, using defaults")
194            self.tbmap = { 
195                    'deter':'https://users.isi.deterlab.net:23235',
196                    'emulab':'https://users.isi.deterlab.net:23236',
197                    'ucb':'https://users.isi.deterlab.net:23237',
198                    }
199
200        # Grab saved state.  OK to do this w/o locking because it's read only
201        # and only one thread should be in existence that can see self.state at
202        # this point.
203        if self.state_filename:
204            self.read_state()
205
[2761484]206        if self.store_filename:
207            self.read_store()
208        else:
209            self.log.warning("No saved synch store")
210            self.synch_store = synch_store
211
[866c983]212        # Dispatch tables
213        self.soap_services = {\
[a3ad8bd]214                'New': soap_handler('New', self.new_experiment),
[e19b75c]215                'Create': soap_handler('Create', self.create_experiment),
[866c983]216                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
217                'Vis': soap_handler('Vis', self.get_vis),
218                'Info': soap_handler('Info', self.get_info),
[65f3f29]219                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[22a1a77]220                'Operation': soap_handler('Operation', self.do_operation),
[866c983]221                'Terminate': soap_handler('Terminate', 
[e19b75c]222                    self.terminate_experiment),
[2761484]223                'GetValue': soap_handler('GetValue', self.GetValue),
224                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]225        }
226
227        self.xmlrpc_services = {\
[a3ad8bd]228                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]229                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]230                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
231                'Vis': xmlrpc_handler('Vis', self.get_vis),
232                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]233                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]234                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]235                    self.terminate_experiment),
[22a1a77]236                'Operation': xmlrpc_handler('Operation', self.do_operation),
[2761484]237                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
238                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]239        }
[19cc408]240
[a97394b]241    # Call while holding self.state_lock
[eee2b2e]242    def write_state(self):
[866c983]243        """
244        Write a new copy of experiment state after copying the existing state
245        to a backup.
246
247        State format is a simple pickling of the state dictionary.
248        """
249        if os.access(self.state_filename, os.W_OK):
[40dd8c1]250            copy_file(self.state_filename, \
251                    "%s.bak" % self.state_filename)
[866c983]252        try:
253            f = open(self.state_filename, 'w')
254            pickle.dump(self.state, f)
[d3c8759]255        except EnvironmentError, e:
[866c983]256            self.log.error("Can't write file %s: %s" % \
257                    (self.state_filename, e))
258        except pickle.PicklingError, e:
259            self.log.error("Pickling problem: %s" % e)
260        except TypeError, e:
261            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]262
[2761484]263    @staticmethod
[29d5f7c]264    def get_alloc_ids(exp):
[2761484]265        """
[29d5f7c]266        Used by read_store and read state.  This used to be worse.
[2761484]267        """
268
[29d5f7c]269        return [ a.allocID for a in exp.get_all_allocations() ]
270
[2761484]271
[a97394b]272    # Call while holding self.state_lock
[eee2b2e]273    def read_state(self):
[866c983]274        """
275        Read a new copy of experiment state.  Old state is overwritten.
276
277        State format is a simple pickling of the state dictionary.
278        """
[cc8d8e9]279       
[866c983]280        try:
281            f = open(self.state_filename, "r")
282            self.state = pickle.load(f)
283            self.log.debug("[read_state]: Read state from %s" % \
284                    self.state_filename)
[d3c8759]285        except EnvironmentError, e:
[866c983]286            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
287                    % (self.state_filename, e))
288        except pickle.UnpicklingError, e:
289            self.log.warning(("[read_state]: No saved state: " + \
290                    "Unpickling failed: %s") % e)
291       
[cc8d8e9]292        for s in self.state.values():
[866c983]293            try:
[cc8d8e9]294
[29d5f7c]295                eid = s.fedid
[cc8d8e9]296                if eid : 
[7206e5a]297                    if self.auth_type == 'legacy':
298                        # XXX: legacy
299                        # Give the owner rights to the experiment
[29d5f7c]300                        #self.auth.set_attribute(s['owner'], eid)
[7206e5a]301                        # And holders of the eid as well
302                        self.auth.set_attribute(eid, eid)
303                        # allow overrides to control experiments as well
304                        for o in self.overrides:
305                            self.auth.set_attribute(o, eid)
306                        # Set permissions to allow reading of the software
307                        # repo, if any, as well.
308                        for a in self.get_alloc_ids(s):
309                            self.auth.set_attribute(a, 'repo/%s' % eid)
[cc8d8e9]310                else:
311                    raise KeyError("No experiment id")
[866c983]312            except KeyError, e:
313                self.log.warning("[read_state]: State ownership or identity " +\
314                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]315
[34bc05c]316    def read_mapdb(self, file):
[866c983]317        """
318        Read a simple colon separated list of mappings for the
319        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
[a11eda5]320        also adds testbeds to active if they include , active after
321        their name.
[866c983]322        """
323
324        self.tbmap = { }
[a11eda5]325        self.tbactive = set()
[866c983]326        lineno =0
327        try:
328            f = open(file, "r")
329            for line in f:
330                lineno += 1
331                line = line.strip()
332                if line.startswith('#') or len(line) == 0:
333                    continue
334                try:
335                    label, url = line.split(':', 1)
[a11eda5]336                    if ',' in label:
337                        label, act = label.split(',', 1)
338                        active = (act.strip() == 'active')
339                    else:
340                        active = False
[866c983]341                    self.tbmap[label] = url
[a11eda5]342                    if active: self.tbactive.add(label)
[866c983]343                except ValueError, e:
344                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
345                            "map db: %s %s" % (lineno, line, e))
[d3c8759]346        except EnvironmentError, e:
[866c983]347            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
348                    "open %s: %s" % (file, e))
[1ec5d4a]349        else:
350            f.close()
[2761484]351
352    def read_store(self):
353        try:
354            self.synch_store = synch_store()
355            self.synch_store.load(self.store_filename)
356            self.log.debug("[read_store]: Read store from %s" % \
357                    self.store_filename)
[d3c8759]358        except EnvironmentError, e:
[2761484]359            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
360                    % (self.state_filename, e))
361            self.synch_store = synch_store()
362
363        # Set the initial permissions on data in the store.  XXX: This ad hoc
364        # authorization attribute initialization is getting out of hand.
[7206e5a]365        # XXX: legacy
366        if self.auth_type == 'legacy':
367            for k in self.synch_store.all_keys():
368                try:
369                    if k.startswith('fedid:'):
370                        fid = fedid(hexstr=k[6:46])
371                        if self.state.has_key(fid):
372                            for a in self.get_alloc_ids(self.state[fid]):
373                                self.auth.set_attribute(a, k)
374                except ValueError, e:
375                    self.log.warn("Cannot deduce permissions for %s" % k)
[2761484]376
377
378    def write_store(self):
379        """
380        Write a new copy of synch_store after writing current state
381        to a backup.  We use the internal synch_store pickle method to avoid
382        incinsistent data.
383
384        State format is a simple pickling of the store.
385        """
386        if os.access(self.store_filename, os.W_OK):
387            copy_file(self.store_filename, \
388                    "%s.bak" % self.store_filename)
389        try:
390            self.synch_store.save(self.store_filename)
[d3c8759]391        except EnvironmentError, e:
[2761484]392            self.log.error("Can't write file %s: %s" % \
393                    (self.store_filename, e))
394        except TypeError, e:
395            self.log.error("Pickling problem (TypeError): %s" % e)
396
[cf0ff4f]397
398    def remove_dirs(self, dir):
399        """
400        Remove the directory tree and all files rooted at dir.  Log any errors,
401        but continue.
402        """
403        self.log.debug("[removedirs]: removing %s" % dir)
404        try:
405            for path, dirs, files in os.walk(dir, topdown=False):
406                for f in files:
407                    os.remove(os.path.join(path, f))
408                for d in dirs:
409                    os.rmdir(os.path.join(path, d))
410            os.rmdir(dir)
411        except EnvironmentError, e:
412            self.log.error("Error deleting directory tree in %s" % e);
413
414    @staticmethod
415    def make_temp_certfile(expcert, tmpdir):
416        """
417        make a protected copy of the access certificate so the experiment
418        controller can act as the experiment principal.  mkstemp is the most
419        secure way to do that. The directory should be created by
420        mkdtemp.  Return the filename.
421        """
422        if expcert and tmpdir:
423            try:
424                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
425                f = os.fdopen(certf, 'w')
426                print >> f, expcert
427                f.close()
428            except EnvironmentError, e:
429                raise service_error(service_error.internal, 
430                        "Cannot create temp cert file?")
431            return certfn
432        else:
433            return None
434
[866c983]435       
[6679c122]436    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]437        """
438        Generate a set of keys for the gateways to use to talk.
439
440        Keys are of type type and are stored in the required dest file.
441        """
442        valid_types = ("rsa", "dsa")
443        t = type.lower();
444        if t not in valid_types: raise ValueError
445        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
446
447        try:
448            trace = open("/dev/null", "w")
[d3c8759]449        except EnvironmentError:
[866c983]450            raise service_error(service_error.internal,
451                    "Cannot open /dev/null??");
452
453        # May raise CalledProcessError
454        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]455        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]456        if rv != 0:
457            raise service_error(service_error.internal, 
458                    "Cannot generate nonce ssh keys.  %s return code %d" \
459                            % (self.ssh_keygen, rv))
[6679c122]460
[3df9b33]461    def generate_seer_certs(self, destdir):
462        '''
463        Create a SEER ca cert and a node cert in destdir/ca.pem and
464        destdir/node.pem respectively.  These will be distributed throughout
465        the federated experiment.  This routine reports errors via
466        service_errors.
467        '''
468        openssl = '/usr/bin/openssl'
469        # All the filenames and parameters we need for openssl calls below
470        ca_key =os.path.join(destdir, 'ca.key') 
471        ca_pem = os.path.join(destdir, 'ca.pem')
472        node_key =os.path.join(destdir, 'node.key') 
473        node_pem = os.path.join(destdir, 'node.pem')
474        node_req = os.path.join(destdir, 'node.req')
475        node_signed = os.path.join(destdir, 'node.signed')
[9bde415]476        days = '%s' % (365 * 10)
[95be336]477        serial = '%s' % random.randint(0, 1<<16)
[3df9b33]478
479        try:
480            # Sequence of calls to create a CA key, create a ca cert, create a
481            # node key, node signing request, and finally a signed node
482            # certificate.
483            sequence = (
484                    (openssl, 'genrsa', '-out', ca_key, '1024'),
485                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
486                        ca_pem, '-days', days, '-subj', 
487                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
488                    (openssl, 'genrsa', '-out', node_key, '1024'),
489                    (openssl, 'req', '-new', '-key', node_key, '-out', 
490                        node_req, '-days', days, '-subj', 
491                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
492                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
[95be336]493                        '-set_serial', serial, '-req', '-in', node_req, 
[3df9b33]494                        '-out', node_signed, '-days', days),
495                )
496            # Do all that stuff; bail if there's an error, and push all the
497            # output to dev/null.
498            for cmd in sequence:
499                trace = open("/dev/null", "w")
500                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
501                if rv != 0:
502                    raise service_error(service_error.internal, 
503                            "Cannot generate SEER certs.  %s return code %d" \
504                                    % (' '.join(cmd), rv))
505            # Concatinate the node key and signed certificate into node.pem
506            f = open(node_pem, 'w')
507            for comp in (node_signed, node_key):
508                g = open(comp, 'r')
[9bde415]509                f.write(g.read())
[3df9b33]510                g.close()
511            f.close()
512
513            # Throw out intermediaries.
[95be336]514            for fn in (ca_key, node_key, node_req, node_signed):
[3df9b33]515                os.unlink(fn)
516
517        except EnvironmentError, e:
518            # Any difficulties with the file system wind up here
519            raise service_error(service_error.internal,
520                    "File error on  %s while creating SEER certs: %s" % \
521                            (e.filename, e.strerror))
522
523
524
[0d830de]525    def gentopo(self, str):
[866c983]526        """
[1d73342]527        Generate the topology data structure from the splitter's XML
[866c983]528        representation of it.
529
530        The topology XML looks like:
531            <experiment>
532                <nodes>
533                    <node><vname></vname><ips>ip1:ip2</ips></node>
534                </nodes>
535                <lans>
536                    <lan>
537                        <vname></vname><vnode></vnode><ip></ip>
538                        <bandwidth></bandwidth><member>node:port</member>
539                    </lan>
540                </lans>
541        """
542        class topo_parse:
543            """
544            Parse the topology XML and create the dats structure.
545            """
546            def __init__(self):
547                # Typing of the subelements for data conversion
548                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
549                self.int_subelements = ( 'bandwidth',)
550                self.float_subelements = ( 'delay',)
551                # The final data structure
552                self.nodes = [ ]
553                self.lans =  [ ]
554                self.topo = { \
555                        'node': self.nodes,\
556                        'lan' : self.lans,\
557                    }
558                self.element = { }  # Current element being created
559                self.chars = ""     # Last text seen
560
561            def end_element(self, name):
562                # After each sub element the contents is added to the current
563                # element or to the appropriate list.
564                if name == 'node':
565                    self.nodes.append(self.element)
566                    self.element = { }
567                elif name == 'lan':
568                    self.lans.append(self.element)
569                    self.element = { }
570                elif name in self.str_subelements:
571                    self.element[name] = self.chars
572                    self.chars = ""
573                elif name in self.int_subelements:
574                    self.element[name] = int(self.chars)
575                    self.chars = ""
576                elif name in self.float_subelements:
577                    self.element[name] = float(self.chars)
578                    self.chars = ""
579
580            def found_chars(self, data):
581                self.chars += data.rstrip()
582
583
584        tp = topo_parse();
585        parser = xml.parsers.expat.ParserCreate()
586        parser.EndElementHandler = tp.end_element
587        parser.CharacterDataHandler = tp.found_chars
588
589        parser.Parse(str)
590
591        return tp.topo
592       
[0d830de]593
594    def genviz(self, topo):
[866c983]595        """
596        Generate the visualization the virtual topology
597        """
598
599        neato = "/usr/local/bin/neato"
600        # These are used to parse neato output and to create the visualization
601        # file.
[0ac1934]602        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]603        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
604                "%s</type></node>"
605
606        try:
607            # Node names
608            nodes = [ n['vname'] for n in topo['node'] ]
609            topo_lans = topo['lan']
[cc8d8e9]610        except KeyError, e:
611            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]612
613        lans = { }
614        links = { }
615
616        # Walk through the virtual topology, organizing the connections into
617        # 2-node connections (links) and more-than-2-node connections (lans).
618        # When a lan is created, it's added to the list of nodes (there's a
619        # node in the visualization for the lan).
620        for l in topo_lans:
621            if links.has_key(l['vname']):
622                if len(links[l['vname']]) < 2:
623                    links[l['vname']].append(l['vnode'])
624                else:
625                    nodes.append(l['vname'])
626                    lans[l['vname']] = links[l['vname']]
627                    del links[l['vname']]
628                    lans[l['vname']].append(l['vnode'])
629            elif lans.has_key(l['vname']):
630                lans[l['vname']].append(l['vnode'])
631            else:
632                links[l['vname']] = [ l['vnode'] ]
633
634
635        # Open up a temporary file for dot to turn into a visualization
636        try:
637            df, dotname = tempfile.mkstemp()
638            dotfile = os.fdopen(df, 'w')
[d3c8759]639        except EnvironmentError:
[866c983]640            raise service_error(service_error.internal,
641                    "Failed to open file in genviz")
642
[db6b092]643        try:
644            dnull = open('/dev/null', 'w')
[d3c8759]645        except EnvironmentError:
[db6b092]646            service_error(service_error.internal,
[886307f]647                    "Failed to open /dev/null in genviz")
648
[866c983]649        # Generate a dot/neato input file from the links, nodes and lans
650        try:
651            print >>dotfile, "graph G {"
652            for n in nodes:
653                print >>dotfile, '\t"%s"' % n
654            for l in links.keys():
655                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
656            for l in lans.keys():
657                for n in lans[l]:
658                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
659            print >>dotfile, "}"
660            dotfile.close()
661        except TypeError:
662            raise service_error(service_error.internal,
663                    "Single endpoint link in vtopo")
[d3c8759]664        except EnvironmentError:
[866c983]665            raise service_error(service_error.internal, "Cannot write dot file")
666
667        # Use dot to create a visualization
[5954004]668        try:
669            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
670                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
671                stderr=dnull, close_fds=True)
672        except EnvironmentError:
673            raise service_error(service_error.internal, 
674                    "Cannot generate visualization: is graphviz available?")
[db6b092]675        dnull.close()
[866c983]676
677        # Translate dot to vis format
678        vis_nodes = [ ]
679        vis = { 'node': vis_nodes }
680        for line in dot.stdout:
681            m = vis_re.match(line)
682            if m:
683                vn = m.group(1)
684                vis_node = {'name': vn, \
685                        'x': float(m.group(2)),\
686                        'y' : float(m.group(3)),\
687                    }
688                if vn in links.keys() or vn in lans.keys():
689                    vis_node['type'] = 'lan'
690                else:
691                    vis_node['type'] = 'node'
692                vis_nodes.append(vis_node)
693        rv = dot.wait()
694
695        os.remove(dotname)
[1fed67b]696        # XXX: graphviz seems to use low return codes for warnings, like
697        # "couldn't find font"
698        if rv < 2 : return vis
[866c983]699        else: return None
[d0ae12d]700
[db6b092]701
[725c55d]702    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
703            cert_pwd=None):
[e19b75c]704        """
705        Release access to testbed through fedd
706        """
[db6b092]707
[fd07c48]708        if not uri and tbmap:
709            uri = tbmap.get(tb, None)
[e19b75c]710        if not uri:
[69692a9]711            raise service_error(service_error.server_config, 
[e19b75c]712                    "Unknown testbed: %s" % tb)
[db6b092]713
[e19b75c]714        if self.local_access.has_key(uri):
715            resp = self.local_access[uri].ReleaseAccess(\
[29d5f7c]716                    { 'ReleaseAccessRequestBody' : 
717                        {'allocID': {'fedid': aid}},}, 
[725c55d]718                    fedid(file=cert_file))
[e19b75c]719            resp = { 'ReleaseAccessResponseBody': resp } 
720        else:
[29d5f7c]721            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
[725c55d]722                    cert_file, cert_pwd, self.trusted_certs)
[db6b092]723
[e19b75c]724        # better error coding
[db6b092]725
[5f6929a]726    def remote_ns2topdl(self, uri, desc):
[db6b092]727
[e19b75c]728        req = {
729                'description' : { 'ns2description': desc },
[db6b092]730            }
731
[5f6929a]732        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
[e19b75c]733                self.trusted_certs)
734
[5f6929a]735        if r.has_key('Ns2TopdlResponseBody'):
736            r = r['Ns2TopdlResponseBody']
[1dcaff4]737            ed = r.get('experimentdescription', None)
738            if ed.has_key('topdldescription'):
739                return topdl.Topology(**ed['topdldescription'])
[e19b75c]740            else:
741                raise service_error(service_error.protocol, 
742                        "Bad splitter response (no output)")
743        else:
744            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]745
[e19b75c]746    class start_segment:
[fd556d1]747        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]748                cert_pwd=None, trusted_certs=None, caller=None,
749                log_collector=None):
[cc8d8e9]750            self.log = log
751            self.debug = debug
752            self.cert_file = cert_file
753            self.cert_pwd = cert_pwd
754            self.trusted_certs = None
755            self.caller = caller
[fd556d1]756            self.testbed = testbed
[f07fa49]757            self.log_collector = log_collector
[69692a9]758            self.response = None
[b4b19c7]759            self.node = { }
[9e5e251]760            self.subs = { }
[934dd99]761            self.tb = { }
[e83f2f2]762            self.proof = None
[b4b19c7]763
764        def make_map(self, resp):
[29d5f7c]765            if 'segmentdescription' not in resp  or \
766                    'topdldescription' not in resp['segmentdescription']:
767                self.log.warn('No topology returned from startsegment')
768                return 
769
770            top = topdl.Topology(
771                    **resp['segmentdescription']['topdldescription'])
772
773            for e in top.elements:
774                if isinstance(e, topdl.Computer):
[1ae1aa2]775                    self.node[e.name] = e
[934dd99]776                elif isinstance(e, topdl.Testbed):
777                    self.tb[e.uri] = e
[9e5e251]778            for s in top.substrates:
779                self.subs[s.name] = s
[cc8d8e9]780
[43197eb]781        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
[cc8d8e9]782            req = {
783                    'allocID': { 'fedid' : aid }, 
784                    'segmentdescription': { 
785                        'topdldescription': topo.to_dict(),
786                    },
787                }
[e02cd14]788
789            if connInfo:
790                req['connection'] = connInfo
[43197eb]791
792            import_svcs = [ s for m in masters.values() \
793                    for s in m if self.testbed in s.importers]
794
795            if import_svcs or self.testbed in masters:
796                req['service'] = []
797
798            for s in import_svcs:
799                for r in s.reqs:
800                    sr = copy.deepcopy(r)
801                    sr['visibility'] = 'import';
802                    req['service'].append(sr)
803
804            for s in masters.get(self.testbed, []):
805                for r in s.reqs:
806                    sr = copy.deepcopy(r)
807                    sr['visibility'] = 'export';
808                    req['service'].append(sr)
809
[6c57fe9]810            if attrs:
811                req['fedAttr'] = attrs
[cc8d8e9]812
[fd556d1]813            try:
[13e3dd2]814                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]815                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
816                        self.trusted_certs)
[f07fa49]817                if r.has_key('StartSegmentResponseBody'):
818                    lval = r['StartSegmentResponseBody'].get('allocationLog',
819                            None)
820                    if lval and self.log_collector:
821                        for line in  lval.splitlines(True):
822                            self.log_collector.write(line)
[b4b19c7]823                    self.make_map(r['StartSegmentResponseBody'])
[e83f2f2]824                    if 'proof' in r: self.proof = r['proof']
[69692a9]825                    self.response = r
[f07fa49]826                else:
827                    raise service_error(service_error.internal, 
828                            "Bad response!?: %s" %r)
[fd556d1]829                return True
830            except service_error, e:
831                self.log.error("Start segment failed on %s: %s" % \
832                        (self.testbed, e))
833                return False
[cc8d8e9]834
835
[5ae3857]836
[e19b75c]837    class terminate_segment:
[fd556d1]838        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]839                cert_pwd=None, trusted_certs=None, caller=None):
840            self.log = log
841            self.debug = debug
842            self.cert_file = cert_file
843            self.cert_pwd = cert_pwd
844            self.trusted_certs = None
845            self.caller = caller
[fd556d1]846            self.testbed = testbed
[5ae3857]847
848        def __call__(self, uri, aid ):
849            req = {
[29d5f7c]850                    'allocID': {'fedid': aid }, 
[5ae3857]851                }
[a69de97]852            self.log.info("Calling terminate segment")
[fd556d1]853            try:
854                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
855                        self.trusted_certs)
[a69de97]856                self.log.info("Terminate segment succeeded")
[fd556d1]857                return True
858            except service_error, e:
859                self.log.error("Terminate segment failed on %s: %s" % \
860                        (self.testbed, e))
861                return False
[db6b092]862
[6e33086]863    class info_segment(start_segment):
864        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
865                cert_pwd=None, trusted_certs=None, caller=None,
866                log_collector=None):
867            experiment_control_local.start_segment.__init__(self, debug, 
868                    log, testbed, cert_file, cert_pwd, trusted_certs, 
869                    caller, log_collector)
870
871        def __call__(self, uri, aid):
872            req = { 'allocID': { 'fedid' : aid } }
873
874            try:
875                self.log.debug("Calling InfoSegment at %s " % uri)
876                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
877                        self.trusted_certs)
878                if r.has_key('InfoSegmentResponseBody'):
879                    self.make_map(r['InfoSegmentResponseBody'])
880                    if 'proof' in r: self.proof = r['proof']
881                    self.response = r
882                else:
883                    raise service_error(service_error.internal, 
884                            "Bad response!?: %s" %r)
885                return True
886            except service_error, e:
887                self.log.error("Info segment failed on %s: %s" % \
888                        (self.testbed, e))
889                return False
890
[22a1a77]891    class operation_segment:
892        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
893                cert_pwd=None, trusted_certs=None, caller=None,
894                log_collector=None):
895            self.log = log
896            self.debug = debug
897            self.cert_file = cert_file
898            self.cert_pwd = cert_pwd
899            self.trusted_certs = None
900            self.caller = caller
901            self.testbed = testbed
902            self.status = None
903
904        def __call__(self, uri, aid, op, targets, params):
905            req = { 
906                    'allocID': { 'fedid' : aid },
907                    'operation': op,
908                    'target': targets,
909                    }
910            if params: req['parameter'] = params
911
912
913            try:
914                self.log.debug("Calling OperationSegment at %s " % uri)
915                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
916                        self.trusted_certs)
917                if 'OperationSegmentResponseBody' in r:
918                    r = r['OperationSegmentResponseBody']
919                    if 'status' in r:
920                        self.status = r['status']
921                else:
922                    raise service_error(service_error.internal, 
923                            "Bad response!?: %s" %r)
924                return True
925            except service_error, e:
926                self.log.error("Operation segment failed on %s: %s" % \
927                        (self.testbed, e))
928                return False
929
[6e33086]930    def annotate_topology(self, top, data):
[f671ef7]931        # These routines do various parts of the annotation
932        def add_new_names(nl, l):
933            """ add any names in nl to the list in l """
934            for n in nl:
935                if n not in l: l.append(n)
936       
937        def merge_services(ne, e):
938            for ns in ne.service:
939                # NB: the else is on the for
940                for s in e.service:
941                    if ns.name == s.name:
942                        s.importer = ns.importer
943                        s.param = ns.param
944                        s.description = ns.description
945                        s.status = ns.status
946                        break
947                else:
948                    e.service.append(ns)
949       
950        def merge_oses(ne, e):
951            """
952            Merge the operating system entries of ne into e
953            """
954            for nos in ne.os:
955                # NB: the else is on the for
956                for os in e.os:
[7aaa8dc]957                    if nos.name == os.name:
958                        os.version = nos.version
959                        os.version = nos.distribution
960                        os.version = nos.distributionversion
[f671ef7]961                        for a in nos.attribute:
[db3da0b]962                            if os.get_attribute(a.attribute):
963                                os.remove_attribute(a.attribute)
[7aaa8dc]964                            os.set_attribute(a.attribute, a.value)
[f671ef7]965                        break
966                else:
[db3da0b]967                    # If both nodes have one OS, this is a replacement
968                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
969                    else: e.os.append(nos)
[6e33086]970
971        # Annotate the topology with embedding info
972        for e in top.elements:
973            if isinstance(e, topdl.Computer):
974                for s in data:
[f671ef7]975                    ne = s.node.get(e.name, None)
976                    if ne is not None:
977                        add_new_names(ne.localname, e.localname)
978                        e.status = ne.status
979                        merge_services(ne, e)
980                        add_new_names(ne.operation, e.operation)
981                        if ne.os: merge_oses(ne, e)
[6e33086]982                        break
[934dd99]983            elif isinstance(e,topdl.Testbed):
984                for s in data:
985                    ne = s.tb.get(e.uri, None)
986                    if ne is not None:
987                        add_new_names(ne.localname, e.localname)
988                        add_new_names(ne.operation, e.operation)
989                        merge_services(ne, e)
990                        for a in ne.attribute:
991                            e.set_attribute(a.attribute, a.value)
[9e5e251]992        # Annotate substrates
993        for s in top.substrates:
994            for d in data:
995                ss = d.subs.get(s.name, None)
996                if ss is not None:
997                    if ss.capacity is not None:
998                        s.capacity = ss.capacity
999                    if s.latency is not None:
1000                        s.latency = ss.latency
[6e33086]1001
1002
1003
[43197eb]1004    def allocate_resources(self, allocated, masters, eid, expid, 
[b4b19c7]1005            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
[c573278]1006            attrs=None, connInfo={}, tbmap=None, expcert=None):
[69692a9]1007
[cc8d8e9]1008        started = { }           # Testbeds where a sub-experiment started
1009                                # successfully
1010
1011        # XXX
1012        fail_soft = False
1013
[fd07c48]1014        if tbmap is None: tbmap = { }
1015
[cc8d8e9]1016        log = alloc_log or self.log
1017
[faea607]1018        tp = thread_pool(self.nthreads)
[cc8d8e9]1019        threads = [ ]
[b4b19c7]1020        starters = [ ]
[cc8d8e9]1021
[c573278]1022        if expcert:
1023            cert = expcert
1024            pw = None
1025        else:
1026            cert = self.cert_file
[822d31b]1027            pw = self.cert_pwd
[c573278]1028
[109a32a]1029        for tb in allocated.keys():
1030            # Create and start a thread to start the segment, and save it
1031            # to get the return value later
[ab847bc]1032            tb_attrs = copy.copy(attrs)
[faea607]1033            tp.wait_for_slot()
[9294673]1034            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
[ab847bc]1035            base, suffix = split_testbed(tb)
1036            if suffix:
1037                tb_attrs.append({'attribute': 'experiment_name', 
[175b444]1038                    'value': "%s-%s" % (eid, suffix)})
[ab847bc]1039            else:
1040                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
[109a32a]1041            if not uri:
1042                raise service_error(service_error.internal, 
1043                        "Unknown testbed %s !?" % tb)
1044
[9294673]1045            aid = tbparams[tb].allocID
1046            if not aid:
[cc8d8e9]1047                raise service_error(service_error.internal, 
1048                        "No alloc id for testbed %s !?" % tb)
1049
[b4b19c7]1050            s = self.start_segment(log=log, debug=self.debug,
[c573278]1051                    testbed=tb, cert_file=cert,
1052                    cert_pwd=pw, trusted_certs=self.trusted_certs,
[b4b19c7]1053                    caller=self.call_StartSegment,
1054                    log_collector=log_collector)
1055            starters.append(s)
[faea607]1056            t  = pooled_thread(\
[b4b19c7]1057                    target=s, name=tb,
[ab847bc]1058                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
[faea607]1059                    pdata=tp, trace_file=self.trace_file)
[69692a9]1060            threads.append(t)
1061            t.start()
[cc8d8e9]1062
[109a32a]1063        # Wait until all finish (keep pinging the log, though)
1064        mins = 0
[dadc4da]1065        revoked = False
[faea607]1066        while not tp.wait_for_all_done(60.0):
[109a32a]1067            mins += 1
1068            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1069                    % mins)
[dadc4da]1070            if not revoked and \
[f52f5df]1071                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1072                # a testbed has failed.  Revoke this experiment's
1073                # synchronizarion values so that sub experiments will not
1074                # deadlock waiting for synchronization that will never happen
1075                self.log.info("A subexperiment has failed to swap in, " + \
1076                        "revoking synch keys")
1077                var_key = "fedid:%s" % expid
1078                for k in self.synch_store.all_keys():
1079                    if len(k) > 45 and k[0:46] == var_key:
1080                        self.synch_store.revoke_key(k)
1081                revoked = True
[69692a9]1082
[cc8d8e9]1083        failed = [ t.getName() for t in threads if not t.rv ]
1084        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1085
[cc8d8e9]1086        # If one failed clean up, unless fail_soft is set
[32e7d93]1087        if failed:
[cc8d8e9]1088            if not fail_soft:
[faea607]1089                tp.clear()
[cc8d8e9]1090                for tb in succeeded:
1091                    # Create and start a thread to stop the segment
[faea607]1092                    tp.wait_for_slot()
[9294673]1093                    uri = tbparams[tb].uri
[faea607]1094                    t  = pooled_thread(\
[32e7d93]1095                            target=self.terminate_segment(log=log,
[fd556d1]1096                                testbed=tb,
[696a689]1097                                cert_file=cert, 
1098                                cert_pwd=pw,
[32e7d93]1099                                trusted_certs=self.trusted_certs,
1100                                caller=self.call_TerminateSegment),
[9294673]1101                            args=(uri, tbparams[tb].allocID),
[32e7d93]1102                            name=tb,
[faea607]1103                            pdata=tp, trace_file=self.trace_file)
[cc8d8e9]1104                    t.start()
[f52f5df]1105                # Wait until all finish (if any are being stopped)
1106                if succeeded:
[faea607]1107                    tp.wait_for_all_done()
[cc8d8e9]1108
1109                # release the allocations
1110                for tb in tbparams.keys():
[725c55d]1111                    try:
[9294673]1112                        self.release_access(tb, tbparams[tb].allocID, 
1113                                tbmap=tbmap, uri=tbparams[tb].uri,
[696a689]1114                                cert_file=cert, cert_pwd=pw)
[725c55d]1115                    except service_error, e:
1116                        self.log.warn("Error releasing access: %s" % e.desc)
[cc8d8e9]1117                # Remove the placeholder
1118                self.state_lock.acquire()
[29d5f7c]1119                self.state[eid].status = 'failed'
[6e33086]1120                self.state[eid].updated()
[cc8d8e9]1121                if self.state_filename: self.write_state()
1122                self.state_lock.release()
[05e8da8]1123                # Remove the repo dir
1124                self.remove_dirs("%s/%s" %(self.repodir, expid))
1125                # Walk up tmpdir, deleting as we go
1126                if self.cleanup:
1127                    self.remove_dirs(tmpdir)
1128                else:
1129                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1130
[cc8d8e9]1131
1132                log.error("Swap in failed on %s" % ",".join(failed))
1133                return
1134        else:
[29d5f7c]1135            # Walk through the successes and gather the proofs
[e83f2f2]1136            proofs = { }
[b4b19c7]1137            for s in starters:
[e83f2f2]1138                if s.proof:
1139                    proofs[s.testbed] = s.proof
[6e33086]1140            self.annotate_topology(top, starters)
[cc8d8e9]1141            log.info("[start_segment]: Experiment %s active" % eid)
1142
1143
1144        # Walk up tmpdir, deleting as we go
[69692a9]1145        if self.cleanup:
[05e8da8]1146            self.remove_dirs(tmpdir)
[69692a9]1147        else:
1148            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1149
[b4b19c7]1150        # Insert the experiment into our state and update the disk copy.
[cc8d8e9]1151        self.state_lock.acquire()
[29d5f7c]1152        self.state[expid].status = 'active'
[cc8d8e9]1153        self.state[eid] = self.state[expid]
[29d5f7c]1154        self.state[eid].top = top
[6e33086]1155        self.state[eid].updated()
[e83f2f2]1156        # Append startup proofs
[29d5f7c]1157        for f in self.state[eid].get_all_allocations():
1158            if f.tb in proofs:
1159                f.proof.append(proofs[f.tb])
[e83f2f2]1160
[cc8d8e9]1161        if self.state_filename: self.write_state()
1162        self.state_lock.release()
1163        return
1164
1165
[895a133]1166    def add_kit(self, e, kit):
1167        """
1168        Add a Software object created from the list of (install, location)
1169        tuples passed as kit  to the software attribute of an object e.  We
1170        do this enough to break out the code, but it's kind of a hack to
1171        avoid changing the old tuple rep.
1172        """
1173
1174        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1175
1176        if isinstance(e.software, list): e.software.extend(s)
1177        else: e.software = s
1178
[913dc7a]1179    def append_experiment_authorization(self, expid, attrs, 
1180            need_state_lock=True):
1181        """
1182        Append the authorization information to system state
1183        """
1184
1185        for p, a in attrs:
1186            self.auth.set_attribute(p, a)
1187        self.auth.save()
1188
1189        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1190        # XXX: really a no op?
1191        #self.state[expid]['auth'].update(attrs)
[913dc7a]1192        if self.state_filename: self.write_state()
1193        if need_state_lock: self.state_lock.release()
1194
[a96d946]1195    def clear_experiment_authorization(self, expid, need_state_lock=True):
[913dc7a]1196        """
1197        Attrs is a set of attribute principal pairs that need to be removed
1198        from the authenticator.  Remove them and save the authenticator.
1199        """
1200
1201        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1202        # XXX: should be a no-op
1203        #if expid in self.state and 'auth' in self.state[expid]:
1204            #for p, a in self.state[expid]['auth']:
1205                #self.auth.unset_attribute(p, a)
1206            #self.state[expid]['auth'] = set()
[913dc7a]1207        if self.state_filename: self.write_state()
1208        if need_state_lock: self.state_lock.release()
[b67fd22]1209        self.auth.save()
[913dc7a]1210
[895a133]1211
[b4b19c7]1212    def create_experiment_state(self, fid, req, expid, expcert,
[a3ad8bd]1213            state='starting'):
[895a133]1214        """
1215        Create the initial entry in the experiment's state.  The expid and
1216        expcert are the experiment's fedid and certifacte that represents that
1217        ID, which are installed in the experiment state.  If the request
1218        includes a suggested local name that is used if possible.  If the local
1219        name is already taken by an experiment owned by this user that has
[a3ad8bd]1220        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1221        valid localname is found.  The generated local name is returned.
1222        """
1223
1224        if req.has_key('experimentID') and \
1225                req['experimentID'].has_key('localname'):
1226            overwrite = False
1227            eid = req['experimentID']['localname']
1228            # If there's an old failed experiment here with the same local name
1229            # and accessible by this user, we'll overwrite it, otherwise we'll
1230            # fall through and do the collision avoidance.
1231            old_expid = self.get_experiment_fedid(eid)
[74572ba]1232            if old_expid:
1233                users_experiment = True
1234                try:
1235                    self.check_experiment_access(fid, old_expid)
1236                except service_error, e:
1237                    if e.code == service_error.access: users_experiment = False
1238                    else: raise e
1239                if users_experiment:
1240                    self.state_lock.acquire()
[29d5f7c]1241                    status = self.state[eid].status
[74572ba]1242                    if status and status == 'failed':
1243                        # remove the old access attributes
1244                        self.clear_experiment_authorization(eid,
1245                                need_state_lock=False)
1246                        overwrite = True
1247                        del self.state[eid]
1248                        del self.state[old_expid]
1249                    self.state_lock.release()
[6031c9d]1250                else:
1251                    self.log.info('Experiment %s exists, ' % eid + \
1252                            'but this user cannot access it')
[895a133]1253            self.state_lock.acquire()
1254            while (self.state.has_key(eid) and not overwrite):
1255                eid += random.choice(string.ascii_letters)
1256            # Initial state
[29d5f7c]1257            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1258                    identity=expcert)
[895a133]1259            self.state[expid] = self.state[eid]
[913dc7a]1260            if self.state_filename: self.write_state()
1261            self.state_lock.release()
[895a133]1262        else:
1263            eid = self.exp_stem
1264            for i in range(0,5):
1265                eid += random.choice(string.ascii_letters)
1266            self.state_lock.acquire()
1267            while (self.state.has_key(eid)):
1268                eid = self.exp_stem
1269                for i in range(0,5):
1270                    eid += random.choice(string.ascii_letters)
1271            # Initial state
[29d5f7c]1272            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1273                    identity=expcert)
[895a133]1274            self.state[expid] = self.state[eid]
[913dc7a]1275            if self.state_filename: self.write_state()
1276            self.state_lock.release()
1277
1278        # Let users touch the state.  Authorize this fid and the expid itself
1279        # to touch the experiment, as well as allowing th eoverrides.
1280        self.append_experiment_authorization(eid, 
1281                set([(fid, expid), (expid,expid)] + \
1282                        [ (o, expid) for o in self.overrides]))
[895a133]1283
1284        return eid
1285
1286
1287    def allocate_ips_to_topo(self, top):
1288        """
[69692a9]1289        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1290        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1291        created and returned as a list of hostfiles entries.  We also return
1292        the allocator, because we may need to allocate IPs to portals
1293        (specifically DRAGON portals).
[895a133]1294        """
1295        subs = sorted(top.substrates, 
1296                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1297                reverse=True)
1298        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1299        ifs = { }
1300        hosts = [ ]
1301
1302        for idx, s in enumerate(subs):
[289ff7e]1303            net_size = len(s.interfaces)+2
1304
1305            a = ips.allocate(net_size)
[895a133]1306            if a :
1307                base, num = a
[289ff7e]1308                if num < net_size: 
[895a133]1309                    raise service_error(service_error.internal,
1310                            "Allocator returned wrong number of IPs??")
1311            else:
1312                raise service_error(service_error.req, 
1313                        "Cannot allocate IP addresses")
[062b991]1314            mask = ips.min_alloc
1315            while mask < net_size:
1316                mask *= 2
[289ff7e]1317
[062b991]1318            netmask = ((2**32-1) ^ (mask-1))
[895a133]1319
1320            base += 1
1321            for i in s.interfaces:
1322                i.attribute.append(
1323                        topdl.Attribute('ip4_address', 
1324                            "%s" % ip_addr(base)))
[289ff7e]1325                i.attribute.append(
1326                        topdl.Attribute('ip4_netmask', 
1327                            "%s" % ip_addr(int(netmask))))
1328
[1e7f268]1329                hname = i.element.name
[895a133]1330                if ifs.has_key(hname):
1331                    hosts.append("%s\t%s-%s %s-%d" % \
1332                            (ip_addr(base), hname, s.name, hname,
1333                                ifs[hname]))
1334                else:
1335                    ifs[hname] = 0
1336                    hosts.append("%s\t%s-%s %s-%d %s" % \
1337                            (ip_addr(base), hname, s.name, hname,
1338                                ifs[hname], hname))
1339
1340                ifs[hname] += 1
1341                base += 1
[69692a9]1342        return hosts, ips
[895a133]1343
[1d73342]1344    def get_access_to_testbeds(self, testbeds, fid, allocated, 
[725c55d]1345            tbparam, masters, tbmap, expid=None, expcert=None):
[6e63513]1346        for tb in testbeds:
[1d73342]1347            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
[c573278]1348                    expcert)
[6e63513]1349            allocated[tb] = 1
1350
[1d73342]1351    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1352            expcert=None):
[6e63513]1353        """
1354        Get access to testbed through fedd and set the parameters for that tb
1355        """
1356        def get_export_project(svcs):
1357            """
1358            Look through for the list of federated_service for this testbed
1359            objects for a project_export service, and extract the project
1360            parameter.
1361            """
1362
1363            pe = [s for s in svcs if s.name=='project_export']
1364            if len(pe) == 1:
1365                return pe[0].params.get('project', None)
1366            elif len(pe) == 0:
1367                return None
1368            else:
1369                raise service_error(service_error.req,
1370                        "More than one project export is not supported")
1371
[d0912be]1372        def add_services(svcs, type, slist, keys):
[63c6664]1373            """
[d0912be]1374            Add the given services to slist.  type is import or export.  Also
1375            add a mapping entry from the assigned id to the original service
1376            record.
[63c6664]1377            """
1378            for i, s in enumerate(svcs):
1379                idx = '%s%d' % (type, i)
[d0912be]1380                keys[idx] = s
[63c6664]1381                sr = {'id': idx, 'name': s.name, 'visibility': type }
1382                if s.params:
1383                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1384                            for k, v in s.params.items()]
1385                slist.append(sr)
1386
[6e63513]1387        uri = tbmap.get(testbed_base(tb), None)
1388        if not uri:
1389            raise service_error(service_error.server_config, 
1390                    "Unknown testbed: %s" % tb)
1391
1392        export_svcs = masters.get(tb,[])
1393        import_svcs = [ s for m in masters.values() \
1394                for s in m \
1395                    if tb in s.importers ]
1396
1397        export_project = get_export_project(export_svcs)
1398        # Compose the credential list so that IDs come before attributes
1399        creds = set()
1400        keys = set()
[c573278]1401        certs = self.auth.get_creds_for_principal(fid)
[a0119a1]1402        # Append credenials about this experiment controller - e.g. that it is
1403        # trusted.
1404        certs.update(self.auth.get_creds_for_principal(
1405            fedid(file=self.cert_file)))
[c573278]1406        if expid:
1407            certs.update(self.auth.get_creds_for_principal(expid))
1408        for c in certs:
[6e63513]1409            keys.add(c.issuer_cert())
1410            creds.add(c.attribute_cert())
1411        creds = list(keys) + list(creds)
1412
[c573278]1413        if expcert: cert, pw = expcert, None
1414        else: cert, pw = self.cert_file, self.cert_pw
1415
[6e63513]1416        # Request credentials
1417        req = {
1418                'abac_credential': creds,
1419            }
1420        # Make the service request from the services we're importing and
1421        # exporting.  Keep track of the export request ids so we can
1422        # collect the resulting info from the access response.
1423        e_keys = { }
1424        if import_svcs or export_svcs:
[63c6664]1425            slist = []
[d0912be]1426            add_services(import_svcs, 'import', slist, e_keys)
1427            add_services(export_svcs, 'export', slist, e_keys)
[63c6664]1428            req['service'] = slist
[6e63513]1429
1430        if self.local_access.has_key(uri):
1431            # Local access call
1432            req = { 'RequestAccessRequestBody' : req }
1433            r = self.local_access[uri].RequestAccess(req, 
1434                    fedid(file=self.cert_file))
1435            r = { 'RequestAccessResponseBody' : r }
1436        else:
[c573278]1437            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
[6e63513]1438
[725c55d]1439        if r.has_key('RequestAccessResponseBody'):
1440            # Through to here we have a valid response, not a fault.
1441            # Access denied is a fault, so something better or worse than
1442            # access denied has happened.
1443            r = r['RequestAccessResponseBody']
1444            self.log.debug("[get_access] Access granted")
1445        else:
1446            raise service_error(service_error.protocol,
1447                        "Bad proxy response")
[e83f2f2]1448        if 'proof' not in r:
1449            raise service_error(service_error.protocol,
1450                        "Bad access response (no access proof)")
[ab3d6c5]1451
[9294673]1452        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
[ab3d6c5]1453                tb=tb, uri=uri, proof=[r['proof']], 
1454                services=masters.get(tb, None))
[6e63513]1455
1456        # Collect the responses corresponding to the services this testbed
1457        # exports.  These will be the service requests that we will include in
1458        # the start segment requests (with appropriate visibility values) to
1459        # import and export the segments.
1460        for s in r.get('service', []):
1461            id = s.get('id', None)
[ab3d6c5]1462            # Note that this attaches the response to the object in the masters
1463            # data structure.  (The e_keys index disappears when this fcn
1464            # returns)
[6e63513]1465            if id and id in e_keys:
1466                e_keys[id].reqs.append(s)
1467
1468        # Add attributes to parameter space.  We don't allow attributes to
1469        # overlay any parameters already installed.
1470        for a in r.get('fedAttr', []):
1471            try:
[9294673]1472                if a['attribute']:
1473                    tbparam[tb].set_attribute(a['attribute'], a['value'])
[6e63513]1474            except KeyError:
1475                self.log.error("Bad attribute in response: %s" % a)
1476
1477
[7fe81be]1478    def split_topology(self, top, topo, testbeds):
[895a133]1479        """
[e02cd14]1480        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1481        """
1482        for tb in testbeds:
1483            topo[tb] = top.clone()
[7fe81be]1484            # copy in for loop allows deletions from the original
1485            for e in [ e for e in topo[tb].elements]:
[895a133]1486                etb = e.get_attribute('testbed')
[7fe81be]1487                # NB: elements without a testbed attribute won't appear in any
1488                # sub topologies. 
1489                if not etb or etb != tb:
[895a133]1490                    for i in e.interface:
1491                        for s in i.subs:
1492                            try:
1493                                s.interfaces.remove(i)
1494                            except ValueError:
1495                                raise service_error(service_error.internal,
1496                                        "Can't remove interface??")
[7fe81be]1497                    topo[tb].elements.remove(e)
[895a133]1498            topo[tb].make_indices()
1499
[2627eb3]1500    def confirm_software(self, top):
1501        """
1502        Make sure that the software to be loaded in the topo is all available
1503        before we begin making access requests, etc.  This is a subset of
1504        wrangle_software.
1505        """
1506        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1507        pkgs.update([x.location for e in top.elements for x in e.software])
1508
1509        for pkg in pkgs:
1510            loc = pkg
1511
1512            scheme, host, path = urlparse(loc)[0:3]
1513            dest = os.path.basename(path)
1514            if not scheme:
1515                if not loc.startswith('/'):
1516                    loc = "/%s" % loc
1517                loc = "file://%s" %loc
1518            # NB: if scheme was found, loc == pkg
1519            try:
1520                u = urlopen(loc)
1521                u.close()
1522            except Exception, e:
1523                raise service_error(service_error.req, 
1524                        "Cannot open %s: %s" % (loc, e))
1525        return True
1526
[895a133]1527    def wrangle_software(self, expid, top, topo, tbparams):
1528        """
1529        Copy software out to the repository directory, allocate permissions and
1530        rewrite the segment topologies to look for the software in local
1531        places.
1532        """
1533
1534        # Copy the rpms and tarfiles to a distribution directory from
1535        # which the federants can retrieve them
1536        linkpath = "%s/software" %  expid
1537        softdir ="%s/%s" % ( self.repodir, linkpath)
1538        softmap = { }
[2627eb3]1539
1540        # self.fedkit and self.gateway kit are lists of tuples of
1541        # (install_location, download_location) this extracts the download
1542        # locations.
1543        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1544        pkgs.update([x.location for e in top.elements for x in e.software])
[895a133]1545        try:
1546            os.makedirs(softdir)
[d3c8759]1547        except EnvironmentError, e:
[895a133]1548            raise service_error(
1549                    "Cannot create software directory: %s" % e)
1550        # The actual copying.  Everything's converted into a url for copying.
[913dc7a]1551        auth_attrs = set()
[895a133]1552        for pkg in pkgs:
1553            loc = pkg
1554
1555            scheme, host, path = urlparse(loc)[0:3]
1556            dest = os.path.basename(path)
1557            if not scheme:
1558                if not loc.startswith('/'):
1559                    loc = "/%s" % loc
1560                loc = "file://%s" %loc
[2627eb3]1561            # NB: if scheme was found, loc == pkg
[895a133]1562            try:
1563                u = urlopen(loc)
1564            except Exception, e:
1565                raise service_error(service_error.req, 
1566                        "Cannot open %s: %s" % (loc, e))
1567            try:
1568                f = open("%s/%s" % (softdir, dest) , "w")
1569                self.log.debug("Writing %s/%s" % (softdir,dest) )
1570                data = u.read(4096)
1571                while data:
1572                    f.write(data)
1573                    data = u.read(4096)
1574                f.close()
1575                u.close()
1576            except Exception, e:
1577                raise service_error(service_error.internal,
1578                        "Could not copy %s: %s" % (loc, e))
1579            path = re.sub("/tmp", "", linkpath)
1580            # XXX
1581            softmap[pkg] = \
[7183b48]1582                    "%s/%s/%s" %\
1583                    ( self.repo_url, path, dest)
[895a133]1584
[913dc7a]1585            # Allow the individual segments to access the software by assigning
1586            # an attribute to each testbed allocation that encodes the data to
1587            # be released.  This expression collects the data for each run of
1588            # the loop.
1589            auth_attrs.update([
[9294673]1590                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
[913dc7a]1591                        for tb in tbparams.keys()])
1592
1593        self.append_experiment_authorization(expid, auth_attrs)
[895a133]1594
1595        # Convert the software locations in the segments into the local
1596        # copies on this host
1597        for soft in [ s for tb in topo.values() \
1598                for e in tb.elements \
1599                    if getattr(e, 'software', False) \
1600                        for s in e.software ]:
1601            if softmap.has_key(soft.location):
1602                soft.location = softmap[soft.location]
1603
1604
[a3ad8bd]1605    def new_experiment(self, req, fid):
1606        """
1607        The external interface to empty initial experiment creation called from
1608        the dispatcher.
1609
1610        Creates a working directory, splits the incoming description using the
1611        splitter script and parses out the avrious subsections using the
1612        lcasses above.  Once each sub-experiment is created, use pooled threads
1613        to instantiate them and start it all up.
1614        """
[2bb8b35]1615        self.log.info("New experiment call started for %s" % fid)
[7206e5a]1616        req = req.get('NewRequestBody', None)
1617        if not req:
1618            raise service_error(service_error.req,
1619                    "Bad request format (no NewRequestBody)")
1620
1621        if self.auth.import_credentials(data_list=req.get('credential', [])):
1622            self.auth.save()
[c573278]1623
[8cab4c2]1624        try:
1625            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1626                    with_proof=True)
1627        except service_error, e:
1628            self.log.info("New experiment call for %s: access denied" % fid)
1629            raise e
1630
[e83f2f2]1631
1632        if not access_ok:
[2bb8b35]1633            self.log.info("New experiment call for %s: Access denied" % fid)
[e83f2f2]1634            raise service_error(service_error.access, "New access denied",
1635                    proof=[proof])
[a3ad8bd]1636
1637        try:
1638            tmpdir = tempfile.mkdtemp(prefix="split-")
[d3c8759]1639        except EnvironmentError:
[a3ad8bd]1640            raise service_error(service_error.internal, "Cannot create tmp dir")
1641
1642        # Generate an ID for the experiment (slice) and a certificate that the
1643        # allocator can use to prove they own it.  We'll ship it back through
[7206e5a]1644        # the encrypted connection.  If the requester supplied one, use it.
1645        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1646            expcert = req['experimentAccess']['X509']
[962ea25]1647            expid = fedid(certstr=expcert)
[7206e5a]1648            self.state_lock.acquire()
1649            if expid in self.state:
1650                self.state_lock.release()
1651                raise service_error(service_error.req, 
1652                        'fedid %s identifies an existing experiment' % expid)
1653            self.state_lock.release()
1654        else:
1655            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
[a3ad8bd]1656
1657        #now we're done with the tmpdir, and it should be empty
1658        if self.cleanup:
1659            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1660            os.rmdir(tmpdir)
1661        else:
1662            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1663
1664        eid = self.create_experiment_state(fid, req, expid, expcert, 
1665                state='empty')
1666
1667        rv = {
1668                'experimentID': [
1669                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1670                ],
1671                'experimentStatus': 'empty',
[e83f2f2]1672                'experimentAccess': { 'X509' : expcert },
1673                'proof': proof.to_dict(),
[a3ad8bd]1674            }
1675
[2bb8b35]1676        self.log.info("New experiment call succeeded for %s" % fid)
[a3ad8bd]1677        return rv
1678
[cf0ff4f]1679    # create_experiment sub-functions
1680
[5ecb9a3]1681    @staticmethod
[cf0ff4f]1682    def get_experiment_key(req, field='experimentID'):
[5ecb9a3]1683        """
1684        Parse the experiment identifiers out of the request (the request body
1685        tag has been removed).  Specifically this pulls either the fedid or the
1686        localname out of the experimentID field.  A fedid is preferred.  If
1687        neither is present or the request does not contain the fields,
1688        service_errors are raised.
1689        """
1690        # Get the experiment access
[cf0ff4f]1691        exp = req.get(field, None)
[5ecb9a3]1692        if exp:
1693            if exp.has_key('fedid'):
1694                key = exp['fedid']
1695            elif exp.has_key('localname'):
1696                key = exp['localname']
1697            else:
1698                raise service_error(service_error.req, "Unknown lookup type")
1699        else:
1700            raise service_error(service_error.req, "No request?")
1701
1702        return key
1703
1704    def get_experiment_ids_and_start(self, key, tmpdir):
1705        """
1706        Get the experiment name, id and access certificate from the state, and
1707        set the experiment state to 'starting'.  returns a triple (fedid,
1708        localname, access_cert_file). The access_cert_file is a copy of the
1709        contents of the access certificate, created in the tempdir with
1710        restricted permissions.  If things are confused, raise an exception.
1711        """
1712
1713        expid = eid = None
1714        self.state_lock.acquire()
[29d5f7c]1715        if key in self.state:
1716            exp = self.state[key]
1717            exp.status = "starting"
[6e33086]1718            exp.updated()
[29d5f7c]1719            expid = exp.fedid
1720            eid = exp.localname
1721            expcert = exp.identity
[5ecb9a3]1722        self.state_lock.release()
1723
1724        # make a protected copy of the access certificate so the experiment
1725        # controller can act as the experiment principal.
1726        if expcert:
1727            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1728            if not expcert_file:
1729                raise service_error(service_error.internal, 
1730                        "Cannot create temp cert file?")
1731        else:
1732            expcert_file = None
1733
1734        return (eid, expid, expcert_file)
1735
1736    def get_topology(self, req, tmpdir):
1737        """
1738        Get the ns2 content and put it into a file for parsing.  Call the local
1739        or remote parser and return the topdl.Topology.  Errors result in
1740        exceptions.  req is the request and tmpdir is a work directory.
1741        """
1742
1743        # The tcl parser needs to read a file so put the content into that file
1744        descr=req.get('experimentdescription', None)
1745        if descr:
[a7c0bcb]1746            if 'ns2description' in descr:
1747                file_content=descr['ns2description']
1748            elif 'topdldescription' in descr:
1749                return topdl.Topology(**descr['topdldescription'])
1750            else:
1751                raise service_error(service_error.req, 
1752                        'Unknown experiment description type')
[5ecb9a3]1753        else:
1754            raise service_error(service_error.req, "No experiment description")
1755
1756
1757        if self.splitter_url:
1758            self.log.debug("Calling remote topdl translator at %s" % \
1759                    self.splitter_url)
1760            top = self.remote_ns2topdl(self.splitter_url, file_content)
1761        else:
1762            tclfile = os.path.join(tmpdir, "experiment.tcl")
1763            if file_content:
1764                try:
1765                    f = open(tclfile, 'w')
1766                    f.write(file_content)
1767                    f.close()
1768                except EnvironmentError:
1769                    raise service_error(service_error.internal,
1770                            "Cannot write temp experiment description")
1771            else:
1772                raise service_error(service_error.req, 
1773                        "Only ns2descriptions supported")
1774            pid = "dummy"
1775            gid = "dummy"
1776            eid = "dummy"
1777
1778            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1779                str(self.muxmax), '-m', 'dummy']
1780
1781            tclcmd.extend([pid, gid, eid, tclfile])
1782
1783            self.log.debug("running local splitter %s", " ".join(tclcmd))
1784            # This is just fantastic.  As a side effect the parser copies
1785            # tb_compat.tcl into the current directory, so that directory
1786            # must be writable by the fedd user.  Doing this in the
1787            # temporary subdir ensures this is the case.
1788            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1789                    cwd=tmpdir)
1790            split_data = tclparser.stdout
1791
1792            top = topdl.topology_from_xml(file=split_data, top="experiment")
1793            os.remove(tclfile)
1794
1795        return top
1796
[1660f7c]1797    def get_testbed_services(self, req, testbeds):
[5ecb9a3]1798        """
[57facae]1799        Parse the services section of the request into two dicts mapping
1800        testbed to lists of federated_service objects.  The first dict maps all
1801        exporters of services to those service objects, the second maps
1802        testbeds to service objects only for services requiring portals.
[5ecb9a3]1803        """
[57facae]1804        # We construct both dicts here because deriving the second is more
1805        # comples than it looks - both the keys and lists can differ, and it's
1806        # much easier to generate both in one pass.
[5ecb9a3]1807        masters = { }
[57facae]1808        pmasters = { }
[5ecb9a3]1809        for s in req.get('service', []):
1810            # If this is a service request with the importall field
1811            # set, fill it out.
1812
1813            if s.get('importall', False):
1814                s['import'] = [ tb for tb in testbeds \
1815                        if tb not in s.get('export',[])]
1816                del s['importall']
1817
1818            # Add the service to masters
1819            for tb in s.get('export', []):
1820                if s.get('name', None):
1821
1822                    params = { }
1823                    for a in s.get('fedAttr', []):
1824                        params[a.get('attribute', '')] = a.get('value','')
1825
1826                    fser = federated_service(name=s['name'],
1827                            exporter=tb, importers=s.get('import',[]),
1828                            params=params)
1829                    if fser.name == 'hide_hosts' \
1830                            and 'hosts' not in fser.params:
1831                        fser.params['hosts'] = \
1832                                ",".join(tb_hosts.get(fser.exporter, []))
1833                    if tb in masters: masters[tb].append(fser)
1834                    else: masters[tb] = [fser]
[57facae]1835
1836                    if fser.portal:
1837                        if tb in pmasters: pmasters[tb].append(fser)
1838                        else: pmasters[tb] = [fser]
[5ecb9a3]1839                else:
1840                    self.log.error('Testbed service does not have name " + \
1841                            "and importers')
[57facae]1842        return masters, pmasters
[5ecb9a3]1843
[cf0ff4f]1844    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1845        """
[3df9b33]1846        Create the ssh keys necessary for interconnecting the portal nodes and
[cf0ff4f]1847        the global hosts file for letting each segment know about the IP
1848        addresses in play.  Save these into the repo.  Add attributes to the
1849        autorizer allowing access controllers to download them and return a set
[3df9b33]1850        of attributes that inform the segments where to find this stuff.  May
[cf0ff4f]1851        raise service_errors in if there are problems.
1852        """
1853        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1854        gw_secretkey_base = "fed.%s" % self.ssh_type
[3df9b33]1855        keydir = os.path.join(tmpdir, 'keys')
1856        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1857        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
[cf0ff4f]1858
1859        try:
1860            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1861        except ValueError:
1862            raise service_error(service_error.server_config, 
1863                    "Bad key type (%s)" % self.ssh_type)
1864
[3df9b33]1865        self.generate_seer_certs(keydir)
[cf0ff4f]1866
1867        # Copy configuration files into the remote file store
1868        # The config urlpath
1869        configpath = "/%s/config" % expid
1870        # The config file system location
1871        configdir ="%s%s" % ( self.repodir, configpath)
1872        try:
1873            os.makedirs(configdir)
1874        except EnvironmentError, e:
1875            raise service_error(service_error.internal,
1876                    "Cannot create config directory: %s" % e)
1877        try:
1878            f = open("%s/hosts" % configdir, "w")
1879            print >> f, string.join(hosts, '\n')
1880            f.close()
1881        except EnvironmentError, e:
1882            raise service_error(service_error.internal, 
1883                    "Cannot write hosts file: %s" % e)
1884        try:
[3df9b33]1885            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1886            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1887            copy_file(os.path.join(keydir, 'ca.pem'), 
1888                    os.path.join(configdir, 'ca.pem'))
1889            copy_file(os.path.join(keydir, 'node.pem'), 
1890                    os.path.join(configdir, 'node.pem'))
[cf0ff4f]1891        except EnvironmentError, e:
1892            raise service_error(service_error.internal, 
1893                    "Cannot copy keyfiles: %s" % e)
1894
[913dc7a]1895        # Allow the individual testbeds to access the configuration files,
1896        # again by setting an attribute for the relevant pathnames on each
1897        # allocation principal.  Yeah, that's a long list comprehension.
1898        self.append_experiment_authorization(expid, set([
[9294673]1899            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
[913dc7a]1900                    for tb in tbparams.keys() \
[3df9b33]1901                        for f in ("hosts", 'ca.pem', 'node.pem', 
1902                            gw_secretkey_base, gw_pubkey_base)]))
[cf0ff4f]1903
1904        attrs = [ 
1905                {
1906                    'attribute': 'ssh_pubkey', 
1907                    'value': '%s/%s/config/%s' % \
1908                            (self.repo_url, expid, gw_pubkey_base)
1909                },
1910                {
1911                    'attribute': 'ssh_secretkey', 
1912                    'value': '%s/%s/config/%s' % \
1913                            (self.repo_url, expid, gw_secretkey_base)
1914                },
1915                {
1916                    'attribute': 'hosts', 
1917                    'value': '%s/%s/config/hosts' % \
1918                            (self.repo_url, expid)
1919                },
[3df9b33]1920                {
1921                    'attribute': 'seer_ca_pem', 
1922                    'value': '%s/%s/config/%s' % \
1923                            (self.repo_url, expid, 'ca.pem')
1924                },
1925                {
1926                    'attribute': 'seer_node_pem', 
1927                    'value': '%s/%s/config/%s' % \
1928                            (self.repo_url, expid, 'node.pem')
1929                },
[cf0ff4f]1930            ]
1931        return attrs
1932
1933
1934    def get_vtopo(self, req, fid):
1935        """
1936        Return the stored virtual topology for this experiment
1937        """
1938        rv = None
1939        state = None
[2bb8b35]1940        self.log.info("vtopo call started for %s" %  fid)
[cf0ff4f]1941
1942        req = req.get('VtopoRequestBody', None)
1943        if not req:
1944            raise service_error(service_error.req,
1945                    "Bad request format (no VtopoRequestBody)")
1946        exp = req.get('experiment', None)
1947        if exp:
1948            if exp.has_key('fedid'):
1949                key = exp['fedid']
1950                keytype = "fedid"
1951            elif exp.has_key('localname'):
1952                key = exp['localname']
1953                keytype = "localname"
1954            else:
1955                raise service_error(service_error.req, "Unknown lookup type")
1956        else:
1957            raise service_error(service_error.req, "No request?")
1958
[8cab4c2]1959        try:
1960            proof = self.check_experiment_access(fid, key)
1961        except service_error, e:
1962            self.log.info("vtopo call failed for %s: access denied" %  fid)
1963            raise e
[cf0ff4f]1964
1965        self.state_lock.acquire()
[29d5f7c]1966        # XXX: this needs to be recalculated
[80b1e82]1967        if key in self.state:
1968            if self.state[key].top is not None:
1969                vtopo = topdl.topology_to_vtopo(self.state[key].top)
[e83f2f2]1970                rv = { 'experiment' : {keytype: key },
[80b1e82]1971                        'vtopo': vtopo,
[e83f2f2]1972                        'proof': proof.to_dict(), 
[cf0ff4f]1973                    }
1974            else:
[80b1e82]1975                state = self.state[key].status
[cf0ff4f]1976        self.state_lock.release()
1977
[2bb8b35]1978        if rv: 
1979            self.log.info("vtopo call completed for %s %s " % \
1980                (key, fid))
1981            return rv
[cf0ff4f]1982        else: 
1983            if state:
[2bb8b35]1984                self.log.info("vtopo call completed for %s %s (Not ready)" % \
1985                    (key, fid))
[cf0ff4f]1986                raise service_error(service_error.partial, 
1987                        "Not ready: %s" % state)
1988            else:
[2bb8b35]1989                self.log.info("vtopo call completed for %s %s (No experiment)"\
1990                        % (key, fid))
[cf0ff4f]1991                raise service_error(service_error.req, "No such experiment")
1992
1993    def get_vis(self, req, fid):
1994        """
1995        Return the stored visualization for this experiment
1996        """
1997        rv = None
1998        state = None
1999
[2bb8b35]2000        self.log.info("vis call started for %s" %  fid)
[cf0ff4f]2001        req = req.get('VisRequestBody', None)
2002        if not req:
2003            raise service_error(service_error.req,
2004                    "Bad request format (no VisRequestBody)")
2005        exp = req.get('experiment', None)
2006        if exp:
2007            if exp.has_key('fedid'):
2008                key = exp['fedid']
2009                keytype = "fedid"
2010            elif exp.has_key('localname'):
2011                key = exp['localname']
2012                keytype = "localname"
2013            else:
2014                raise service_error(service_error.req, "Unknown lookup type")
2015        else:
2016            raise service_error(service_error.req, "No request?")
2017
[8cab4c2]2018        try:
2019            proof = self.check_experiment_access(fid, key)
2020        except service_error, e:
2021            self.log.info("vis call failed for %s: access denied" %  fid)
2022            raise e
[cf0ff4f]2023
2024        self.state_lock.acquire()
[80b1e82]2025        # Generate the visualization
2026        if key in self.state:
2027            if self.state[key].top is not None:
2028                try:
2029                    vis = self.genviz(
[6a50b78]2030                            topdl.topology_to_vtopo(self.state[key].top))
[80b1e82]2031                except service_error, e:
2032                    self.state_lock.release()
2033                    raise e
2034                rv =  { 'experiment' : {keytype: key },
2035                        'vis': vis,
[e83f2f2]2036                        'proof': proof.to_dict(), 
[80b1e82]2037                        }
[cf0ff4f]2038            else:
[80b1e82]2039                state = self.state[key].status
[cf0ff4f]2040        self.state_lock.release()
2041
[2bb8b35]2042        if rv: 
2043            self.log.info("vis call completed for %s %s " % \
2044                (key, fid))
2045            return rv
[cf0ff4f]2046        else:
2047            if state:
[2bb8b35]2048                self.log.info("vis call completed for %s %s (not ready)" % \
2049                    (key, fid))
[cf0ff4f]2050                raise service_error(service_error.partial, 
2051                        "Not ready: %s" % state)
2052            else:
[2bb8b35]2053                self.log.info("vis call completed for %s %s (no experiment)" % \
2054                    (key, fid))
[cf0ff4f]2055                raise service_error(service_error.req, "No such experiment")
2056
2057   
[ec3aa4d]2058    def save_federant_information(self, allocated, tbparams, eid, top):
[cf0ff4f]2059        """
2060        Store the various data that have changed in the experiment state
2061        between when it was started and the beginning of resource allocation.
2062        This is basically the information about each local allocation.  This
[e83f2f2]2063        fills in the values of the placeholder allocation in the state.  It
2064        also collects the access proofs and returns them as dicts for a
2065        response message.
[cf0ff4f]2066        """
[29d5f7c]2067        self.state_lock.acquire()
2068        exp = self.state[eid]
[ec3aa4d]2069        exp.top = top.clone()
[cf0ff4f]2070        # save federant information
2071        for k in allocated.keys():
[9294673]2072            exp.add_allocation(tbparams[k])
[ec3aa4d]2073            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
[ab3d6c5]2074                type="testbed", localname=[k], 
2075                service=[ s.to_topdl() for s in tbparams[k].services]))
[cf0ff4f]2076
[e83f2f2]2077        # Access proofs for the response message
2078        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
[9294673]2079                    for p in tbparams[k].proof]
[6e33086]2080        exp.updated()
[cf0ff4f]2081        if self.state_filename: 
2082            self.write_state()
2083        self.state_lock.release()
[e83f2f2]2084        return proofs
[cf0ff4f]2085
2086    def clear_placeholder(self, eid, expid, tmpdir):
2087        """
2088        Clear the placeholder and remove any allocated temporary dir.
2089        """
2090
2091        self.state_lock.acquire()
2092        del self.state[eid]
2093        del self.state[expid]
2094        if self.state_filename: self.write_state()
2095        self.state_lock.release()
2096        if tmpdir and self.cleanup:
2097            self.remove_dirs(tmpdir)
2098
2099    # end of create_experiment sub-functions
[5ecb9a3]2100
[e19b75c]2101    def create_experiment(self, req, fid):
[db6b092]2102        """
2103        The external interface to experiment creation called from the
2104        dispatcher.
2105
2106        Creates a working directory, splits the incoming description using the
[43197eb]2107        splitter script and parses out the various subsections using the
[1a4ee0f]2108        classes above.  Once each sub-experiment is created, use pooled threads
2109        to instantiate them and start it all up.
[db6b092]2110        """
[7183b48]2111
[2bb8b35]2112        self.log.info("Create experiment call started for %s" % fid)
[7183b48]2113        req = req.get('CreateRequestBody', None)
[5ecb9a3]2114        if req:
[cf0ff4f]2115            key = self.get_experiment_key(req)
[5ecb9a3]2116        else:
[7183b48]2117            raise service_error(service_error.req,
2118                    "Bad request format (no CreateRequestBody)")
2119
[6e63513]2120        # Import information from the requester
2121        if self.auth.import_credentials(data_list=req.get('credential', [])):
2122            self.auth.save()
[cde9b98]2123        else:
2124            self.log.debug("Failed to import delegation credentials(!)")
[6e63513]2125
[8cab4c2]2126        try:
2127            # Make sure that the caller can talk to us
2128            proof = self.check_experiment_access(fid, key)
2129        except service_error, e:
2130            self.log.info("Create experiment call failed for %s: access denied"\
2131                    % fid)
2132            raise e
2133
[db6b092]2134
[fd07c48]2135        # Install the testbed map entries supplied with the request into a copy
2136        # of the testbed map.
2137        tbmap = dict(self.tbmap)
[a11eda5]2138        tbactive = set(self.tbactive)
[fd07c48]2139        for m in req.get('testbedmap', []):
2140            if 'testbed' in m and 'uri' in m:
2141                tbmap[m['testbed']] = m['uri']
[a11eda5]2142                if 'active' in m and m['active']: tbactive.add(m['testbed'])
[fd07c48]2143
[5ecb9a3]2144        # a place to work
[db6b092]2145        try:
2146            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2147            os.mkdir(tmpdir+"/keys")
[d3c8759]2148        except EnvironmentError:
[db6b092]2149            raise service_error(service_error.internal, "Cannot create tmp dir")
2150
2151        tbparams = { }
2152
[5ecb9a3]2153        eid, expid, expcert_file = \
2154                self.get_experiment_ids_and_start(key, tmpdir)
[c573278]2155
[5ecb9a3]2156        # This catches exceptions to clear the placeholder if necessary
[db6b092]2157        try: 
[5ecb9a3]2158            if not (eid and expid):
2159                raise service_error(service_error.internal, 
2160                        "Cannot find local experiment info!?")
[5f6929a]2161
[5ecb9a3]2162            top = self.get_topology(req, tmpdir)
[2627eb3]2163            self.confirm_software(top)
[5ecb9a3]2164            # Assign the IPs
[69692a9]2165            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[1a4ee0f]2166            # Find the testbeds to look up
[5334044]2167            tb_hosts = { }
[5ecb9a3]2168            testbeds = [ ]
2169            for e in top.elements:
2170                if isinstance(e, topdl.Computer):
2171                    tb = e.get_attribute('testbed') or 'default'
2172                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2173                    else: 
2174                        tb_hosts[tb] = [ e.name ]
2175                        testbeds.append(tb)
2176
[57facae]2177            masters, pmasters = self.get_testbed_services(req, testbeds)
[895a133]2178            allocated = { }         # Testbeds we can access
2179            topo ={ }               # Sub topologies
[e02cd14]2180            connInfo = { }          # Connection information
[5334044]2181
[2627eb3]2182            self.split_topology(top, topo, testbeds)
2183
[5ecb9a3]2184            self.get_access_to_testbeds(testbeds, fid, allocated, 
2185                    tbparams, masters, tbmap, expid, expcert_file)
[5f96438]2186
[cf0ff4f]2187            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
[cc8d8e9]2188
[fd07c48]2189            part = experiment_partition(self.auth, self.store_url, tbmap,
[a11eda5]2190                    self.muxmax, self.direct_transit, tbactive)
[5334044]2191            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
[2761484]2192                    connInfo, expid)
[913dc7a]2193
2194            auth_attrs = set()
[ab847bc]2195            # Now get access to the dynamic testbeds (those added above)
2196            for tb in [ t for t in topo if t not in allocated]:
[cf0ff4f]2197                self.get_access(tb, tbparams, fid, masters, tbmap, 
2198                        expid, expcert_file)
[ab847bc]2199                allocated[tb] = 1
2200                store_keys = topo[tb].get_attribute('store_keys')
2201                # Give the testbed access to keys it exports or imports
2202                if store_keys:
[b16cfc0]2203                    auth_attrs.update(set([
[9294673]2204                        (tbparams[tb].allocID, sk) \
[913dc7a]2205                                for sk in store_keys.split(" ")]))
2206
2207            if auth_attrs:
2208                self.append_experiment_authorization(expid, auth_attrs)
[69692a9]2209
[cf0ff4f]2210            # transit and disconnected testbeds may not have a connInfo entry.
2211            # Fill in the blanks.
2212            for t in allocated.keys():
2213                if not connInfo.has_key(t):
2214                    connInfo[t] = { }
2215
[895a133]2216            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2217
[e83f2f2]2218            proofs = self.save_federant_information(allocated, tbparams, 
[ec3aa4d]2219                    eid, top)
[866c983]2220        except service_error, e:
2221            # If something goes wrong in the parse (usually an access error)
2222            # clear the placeholder state.  From here on out the code delays
[db6b092]2223            # exceptions.  Failing at this point returns a fault to the remote
2224            # caller.
[2bb8b35]2225
2226            self.log.info("Create experiment call failed for %s %s: %s" % 
2227                    (eid, fid, e))
[cf0ff4f]2228            self.clear_placeholder(eid, expid, tmpdir)
[866c983]2229            raise e
2230
[db6b092]2231        # Start the background swapper and return the starting state.  From
2232        # here on out, the state will stick around a while.
[866c983]2233
[db6b092]2234        # Create a logger that logs to the experiment's state object as well as
2235        # to the main log file.
2236        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[29d5f7c]2237        alloc_collector = self.list_log(self.state[eid].log)
[f07fa49]2238        h = logging.StreamHandler(alloc_collector)
[db6b092]2239        # XXX: there should be a global one of these rather than repeating the
2240        # code.
2241        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2242                    '%d %b %y %H:%M:%S'))
2243        alloc_log.addHandler(h)
[617592b]2244
[db6b092]2245        # Start a thread to do the resource allocation
[e19b75c]2246        t  = Thread(target=self.allocate_resources,
[43197eb]2247                args=(allocated, masters, eid, expid, tbparams, 
[b4b19c7]2248                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
[725c55d]2249                    connInfo, tbmap, expcert_file),
[db6b092]2250                name=eid)
2251        t.start()
2252
2253        rv = {
2254                'experimentID': [
2255                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2256                ],
2257                'experimentStatus': 'starting',
[e83f2f2]2258                'proof': [ proof.to_dict() ] + proofs,
[db6b092]2259            }
[2bb8b35]2260        self.log.info("Create experiment call succeeded for %s %s" % \
2261                (eid, fid))
[db6b092]2262
2263        return rv
[9479343]2264   
2265    def get_experiment_fedid(self, key):
2266        """
[db6b092]2267        find the fedid associated with the localname key in the state database.
[9479343]2268        """
2269
[db6b092]2270        rv = None
2271        self.state_lock.acquire()
[29d5f7c]2272        if key in self.state:
2273            rv = self.state[key].fedid
[db6b092]2274        self.state_lock.release()
2275        return rv
[a97394b]2276
[4064742]2277    def check_experiment_access(self, fid, key):
[866c983]2278        """
2279        Confirm that the fid has access to the experiment.  Though a request
2280        may be made in terms of a local name, the access attribute is always
2281        the experiment's fedid.
2282        """
2283        if not isinstance(key, fedid):
[db6b092]2284            key = self.get_experiment_fedid(key)
[866c983]2285
[e83f2f2]2286        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2287
2288        if access_ok:
2289            return proof
[866c983]2290        else:
[e83f2f2]2291            raise service_error(service_error.access, "Access Denied",
2292                proof)
[4064742]2293
2294
[db6b092]2295    def get_handler(self, path, fid):
[cf0ff4f]2296        """
2297        Perhaps surprisingly named, this function handles HTTP GET requests to
2298        this server (SOAP requests are POSTs).
2299        """
[7183b48]2300        self.log.info("Get handler %s %s" % (path, fid))
[e83f2f2]2301        # XXX: log proofs?
[6c57fe9]2302        if self.auth.check_attribute(fid, path):
2303            return ("%s/%s" % (self.repodir, path), "application/binary")
2304        else:
2305            return (None, None)
[987aaa1]2306
[6e33086]2307    def update_info(self, key, force=False):
2308        top = None
2309        self.state_lock.acquire()
2310        if key in self.state:
2311            if force or self.state[key].older_than(self.info_cache_limit):
[6a50b78]2312                top = self.state[key].top
[6e33086]2313                if top is not None: top = top.clone()
2314                d1, info_params, cert, d2 = \
2315                        self.get_segment_info(self.state[key], need_lock=False)
2316        self.state_lock.release()
2317
2318        if top is None: return
2319
2320        try:
2321            tmpdir = tempfile.mkdtemp(prefix="info-")
2322        except EnvironmentError:
2323            raise service_error(service_error.internal, 
2324                    "Cannot create tmp dir")
2325        cert_file = self.make_temp_certfile(cert, tmpdir)
2326
2327        data = []
2328        try:
2329            for k, (uri, aid) in info_params.items():
2330                info=self.info_segment(log=self.log, testbed=uri,
2331                            cert_file=cert_file, cert_pwd=None,
2332                            trusted_certs=self.trusted_certs,
2333                            caller=self.call_InfoSegment)
2334                info(uri, aid)
2335                data.append(info)
2336        # Clean up the tmpdir no matter what
2337        finally:
2338            if tmpdir: self.remove_dirs(tmpdir)
2339
2340        self.annotate_topology(top, data)
2341        self.state_lock.acquire()
2342        if key in self.state:
2343            self.state[key].top = top
2344            self.state[key].updated()
2345            if self.state_filename: self.write_state()
2346        self.state_lock.release()
2347
[29d5f7c]2348   
[c52c48d]2349    def get_info(self, req, fid):
[866c983]2350        """
2351        Return all the stored info about this experiment
2352        """
2353        rv = None
2354
[2bb8b35]2355        self.log.info("Info call started for %s" %  fid)
[866c983]2356        req = req.get('InfoRequestBody', None)
2357        if not req:
2358            raise service_error(service_error.req,
[65f3f29]2359                    "Bad request format (no InfoRequestBody)")
[866c983]2360        exp = req.get('experiment', None)
[80b1e82]2361        legacy = req.get('legacy', False)
[6e33086]2362        fresh = req.get('fresh', False)
[866c983]2363        if exp:
2364            if exp.has_key('fedid'):
2365                key = exp['fedid']
2366                keytype = "fedid"
2367            elif exp.has_key('localname'):
2368                key = exp['localname']
2369                keytype = "localname"
2370            else:
2371                raise service_error(service_error.req, "Unknown lookup type")
2372        else:
2373            raise service_error(service_error.req, "No request?")
2374
[8cab4c2]2375        try:
2376            proof = self.check_experiment_access(fid, key)
2377        except service_error, e:
2378            self.log.info("Info call failed for %s: access denied" %  fid)
2379
[866c983]2380
[6e33086]2381        self.update_info(key, fresh)
2382
[866c983]2383        self.state_lock.acquire()
2384        if self.state.has_key(key):
[29d5f7c]2385            rv = self.state[key].get_info()
[6e33086]2386            # Copy the topo if we need legacy annotations
[80b1e82]2387            if legacy:
[6a50b78]2388                top = self.state[key].top
[80b1e82]2389                if top is not None: top = top.clone()
[866c983]2390        self.state_lock.release()
[2bb8b35]2391        self.log.info("Gathered Info for %s %s" % (key, fid))
[866c983]2392
[80b1e82]2393        # If the legacy visualization and topology representations are
[6e33086]2394        # requested, calculate them and add them to the return.
[80b1e82]2395        if legacy and rv is not None:
[2bb8b35]2396            self.log.info("Generating legacy Info for %s %s" % (key, fid))
[80b1e82]2397            if top is not None:
2398                vtopo = topdl.topology_to_vtopo(top)
2399                if vtopo is not None:
2400                    rv['vtopo'] = vtopo
2401                    try:
2402                        vis = self.genviz(vtopo)
2403                    except service_error, e:
2404                        self.log.debug('Problem generating visualization: %s' \
2405                                % e)
2406                        vis = None
2407                    if vis is not None:
2408                        rv['vis'] = vis
[db6b092]2409        if rv:
[2bb8b35]2410            self.log.info("Info succeded for %s %s" % (key, fid))
[29d5f7c]2411            rv['proof'] = proof.to_dict()
2412            return rv
[2bb8b35]2413        else: 
2414            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
[db6b092]2415            raise service_error(service_error.req, "No such experiment")
[7a8d667]2416
[22a1a77]2417    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2418            results):
2419        """
2420        Call OperateSegment on multiple testbeds and gather the results.
2421        op_params contains the parameters needed to contact that testbed, cert
2422        is a certificate containing the fedid to use, op is the operation,
2423        testbeds is a dict mapping testbed name to targets in that testbed,
2424        params are the parameters to include a,d results is a growing list of
2425        the results of the calls.
2426        """
2427        try:
2428            tmpdir = tempfile.mkdtemp(prefix="info-")
2429        except EnvironmentError:
2430            raise service_error(service_error.internal, 
2431                    "Cannot create tmp dir")
2432        cert_file = self.make_temp_certfile(cert, tmpdir)
2433
2434        try:
2435            for tb, targets in testbeds.items():
2436                if tb in op_params:
2437                    uri, aid = op_params[tb]
2438                    operate=self.operation_segment(log=self.log, testbed=uri,
2439                                cert_file=cert_file, cert_pwd=None,
2440                                trusted_certs=self.trusted_certs,
2441                                caller=self.call_OperationSegment)
2442                    if operate(uri, aid, op, targets, params):
2443                        if operate.status is not None:
2444                            results.extend(operate.status)
2445                            continue
2446                # Something went wrong in a weird way.  Add statuses
2447                # that reflect that to results
2448                for t in targets:
2449                    results.append(operation_status(t, 
2450                        operation_status.federant,
[b709861]2451                        'Unexpected error on %s' % tb))
[22a1a77]2452        # Clean up the tmpdir no matter what
2453        finally:
2454            if tmpdir: self.remove_dirs(tmpdir)
2455
2456    def do_operation(self, req, fid):
2457        """
2458        Find the testbeds holding each target and ask them to carry out the
2459        operation.  Return the statuses.
2460        """
2461        # Map an element to the testbed containing it
2462        def element_to_tb(e):
2463            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2464            elif isinstance(e, topdl.Testbed): return e.name
2465            else: return None
2466        # If d is an operation_status object, make it a dict
2467        def make_dict(d):
2468            if isinstance(d, dict): return d
2469            elif isinstance(d, operation_status): return d.to_dict()
2470            else: return { }
2471
[b709861]2472        def element_name(e):
2473            if isinstance(e, topdl.Computer): return e.name
2474            elif isinstance(e, topdl.Testbed): 
2475                if e.localname: return e.localname[0]
2476                else: return None
2477            else: return None
2478
[8cab4c2]2479        self.log.info("Operation call started for %s" %  fid)
[22a1a77]2480        req = req.get('OperationRequestBody', None)
2481        if not req:
2482            raise service_error(service_error.req,
2483                    "Bad request format (no OperationRequestBody)")
2484        exp = req.get('experiment', None)
2485        op = req.get('operation', None)
[b709861]2486        targets = set(req.get('target', []))
[22a1a77]2487        params = req.get('parameter', None)
2488
2489        if exp:
2490            if 'fedid' in exp:
2491                key = exp['fedid']
2492                keytype = "fedid"
2493            elif 'localname' in exp:
2494                key = exp['localname']
2495                keytype = "localname"
2496            else:
2497                raise service_error(service_error.req, "Unknown lookup type")
2498        else:
2499            raise service_error(service_error.req, "No request?")
2500
[b709861]2501        if op is None or not targets:
[22a1a77]2502            raise service_error(service_error.req, "No request?")
2503
[8cab4c2]2504        try:
2505            proof = self.check_experiment_access(fid, key)
2506        except service_error, e:
2507            self.log.info("Operation call failed for %s: access denied" %  fid)
2508            raise e
2509
[22a1a77]2510        self.state_lock.acquire()
2511        if key in self.state:
2512            d1, op_params, cert, d2 = \
[b709861]2513                    self.get_segment_info(self.state[key], need_lock=False,
2514                            key='tb')
[22a1a77]2515            top = self.state[key].top
2516            if top is not None:
2517                top = top.clone()
2518        self.state_lock.release()
2519
2520        if top is None:
[8cab4c2]2521            self.log.info("Operation call failed for %s: not active" %  fid)
[22a1a77]2522            raise service_error(service_error.partial, "No topology yet", 
2523                    proof=proof)
2524
2525        testbeds = { }
2526        results = []
2527        for e in top.elements:
[b709861]2528            ename = element_name(e)
2529            if ename in targets:
[22a1a77]2530                tb = element_to_tb(e)
[b709861]2531                targets.remove(ename)
[22a1a77]2532                if tb is not None:
[b709861]2533                    if tb in testbeds: testbeds[tb].append(ename)
2534                    else: testbeds[tb] = [ ename ]
[22a1a77]2535                else:
2536                    results.append(operation_status(e.name, 
2537                        code=operation_status.no_target, 
2538                        description='Cannot map target to testbed'))
2539
[b709861]2540        for t in targets:
2541            results.append(operation_status(t, operation_status.no_target))
2542
[22a1a77]2543        self.operate_on_segments(op_params, cert, op, testbeds, params,
2544                results)
2545
[8cab4c2]2546        self.log.info("Operation call succeeded for %s" %  fid)
[22a1a77]2547        return { 
2548                'experiment': exp, 
[b709861]2549                'status': [make_dict(r) for r in results],
[22a1a77]2550                'proof': proof.to_dict()
2551                }
2552
2553
[65f3f29]2554    def get_multi_info(self, req, fid):
2555        """
2556        Return all the stored info that this fedid can access
2557        """
[e83f2f2]2558        rv = { 'info': [ ], 'proof': [ ] }
[65f3f29]2559
[2bb8b35]2560        self.log.info("Multi Info call started for %s" %  fid)
[db6b092]2561        self.state_lock.acquire()
2562        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2563            try:
[e83f2f2]2564                proof = self.check_experiment_access(fid, key)
[829246e]2565            except service_error, e:
2566                if e.code == service_error.access:
2567                    continue
2568                else:
[2bb8b35]2569                    self.log.info("Multi Info call failed for %s: %s" %  \
2570                            (e,fid))
[829246e]2571                    self.state_lock.release()
2572                    raise e
[65f3f29]2573
[db6b092]2574            if self.state.has_key(key):
[29d5f7c]2575                e = self.state[key].get_info()
2576                e['proof'] = proof.to_dict()
[db6b092]2577                rv['info'].append(e)
[e83f2f2]2578                rv['proof'].append(proof.to_dict())
[65f3f29]2579        self.state_lock.release()
[2bb8b35]2580        self.log.info("Multi Info call succeeded for %s" %  fid)
[db6b092]2581        return rv
[65f3f29]2582
[cf0ff4f]2583    def check_termination_status(self, fed_exp, force):
[e07c8f3]2584        """
[cf0ff4f]2585        Confirm that the experiment is sin a valid state to stop (or force it)
2586        return the state - invalid states for deletion and force settings cause
2587        exceptions.
[e07c8f3]2588        """
[cf0ff4f]2589        self.state_lock.acquire()
[29d5f7c]2590        status = fed_exp.status
[e07c8f3]2591
[cf0ff4f]2592        if status:
2593            if status in ('starting', 'terminating'):
2594                if not force:
2595                    self.state_lock.release()
2596                    raise service_error(service_error.partial, 
2597                            'Experiment still being created or destroyed')
2598                else:
2599                    self.log.warning('Experiment in %s state ' % status + \
2600                            'being terminated by force.')
2601            self.state_lock.release()
2602            return status
[725c55d]2603        else:
[cf0ff4f]2604            # No status??? trouble
2605            self.state_lock.release()
2606            raise service_error(service_error.internal,
2607                    "Experiment has no status!?")
2608
[b709861]2609    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
[cf0ff4f]2610        ids = []
2611        term_params = { }
[6e33086]2612        if need_lock: self.state_lock.acquire()
[29d5f7c]2613        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2614        expcert = fed_exp.identity
2615        repo = "%s" % fed_exp.fedid
[cf0ff4f]2616
2617        # Collect the allocation/segment ids into a dict keyed by the fedid
[29d5f7c]2618        # of the allocation that contains a tuple of uri, aid
2619        for i, fed in enumerate(fed_exp.get_all_allocations()):
2620            uri = fed.uri
2621            aid = fed.allocID
[b709861]2622            if key == 'aid': term_params[aid] = (uri, aid)
2623            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2624
[6e33086]2625        if need_lock: self.state_lock.release()
2626        return ids, term_params, expcert, repo
2627
2628
2629    def get_termination_info(self, fed_exp):
2630        self.state_lock.acquire()
2631        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
[cf0ff4f]2632        # Change the experiment state
[29d5f7c]2633        fed_exp.status = 'terminating'
[6e33086]2634        fed_exp.updated()
[cf0ff4f]2635        if self.state_filename: self.write_state()
2636        self.state_lock.release()
2637
2638        return ids, term_params, expcert, repo
2639
2640
2641    def deallocate_resources(self, term_params, expcert, status, force, 
2642            dealloc_log):
2643        tmpdir = None
2644        # This try block makes sure the tempdir is cleared
2645        try:
2646            # If no expcert, try the deallocation as the experiment
2647            # controller instance.
2648            if expcert and self.auth_type != 'legacy': 
2649                try:
2650                    tmpdir = tempfile.mkdtemp(prefix="term-")
2651                except EnvironmentError:
2652                    raise service_error(service_error.internal, 
2653                            "Cannot create tmp dir")
2654                cert_file = self.make_temp_certfile(expcert, tmpdir)
2655                pw = None
2656            else: 
2657                cert_file = self.cert_file
2658                pw = self.cert_pwd
2659
2660            # Stop everyone.  NB, wait_for_all waits until a thread starts
2661            # and then completes, so we can't wait if nothing starts.  So,
2662            # no tbparams, no start.
2663            if len(term_params) > 0:
2664                tp = thread_pool(self.nthreads)
2665                for k, (uri, aid) in term_params.items():
2666                    # Create and start a thread to stop the segment
2667                    tp.wait_for_slot()
2668                    t  = pooled_thread(\
2669                            target=self.terminate_segment(log=dealloc_log,
2670                                testbed=uri,
2671                                cert_file=cert_file, 
2672                                cert_pwd=pw,
2673                                trusted_certs=self.trusted_certs,
2674                                caller=self.call_TerminateSegment),
2675                            args=(uri, aid), name=k,
2676                            pdata=tp, trace_file=self.trace_file)
2677                    t.start()
2678                # Wait for completions
2679                tp.wait_for_all_done()
2680
2681            # release the allocations (failed experiments have done this
2682            # already, and starting experiments may be in odd states, so we
2683            # ignore errors releasing those allocations
2684            try: 
2685                for k, (uri, aid)  in term_params.items():
2686                    self.release_access(None, aid, uri=uri,
2687                            cert_file=cert_file, cert_pwd=pw)
2688            except service_error, e:
2689                if status != 'failed' and not force:
2690                    raise e
2691
2692        # Clean up the tmpdir no matter what
2693        finally:
2694            if tmpdir: self.remove_dirs(tmpdir)
2695
[7a8d667]2696    def terminate_experiment(self, req, fid):
[866c983]2697        """
2698        Swap this experiment out on the federants and delete the shared
2699        information
2700        """
[2bb8b35]2701        self.log.info("Terminate experiment call started for %s" % fid)
[866c983]2702        tbparams = { }
2703        req = req.get('TerminateRequestBody', None)
2704        if not req:
2705            raise service_error(service_error.req,
2706                    "Bad request format (no TerminateRequestBody)")
2707
[cf0ff4f]2708        key = self.get_experiment_key(req, 'experiment')
[8cab4c2]2709        try:
2710            proof = self.check_experiment_access(fid, key)
2711        except service_error, e:
2712            self.log.info(
2713                    "Terminate experiment call failed for %s: access denied" \
2714                            % fid)
2715            raise e
[cf0ff4f]2716        exp = req.get('experiment', False)
2717        force = req.get('force', False)
[866c983]2718
[db6b092]2719        dealloc_list = [ ]
[46e4682]2720
2721
[5ae3857]2722        # Create a logger that logs to the dealloc_list as well as to the main
2723        # log file.
2724        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
[a69de97]2725        dealloc_log.info("Terminating %s " %key)
[5ae3857]2726        h = logging.StreamHandler(self.list_log(dealloc_list))
2727        # XXX: there should be a global one of these rather than repeating the
2728        # code.
2729        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2730                    '%d %b %y %H:%M:%S'))
2731        dealloc_log.addHandler(h)
2732
2733        self.state_lock.acquire()
2734        fed_exp = self.state.get(key, None)
[cf0ff4f]2735        self.state_lock.release()
[e07c8f3]2736        repo = None
[5ae3857]2737
2738        if fed_exp:
[cf0ff4f]2739            status = self.check_termination_status(fed_exp, force)
[6e33086]2740            # get_termination_info updates the experiment state
[cf0ff4f]2741            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2742            self.deallocate_resources(term_params, expcert, status, force, 
2743                    dealloc_log)
[5ae3857]2744
2745            # Remove the terminated experiment
2746            self.state_lock.acquire()
2747            for id in ids:
[a96d946]2748                self.clear_experiment_authorization(id, need_state_lock=False)
[cf0ff4f]2749                if id in self.state: del self.state[id]
[5ae3857]2750
2751            if self.state_filename: self.write_state()
2752            self.state_lock.release()
2753
[2761484]2754            # Delete any synch points associated with this experiment.  All
2755            # synch points begin with the fedid of the experiment.
2756            fedid_keys = set(["fedid:%s" % f for f in ids \
2757                    if isinstance(f, fedid)])
2758            for k in self.synch_store.all_keys():
2759                try:
2760                    if len(k) > 45 and k[0:46] in fedid_keys:
2761                        self.synch_store.del_value(k)
[dadc4da]2762                except synch_store.BadDeletionError:
[2761484]2763                    pass
2764            self.write_store()
[e07c8f3]2765
2766            # Remove software and other cached stuff from the filesystem.
2767            if repo:
2768                self.remove_dirs("%s/%s" % (self.repodir, repo))
[e83f2f2]2769       
[2bb8b35]2770            self.log.info("Terminate experiment succeeded for %s %s" % \
2771                    (key, fid))
[5ae3857]2772            return { 
2773                    'experiment': exp , 
[cf0ff4f]2774                    'deallocationLog': string.join(dealloc_list, ''),
[e83f2f2]2775                    'proof': [proof.to_dict()],
[5ae3857]2776                    }
2777        else:
[2bb8b35]2778            self.log.info("Terminate experiment failed for %s %s: no state" % \
2779                    (key, fid))
[5ae3857]2780            raise service_error(service_error.req, "No saved state")
[2761484]2781
2782
2783    def GetValue(self, req, fid):
2784        """
2785        Get a value from the synchronized store
2786        """
2787        req = req.get('GetValueRequestBody', None)
2788        if not req:
2789            raise service_error(service_error.req,
2790                    "Bad request format (no GetValueRequestBody)")
2791       
[cf0ff4f]2792        name = req.get('name', None)
2793        wait = req.get('wait', False)
[2761484]2794        rv = { 'name': name }
2795
[e83f2f2]2796        if not name:
2797            raise service_error(service_error.req, "No name?")
2798
2799        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2800
2801        if access_ok:
[d8442da]2802            self.log.debug("[GetValue] asking for %s " % name)
[dadc4da]2803            try:
2804                v = self.synch_store.get_value(name, wait)
2805            except synch_store.RevokedKeyError:
2806                # No more synch on this key
2807                raise service_error(service_error.federant, 
2808                        "Synch key %s revoked" % name)
[2761484]2809            if v is not None:
2810                rv['value'] = v
[e83f2f2]2811            rv['proof'] = proof.to_dict()
[109a32a]2812            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2813            return rv
2814        else:
[e83f2f2]2815            raise service_error(service_error.access, "Access Denied",
2816                    proof=proof)
[2761484]2817       
2818
2819    def SetValue(self, req, fid):
2820        """
2821        Set a value in the synchronized store
2822        """
2823        req = req.get('SetValueRequestBody', None)
2824        if not req:
2825            raise service_error(service_error.req,
2826                    "Bad request format (no SetValueRequestBody)")
2827       
[cf0ff4f]2828        name = req.get('name', None)
2829        v = req.get('value', '')
[2761484]2830
[e83f2f2]2831        if not name:
2832            raise service_error(service_error.req, "No name?")
2833
2834        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2835
2836        if access_ok:
[2761484]2837            try:
2838                self.synch_store.set_value(name, v)
2839                self.write_store()
[109a32a]2840                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2841            except synch_store.CollisionError:
2842                # Translate into a service_error
2843                raise service_error(service_error.req,
2844                        "Value already set: %s" %name)
[dadc4da]2845            except synch_store.RevokedKeyError:
2846                # No more synch on this key
2847                raise service_error(service_error.federant, 
2848                        "Synch key %s revoked" % name)
[e83f2f2]2849                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
[2761484]2850        else:
[e83f2f2]2851            raise service_error(service_error.access, "Access Denied",
2852                    proof=proof)
Note: See TracBrowser for help on using the repository browser.