source: fedd/federation/experiment_control.py @ 2f45140

Last change on this file since 2f45140 was 65d3ac1, checked in by Ted Faber <faber@…>, 12 years ago

Debugging and defending against bad principals in append_experiment_authorization

  • Property mode set to 100644
File size: 95.6 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[25bf6cc]15import shutil
16import zipfile
[6679c122]17
[3df9b33]18import os.path
19
[3441fe3]20import traceback
[c971895]21# For parsing visualization output and splitter output
22import xml.parsers.expat
[3441fe3]23
[6c57fe9]24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
[c573278]26from string import join
[6679c122]27
[db6b092]28from urlparse import urlparse
[4708875]29from urllib2 import urlopen, URLError
[db6b092]30
[ec4fb42]31from util import *
[6bedbdba]32from deter import fedid, generate_fedid
[9460b1e]33from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]34from service_error import service_error
[2761484]35from synch_store import synch_store
[73e7f5c]36from experiment_partition import experiment_partition
[1d73342]37from experiment_control_legacy import experiment_control_legacy
[7206e5a]38from authorizer import abac_authorizer
[faea607]39from thread_pool import thread_pool, pooled_thread
[ab3d6c5]40from experiment_info import experiment_info, allocation_info, federated_service
[22a1a77]41from operation_status import operation_status
[6679c122]42
[6bedbdba]43from deter import topdl
[044dd20]44from deter import ip_allocator
45from deter import ip_addr
[f07fa49]46import list_log
[db6b092]47
[11a08b0]48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
[1d73342]55class experiment_control_local(experiment_control_legacy):
[0ea11af]56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
[79b6596]62
63    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]64   
[f069052]65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]67    call_StartSegment = service_caller('StartSegment')
[db974ed]68    call_TerminateSegment = service_caller('TerminateSegment')
[6e33086]69    call_InfoSegment = service_caller('InfoSegment')
[22a1a77]70    call_OperationSegment = service_caller('OperationSegment')
[5f6929a]71    call_Ns2Topdl = service_caller('Ns2Topdl')
[058f58e]72
[3f6bc5f]73    def __init__(self, config=None, auth=None):
[866c983]74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
[f07fa49]92        self.list_log = list_log.list_log
[866c983]93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
[6c57fe9]104        self.repodir = config.get("experiment_control", "repodir")
[7183b48]105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
[cc8d8e9]107
[866c983]108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
[35a4c01]112        self.nthreads = 10
[866c983]113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
[866c983]123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
[2761484]125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
[5f6929a]128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
[25bf6cc]129        self.grouper_url = config.get("experiment_control", "grouper_url")
[866c983]130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
[175b444]135        dt = config.get("experiment_control", "direct_transit")
[7206e5a]136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
[6e33086]139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
[139e2e2]142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
[866c983]144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]146
[db6b092]147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
[ca489e8]154
[866c983]155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
[5f6929a]158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
[866c983]159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
[7206e5a]179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
[5ecb9a3]186            self.get_access = self.legacy_get_access
[7206e5a]187        elif self.auth_type == 'abac':
[25bf6cc]188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
[7206e5a]190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
[866c983]193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
[2761484]210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
[866c983]216        # Dispatch tables
217        self.soap_services = {\
[a3ad8bd]218                'New': soap_handler('New', self.new_experiment),
[e19b75c]219                'Create': soap_handler('Create', self.create_experiment),
[866c983]220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
[65f3f29]223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[22a1a77]224                'Operation': soap_handler('Operation', self.do_operation),
[866c983]225                'Terminate': soap_handler('Terminate', 
[e19b75c]226                    self.terminate_experiment),
[2761484]227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]229        }
230
231        self.xmlrpc_services = {\
[a3ad8bd]232                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]233                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]238                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]239                    self.terminate_experiment),
[22a1a77]240                'Operation': xmlrpc_handler('Operation', self.do_operation),
[2761484]241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]243        }
[19cc408]244
[a97394b]245    # Call while holding self.state_lock
[eee2b2e]246    def write_state(self):
[866c983]247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
[40dd8c1]254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
[866c983]256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
[d3c8759]259        except EnvironmentError, e:
[866c983]260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]266
[2761484]267    @staticmethod
[29d5f7c]268    def get_alloc_ids(exp):
[2761484]269        """
[29d5f7c]270        Used by read_store and read state.  This used to be worse.
[2761484]271        """
272
[29d5f7c]273        return [ a.allocID for a in exp.get_all_allocations() ]
274
[2761484]275
[a97394b]276    # Call while holding self.state_lock
[eee2b2e]277    def read_state(self):
[866c983]278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
[cc8d8e9]283       
[866c983]284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
[d3c8759]289        except EnvironmentError, e:
[866c983]290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
[cc8d8e9]296        for s in self.state.values():
[866c983]297            try:
[cc8d8e9]298
[29d5f7c]299                eid = s.fedid
[cc8d8e9]300                if eid : 
[7206e5a]301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
[29d5f7c]304                        #self.auth.set_attribute(s['owner'], eid)
[7206e5a]305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
[cc8d8e9]314                else:
315                    raise KeyError("No experiment id")
[866c983]316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]319
[34bc05c]320    def read_mapdb(self, file):
[866c983]321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
[a11eda5]324        also adds testbeds to active if they include , active after
325        their name.
[866c983]326        """
327
328        self.tbmap = { }
329        lineno =0
330        try:
331            f = open(file, "r")
332            for line in f:
333                lineno += 1
334                line = line.strip()
335                if line.startswith('#') or len(line) == 0:
336                    continue
337                try:
338                    label, url = line.split(':', 1)
339                    self.tbmap[label] = url
340                except ValueError, e:
341                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
342                            "map db: %s %s" % (lineno, line, e))
[d3c8759]343        except EnvironmentError, e:
[866c983]344            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
345                    "open %s: %s" % (file, e))
[1ec5d4a]346        else:
347            f.close()
[2761484]348
349    def read_store(self):
350        try:
351            self.synch_store = synch_store()
352            self.synch_store.load(self.store_filename)
353            self.log.debug("[read_store]: Read store from %s" % \
354                    self.store_filename)
[d3c8759]355        except EnvironmentError, e:
[2761484]356            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
357                    % (self.state_filename, e))
358            self.synch_store = synch_store()
359
360        # Set the initial permissions on data in the store.  XXX: This ad hoc
361        # authorization attribute initialization is getting out of hand.
[7206e5a]362        # XXX: legacy
363        if self.auth_type == 'legacy':
364            for k in self.synch_store.all_keys():
365                try:
366                    if k.startswith('fedid:'):
367                        fid = fedid(hexstr=k[6:46])
368                        if self.state.has_key(fid):
369                            for a in self.get_alloc_ids(self.state[fid]):
370                                self.auth.set_attribute(a, k)
371                except ValueError, e:
372                    self.log.warn("Cannot deduce permissions for %s" % k)
[2761484]373
374
375    def write_store(self):
376        """
377        Write a new copy of synch_store after writing current state
378        to a backup.  We use the internal synch_store pickle method to avoid
379        incinsistent data.
380
381        State format is a simple pickling of the store.
382        """
383        if os.access(self.store_filename, os.W_OK):
384            copy_file(self.store_filename, \
385                    "%s.bak" % self.store_filename)
386        try:
387            self.synch_store.save(self.store_filename)
[d3c8759]388        except EnvironmentError, e:
[2761484]389            self.log.error("Can't write file %s: %s" % \
390                    (self.store_filename, e))
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
[25bf6cc]394    # XXX this may belong somewhere else
395
396    def get_grouper_updates(self, fid):
397        if self.grouper_url is None: return
398        d = tempfile.mkdtemp()
399        try:
400            fstr = "%s" % fid
401            # XXX locking
402            zipname = os.path.join(d, 'grouper.zip')
403            dest = os.path.join(self.auth_dir, 'update')
404            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
405            f = open(zipname, 'w')
406            f.write(resp.read())
407            f.close()
408            zf = zipfile.ZipFile(zipname, 'r')
409            zf.extractall(dest)
410            zf.close()
[4708875]411        except URLError, e:
412            self.log.error("Cannot connect to grouper: %s" % e)
413            pass
[25bf6cc]414        finally:
415            shutil.rmtree(d)
416
[cf0ff4f]417
418    def remove_dirs(self, dir):
419        """
420        Remove the directory tree and all files rooted at dir.  Log any errors,
421        but continue.
422        """
423        self.log.debug("[removedirs]: removing %s" % dir)
424        try:
425            for path, dirs, files in os.walk(dir, topdown=False):
426                for f in files:
427                    os.remove(os.path.join(path, f))
428                for d in dirs:
429                    os.rmdir(os.path.join(path, d))
430            os.rmdir(dir)
431        except EnvironmentError, e:
432            self.log.error("Error deleting directory tree in %s" % e);
433
434    @staticmethod
435    def make_temp_certfile(expcert, tmpdir):
436        """
437        make a protected copy of the access certificate so the experiment
438        controller can act as the experiment principal.  mkstemp is the most
439        secure way to do that. The directory should be created by
440        mkdtemp.  Return the filename.
441        """
442        if expcert and tmpdir:
443            try:
444                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
445                f = os.fdopen(certf, 'w')
446                print >> f, expcert
447                f.close()
448            except EnvironmentError, e:
449                raise service_error(service_error.internal, 
450                        "Cannot create temp cert file?")
451            return certfn
452        else:
453            return None
454
[866c983]455       
[6679c122]456    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]457        """
458        Generate a set of keys for the gateways to use to talk.
459
460        Keys are of type type and are stored in the required dest file.
461        """
462        valid_types = ("rsa", "dsa")
463        t = type.lower();
464        if t not in valid_types: raise ValueError
465        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
466
467        try:
468            trace = open("/dev/null", "w")
[d3c8759]469        except EnvironmentError:
[866c983]470            raise service_error(service_error.internal,
471                    "Cannot open /dev/null??");
472
473        # May raise CalledProcessError
474        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]475        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]476        if rv != 0:
477            raise service_error(service_error.internal, 
478                    "Cannot generate nonce ssh keys.  %s return code %d" \
479                            % (self.ssh_keygen, rv))
[6679c122]480
[3df9b33]481    def generate_seer_certs(self, destdir):
482        '''
483        Create a SEER ca cert and a node cert in destdir/ca.pem and
484        destdir/node.pem respectively.  These will be distributed throughout
485        the federated experiment.  This routine reports errors via
486        service_errors.
487        '''
488        openssl = '/usr/bin/openssl'
489        # All the filenames and parameters we need for openssl calls below
490        ca_key =os.path.join(destdir, 'ca.key') 
491        ca_pem = os.path.join(destdir, 'ca.pem')
492        node_key =os.path.join(destdir, 'node.key') 
493        node_pem = os.path.join(destdir, 'node.pem')
494        node_req = os.path.join(destdir, 'node.req')
495        node_signed = os.path.join(destdir, 'node.signed')
[9bde415]496        days = '%s' % (365 * 10)
[95be336]497        serial = '%s' % random.randint(0, 1<<16)
[3df9b33]498
499        try:
500            # Sequence of calls to create a CA key, create a ca cert, create a
501            # node key, node signing request, and finally a signed node
502            # certificate.
503            sequence = (
504                    (openssl, 'genrsa', '-out', ca_key, '1024'),
505                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
506                        ca_pem, '-days', days, '-subj', 
507                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
508                    (openssl, 'genrsa', '-out', node_key, '1024'),
509                    (openssl, 'req', '-new', '-key', node_key, '-out', 
510                        node_req, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
512                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
[95be336]513                        '-set_serial', serial, '-req', '-in', node_req, 
[3df9b33]514                        '-out', node_signed, '-days', days),
515                )
516            # Do all that stuff; bail if there's an error, and push all the
517            # output to dev/null.
518            for cmd in sequence:
519                trace = open("/dev/null", "w")
520                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
521                if rv != 0:
522                    raise service_error(service_error.internal, 
523                            "Cannot generate SEER certs.  %s return code %d" \
524                                    % (' '.join(cmd), rv))
525            # Concatinate the node key and signed certificate into node.pem
526            f = open(node_pem, 'w')
527            for comp in (node_signed, node_key):
528                g = open(comp, 'r')
[9bde415]529                f.write(g.read())
[3df9b33]530                g.close()
531            f.close()
532
533            # Throw out intermediaries.
[95be336]534            for fn in (ca_key, node_key, node_req, node_signed):
[3df9b33]535                os.unlink(fn)
536
537        except EnvironmentError, e:
538            # Any difficulties with the file system wind up here
539            raise service_error(service_error.internal,
540                    "File error on  %s while creating SEER certs: %s" % \
541                            (e.filename, e.strerror))
542
543
544
[0d830de]545    def gentopo(self, str):
[866c983]546        """
[1d73342]547        Generate the topology data structure from the splitter's XML
[866c983]548        representation of it.
549
550        The topology XML looks like:
551            <experiment>
552                <nodes>
553                    <node><vname></vname><ips>ip1:ip2</ips></node>
554                </nodes>
555                <lans>
556                    <lan>
557                        <vname></vname><vnode></vnode><ip></ip>
558                        <bandwidth></bandwidth><member>node:port</member>
559                    </lan>
560                </lans>
561        """
562        class topo_parse:
563            """
564            Parse the topology XML and create the dats structure.
565            """
566            def __init__(self):
567                # Typing of the subelements for data conversion
568                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
569                self.int_subelements = ( 'bandwidth',)
570                self.float_subelements = ( 'delay',)
571                # The final data structure
572                self.nodes = [ ]
573                self.lans =  [ ]
574                self.topo = { \
575                        'node': self.nodes,\
576                        'lan' : self.lans,\
577                    }
578                self.element = { }  # Current element being created
579                self.chars = ""     # Last text seen
580
581            def end_element(self, name):
582                # After each sub element the contents is added to the current
583                # element or to the appropriate list.
584                if name == 'node':
585                    self.nodes.append(self.element)
586                    self.element = { }
587                elif name == 'lan':
588                    self.lans.append(self.element)
589                    self.element = { }
590                elif name in self.str_subelements:
591                    self.element[name] = self.chars
592                    self.chars = ""
593                elif name in self.int_subelements:
594                    self.element[name] = int(self.chars)
595                    self.chars = ""
596                elif name in self.float_subelements:
597                    self.element[name] = float(self.chars)
598                    self.chars = ""
599
600            def found_chars(self, data):
601                self.chars += data.rstrip()
602
603
604        tp = topo_parse();
605        parser = xml.parsers.expat.ParserCreate()
606        parser.EndElementHandler = tp.end_element
607        parser.CharacterDataHandler = tp.found_chars
608
609        parser.Parse(str)
610
611        return tp.topo
612       
[0d830de]613
614    def genviz(self, topo):
[866c983]615        """
616        Generate the visualization the virtual topology
617        """
618
619        neato = "/usr/local/bin/neato"
620        # These are used to parse neato output and to create the visualization
621        # file.
[0ac1934]622        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]623        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
624                "%s</type></node>"
625
626        try:
627            # Node names
628            nodes = [ n['vname'] for n in topo['node'] ]
629            topo_lans = topo['lan']
[cc8d8e9]630        except KeyError, e:
631            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]632
633        lans = { }
634        links = { }
635
636        # Walk through the virtual topology, organizing the connections into
637        # 2-node connections (links) and more-than-2-node connections (lans).
638        # When a lan is created, it's added to the list of nodes (there's a
639        # node in the visualization for the lan).
640        for l in topo_lans:
641            if links.has_key(l['vname']):
642                if len(links[l['vname']]) < 2:
643                    links[l['vname']].append(l['vnode'])
644                else:
645                    nodes.append(l['vname'])
646                    lans[l['vname']] = links[l['vname']]
647                    del links[l['vname']]
648                    lans[l['vname']].append(l['vnode'])
649            elif lans.has_key(l['vname']):
650                lans[l['vname']].append(l['vnode'])
651            else:
652                links[l['vname']] = [ l['vnode'] ]
653
654
655        # Open up a temporary file for dot to turn into a visualization
656        try:
657            df, dotname = tempfile.mkstemp()
658            dotfile = os.fdopen(df, 'w')
[d3c8759]659        except EnvironmentError:
[866c983]660            raise service_error(service_error.internal,
661                    "Failed to open file in genviz")
662
[db6b092]663        try:
664            dnull = open('/dev/null', 'w')
[d3c8759]665        except EnvironmentError:
[db6b092]666            service_error(service_error.internal,
[886307f]667                    "Failed to open /dev/null in genviz")
668
[866c983]669        # Generate a dot/neato input file from the links, nodes and lans
670        try:
671            print >>dotfile, "graph G {"
672            for n in nodes:
673                print >>dotfile, '\t"%s"' % n
674            for l in links.keys():
675                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
676            for l in lans.keys():
677                for n in lans[l]:
678                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
679            print >>dotfile, "}"
680            dotfile.close()
681        except TypeError:
682            raise service_error(service_error.internal,
683                    "Single endpoint link in vtopo")
[d3c8759]684        except EnvironmentError:
[866c983]685            raise service_error(service_error.internal, "Cannot write dot file")
686
687        # Use dot to create a visualization
[5954004]688        try:
689            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
690                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
691                stderr=dnull, close_fds=True)
692        except EnvironmentError:
693            raise service_error(service_error.internal, 
694                    "Cannot generate visualization: is graphviz available?")
[db6b092]695        dnull.close()
[866c983]696
697        # Translate dot to vis format
698        vis_nodes = [ ]
699        vis = { 'node': vis_nodes }
700        for line in dot.stdout:
701            m = vis_re.match(line)
702            if m:
703                vn = m.group(1)
704                vis_node = {'name': vn, \
705                        'x': float(m.group(2)),\
706                        'y' : float(m.group(3)),\
707                    }
708                if vn in links.keys() or vn in lans.keys():
709                    vis_node['type'] = 'lan'
710                else:
711                    vis_node['type'] = 'node'
712                vis_nodes.append(vis_node)
713        rv = dot.wait()
714
715        os.remove(dotname)
[1fed67b]716        # XXX: graphviz seems to use low return codes for warnings, like
717        # "couldn't find font"
718        if rv < 2 : return vis
[866c983]719        else: return None
[d0ae12d]720
[db6b092]721
[725c55d]722    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
723            cert_pwd=None):
[e19b75c]724        """
725        Release access to testbed through fedd
726        """
[db6b092]727
[fd07c48]728        if not uri and tbmap:
729            uri = tbmap.get(tb, None)
[e19b75c]730        if not uri:
[69692a9]731            raise service_error(service_error.server_config, 
[e19b75c]732                    "Unknown testbed: %s" % tb)
[db6b092]733
[e19b75c]734        if self.local_access.has_key(uri):
735            resp = self.local_access[uri].ReleaseAccess(\
[29d5f7c]736                    { 'ReleaseAccessRequestBody' : 
737                        {'allocID': {'fedid': aid}},}, 
[725c55d]738                    fedid(file=cert_file))
[e19b75c]739            resp = { 'ReleaseAccessResponseBody': resp } 
740        else:
[29d5f7c]741            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
[725c55d]742                    cert_file, cert_pwd, self.trusted_certs)
[db6b092]743
[e19b75c]744        # better error coding
[db6b092]745
[5f6929a]746    def remote_ns2topdl(self, uri, desc):
[db6b092]747
[e19b75c]748        req = {
749                'description' : { 'ns2description': desc },
[db6b092]750            }
751
[5f6929a]752        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
[e19b75c]753                self.trusted_certs)
754
[5f6929a]755        if r.has_key('Ns2TopdlResponseBody'):
756            r = r['Ns2TopdlResponseBody']
[1dcaff4]757            ed = r.get('experimentdescription', None)
758            if ed.has_key('topdldescription'):
759                return topdl.Topology(**ed['topdldescription'])
[e19b75c]760            else:
761                raise service_error(service_error.protocol, 
762                        "Bad splitter response (no output)")
763        else:
764            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]765
[e19b75c]766    class start_segment:
[fd556d1]767        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]768                cert_pwd=None, trusted_certs=None, caller=None,
769                log_collector=None):
[cc8d8e9]770            self.log = log
771            self.debug = debug
772            self.cert_file = cert_file
773            self.cert_pwd = cert_pwd
774            self.trusted_certs = None
775            self.caller = caller
[fd556d1]776            self.testbed = testbed
[f07fa49]777            self.log_collector = log_collector
[69692a9]778            self.response = None
[b4b19c7]779            self.node = { }
[9e5e251]780            self.subs = { }
[934dd99]781            self.tb = { }
[e83f2f2]782            self.proof = None
[b4b19c7]783
784        def make_map(self, resp):
[29d5f7c]785            if 'segmentdescription' not in resp  or \
786                    'topdldescription' not in resp['segmentdescription']:
787                self.log.warn('No topology returned from startsegment')
788                return 
789
790            top = topdl.Topology(
791                    **resp['segmentdescription']['topdldescription'])
792
793            for e in top.elements:
794                if isinstance(e, topdl.Computer):
[1ae1aa2]795                    self.node[e.name] = e
[934dd99]796                elif isinstance(e, topdl.Testbed):
797                    self.tb[e.uri] = e
[9e5e251]798            for s in top.substrates:
799                self.subs[s.name] = s
[cc8d8e9]800
[43197eb]801        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
[cc8d8e9]802            req = {
803                    'allocID': { 'fedid' : aid }, 
804                    'segmentdescription': { 
805                        'topdldescription': topo.to_dict(),
806                    },
807                }
[e02cd14]808
809            if connInfo:
810                req['connection'] = connInfo
[43197eb]811
812            import_svcs = [ s for m in masters.values() \
813                    for s in m if self.testbed in s.importers]
814
815            if import_svcs or self.testbed in masters:
816                req['service'] = []
817
818            for s in import_svcs:
819                for r in s.reqs:
820                    sr = copy.deepcopy(r)
821                    sr['visibility'] = 'import';
822                    req['service'].append(sr)
823
824            for s in masters.get(self.testbed, []):
825                for r in s.reqs:
826                    sr = copy.deepcopy(r)
827                    sr['visibility'] = 'export';
828                    req['service'].append(sr)
829
[6c57fe9]830            if attrs:
831                req['fedAttr'] = attrs
[cc8d8e9]832
[fd556d1]833            try:
[13e3dd2]834                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]835                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
836                        self.trusted_certs)
[f07fa49]837                if r.has_key('StartSegmentResponseBody'):
838                    lval = r['StartSegmentResponseBody'].get('allocationLog',
839                            None)
840                    if lval and self.log_collector:
841                        for line in  lval.splitlines(True):
842                            self.log_collector.write(line)
[b4b19c7]843                    self.make_map(r['StartSegmentResponseBody'])
[e83f2f2]844                    if 'proof' in r: self.proof = r['proof']
[69692a9]845                    self.response = r
[f07fa49]846                else:
847                    raise service_error(service_error.internal, 
848                            "Bad response!?: %s" %r)
[fd556d1]849                return True
850            except service_error, e:
851                self.log.error("Start segment failed on %s: %s" % \
852                        (self.testbed, e))
853                return False
[cc8d8e9]854
855
[5ae3857]856
[e19b75c]857    class terminate_segment:
[fd556d1]858        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]859                cert_pwd=None, trusted_certs=None, caller=None):
860            self.log = log
861            self.debug = debug
862            self.cert_file = cert_file
863            self.cert_pwd = cert_pwd
864            self.trusted_certs = None
865            self.caller = caller
[fd556d1]866            self.testbed = testbed
[5ae3857]867
868        def __call__(self, uri, aid ):
869            req = {
[29d5f7c]870                    'allocID': {'fedid': aid }, 
[5ae3857]871                }
[a69de97]872            self.log.info("Calling terminate segment")
[fd556d1]873            try:
874                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
875                        self.trusted_certs)
[a69de97]876                self.log.info("Terminate segment succeeded")
[fd556d1]877                return True
878            except service_error, e:
879                self.log.error("Terminate segment failed on %s: %s" % \
880                        (self.testbed, e))
881                return False
[db6b092]882
[6e33086]883    class info_segment(start_segment):
884        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
885                cert_pwd=None, trusted_certs=None, caller=None,
886                log_collector=None):
887            experiment_control_local.start_segment.__init__(self, debug, 
888                    log, testbed, cert_file, cert_pwd, trusted_certs, 
889                    caller, log_collector)
890
891        def __call__(self, uri, aid):
892            req = { 'allocID': { 'fedid' : aid } }
893
894            try:
895                self.log.debug("Calling InfoSegment at %s " % uri)
896                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
897                        self.trusted_certs)
898                if r.has_key('InfoSegmentResponseBody'):
899                    self.make_map(r['InfoSegmentResponseBody'])
900                    if 'proof' in r: self.proof = r['proof']
901                    self.response = r
902                else:
903                    raise service_error(service_error.internal, 
904                            "Bad response!?: %s" %r)
905                return True
906            except service_error, e:
907                self.log.error("Info segment failed on %s: %s" % \
908                        (self.testbed, e))
909                return False
910
[22a1a77]911    class operation_segment:
912        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
913                cert_pwd=None, trusted_certs=None, caller=None,
914                log_collector=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922            self.status = None
923
924        def __call__(self, uri, aid, op, targets, params):
925            req = { 
926                    'allocID': { 'fedid' : aid },
927                    'operation': op,
928                    'target': targets,
929                    }
930            if params: req['parameter'] = params
931
932
933            try:
934                self.log.debug("Calling OperationSegment at %s " % uri)
935                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
936                        self.trusted_certs)
937                if 'OperationSegmentResponseBody' in r:
938                    r = r['OperationSegmentResponseBody']
939                    if 'status' in r:
940                        self.status = r['status']
941                else:
942                    raise service_error(service_error.internal, 
943                            "Bad response!?: %s" %r)
944                return True
945            except service_error, e:
946                self.log.error("Operation segment failed on %s: %s" % \
947                        (self.testbed, e))
948                return False
949
[6e33086]950    def annotate_topology(self, top, data):
[f671ef7]951        # These routines do various parts of the annotation
952        def add_new_names(nl, l):
953            """ add any names in nl to the list in l """
954            for n in nl:
955                if n not in l: l.append(n)
956       
957        def merge_services(ne, e):
958            for ns in ne.service:
959                # NB: the else is on the for
960                for s in e.service:
961                    if ns.name == s.name:
962                        s.importer = ns.importer
963                        s.param = ns.param
964                        s.description = ns.description
965                        s.status = ns.status
966                        break
967                else:
968                    e.service.append(ns)
969       
970        def merge_oses(ne, e):
971            """
972            Merge the operating system entries of ne into e
973            """
974            for nos in ne.os:
975                # NB: the else is on the for
976                for os in e.os:
[7aaa8dc]977                    if nos.name == os.name:
978                        os.version = nos.version
979                        os.version = nos.distribution
980                        os.version = nos.distributionversion
[f671ef7]981                        for a in nos.attribute:
[db3da0b]982                            if os.get_attribute(a.attribute):
983                                os.remove_attribute(a.attribute)
[7aaa8dc]984                            os.set_attribute(a.attribute, a.value)
[f671ef7]985                        break
986                else:
[db3da0b]987                    # If both nodes have one OS, this is a replacement
988                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
989                    else: e.os.append(nos)
[6e33086]990
991        # Annotate the topology with embedding info
992        for e in top.elements:
993            if isinstance(e, topdl.Computer):
994                for s in data:
[f671ef7]995                    ne = s.node.get(e.name, None)
996                    if ne is not None:
997                        add_new_names(ne.localname, e.localname)
998                        e.status = ne.status
999                        merge_services(ne, e)
1000                        add_new_names(ne.operation, e.operation)
1001                        if ne.os: merge_oses(ne, e)
[6e33086]1002                        break
[934dd99]1003            elif isinstance(e,topdl.Testbed):
1004                for s in data:
1005                    ne = s.tb.get(e.uri, None)
1006                    if ne is not None:
1007                        add_new_names(ne.localname, e.localname)
1008                        add_new_names(ne.operation, e.operation)
1009                        merge_services(ne, e)
1010                        for a in ne.attribute:
1011                            e.set_attribute(a.attribute, a.value)
[9e5e251]1012        # Annotate substrates
1013        for s in top.substrates:
1014            for d in data:
1015                ss = d.subs.get(s.name, None)
1016                if ss is not None:
1017                    if ss.capacity is not None:
1018                        s.capacity = ss.capacity
1019                    if s.latency is not None:
1020                        s.latency = ss.latency
[6e33086]1021
1022
1023
[43197eb]1024    def allocate_resources(self, allocated, masters, eid, expid, 
[b4b19c7]1025            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
[c573278]1026            attrs=None, connInfo={}, tbmap=None, expcert=None):
[69692a9]1027
[cc8d8e9]1028        started = { }           # Testbeds where a sub-experiment started
1029                                # successfully
1030
1031        # XXX
1032        fail_soft = False
1033
[fd07c48]1034        if tbmap is None: tbmap = { }
1035
[cc8d8e9]1036        log = alloc_log or self.log
1037
[faea607]1038        tp = thread_pool(self.nthreads)
[cc8d8e9]1039        threads = [ ]
[b4b19c7]1040        starters = [ ]
[cc8d8e9]1041
[c573278]1042        if expcert:
1043            cert = expcert
1044            pw = None
1045        else:
1046            cert = self.cert_file
[822d31b]1047            pw = self.cert_pwd
[c573278]1048
[109a32a]1049        for tb in allocated.keys():
1050            # Create and start a thread to start the segment, and save it
1051            # to get the return value later
[ab847bc]1052            tb_attrs = copy.copy(attrs)
[faea607]1053            tp.wait_for_slot()
[9294673]1054            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
[ab847bc]1055            base, suffix = split_testbed(tb)
1056            if suffix:
1057                tb_attrs.append({'attribute': 'experiment_name', 
[175b444]1058                    'value': "%s-%s" % (eid, suffix)})
[ab847bc]1059            else:
1060                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
[109a32a]1061            if not uri:
1062                raise service_error(service_error.internal, 
1063                        "Unknown testbed %s !?" % tb)
1064
[9294673]1065            aid = tbparams[tb].allocID
1066            if not aid:
[cc8d8e9]1067                raise service_error(service_error.internal, 
1068                        "No alloc id for testbed %s !?" % tb)
1069
[b4b19c7]1070            s = self.start_segment(log=log, debug=self.debug,
[c573278]1071                    testbed=tb, cert_file=cert,
1072                    cert_pwd=pw, trusted_certs=self.trusted_certs,
[b4b19c7]1073                    caller=self.call_StartSegment,
1074                    log_collector=log_collector)
1075            starters.append(s)
[faea607]1076            t  = pooled_thread(\
[b4b19c7]1077                    target=s, name=tb,
[ab847bc]1078                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
[faea607]1079                    pdata=tp, trace_file=self.trace_file)
[69692a9]1080            threads.append(t)
1081            t.start()
[cc8d8e9]1082
[109a32a]1083        # Wait until all finish (keep pinging the log, though)
1084        mins = 0
[dadc4da]1085        revoked = False
[faea607]1086        while not tp.wait_for_all_done(60.0):
[109a32a]1087            mins += 1
1088            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                    % mins)
[dadc4da]1090            if not revoked and \
[f52f5df]1091                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1092                # a testbed has failed.  Revoke this experiment's
1093                # synchronizarion values so that sub experiments will not
1094                # deadlock waiting for synchronization that will never happen
1095                self.log.info("A subexperiment has failed to swap in, " + \
1096                        "revoking synch keys")
1097                var_key = "fedid:%s" % expid
1098                for k in self.synch_store.all_keys():
1099                    if len(k) > 45 and k[0:46] == var_key:
1100                        self.synch_store.revoke_key(k)
1101                revoked = True
[69692a9]1102
[cc8d8e9]1103        failed = [ t.getName() for t in threads if not t.rv ]
1104        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1105
[cc8d8e9]1106        # If one failed clean up, unless fail_soft is set
[32e7d93]1107        if failed:
[cc8d8e9]1108            if not fail_soft:
[faea607]1109                tp.clear()
[cc8d8e9]1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
[faea607]1112                    tp.wait_for_slot()
[9294673]1113                    uri = tbparams[tb].uri
[faea607]1114                    t  = pooled_thread(\
[32e7d93]1115                            target=self.terminate_segment(log=log,
[fd556d1]1116                                testbed=tb,
[696a689]1117                                cert_file=cert, 
1118                                cert_pwd=pw,
[32e7d93]1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
[9294673]1121                            args=(uri, tbparams[tb].allocID),
[32e7d93]1122                            name=tb,
[faea607]1123                            pdata=tp, trace_file=self.trace_file)
[cc8d8e9]1124                    t.start()
[f52f5df]1125                # Wait until all finish (if any are being stopped)
1126                if succeeded:
[faea607]1127                    tp.wait_for_all_done()
[cc8d8e9]1128
1129                # release the allocations
1130                for tb in tbparams.keys():
[725c55d]1131                    try:
[9294673]1132                        self.release_access(tb, tbparams[tb].allocID, 
1133                                tbmap=tbmap, uri=tbparams[tb].uri,
[696a689]1134                                cert_file=cert, cert_pwd=pw)
[725c55d]1135                    except service_error, e:
1136                        self.log.warn("Error releasing access: %s" % e.desc)
[cc8d8e9]1137                # Remove the placeholder
1138                self.state_lock.acquire()
[29d5f7c]1139                self.state[eid].status = 'failed'
[6e33086]1140                self.state[eid].updated()
[cc8d8e9]1141                if self.state_filename: self.write_state()
1142                self.state_lock.release()
[05e8da8]1143                # Remove the repo dir
1144                self.remove_dirs("%s/%s" %(self.repodir, expid))
1145                # Walk up tmpdir, deleting as we go
1146                if self.cleanup:
1147                    self.remove_dirs(tmpdir)
1148                else:
1149                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
[cc8d8e9]1151
1152                log.error("Swap in failed on %s" % ",".join(failed))
1153                return
1154        else:
[29d5f7c]1155            # Walk through the successes and gather the proofs
[e83f2f2]1156            proofs = { }
[b4b19c7]1157            for s in starters:
[e83f2f2]1158                if s.proof:
1159                    proofs[s.testbed] = s.proof
[6e33086]1160            self.annotate_topology(top, starters)
[cc8d8e9]1161            log.info("[start_segment]: Experiment %s active" % eid)
1162
1163
1164        # Walk up tmpdir, deleting as we go
[69692a9]1165        if self.cleanup:
[05e8da8]1166            self.remove_dirs(tmpdir)
[69692a9]1167        else:
1168            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1169
[b4b19c7]1170        # Insert the experiment into our state and update the disk copy.
[cc8d8e9]1171        self.state_lock.acquire()
[29d5f7c]1172        self.state[expid].status = 'active'
[cc8d8e9]1173        self.state[eid] = self.state[expid]
[29d5f7c]1174        self.state[eid].top = top
[6e33086]1175        self.state[eid].updated()
[e83f2f2]1176        # Append startup proofs
[29d5f7c]1177        for f in self.state[eid].get_all_allocations():
1178            if f.tb in proofs:
1179                f.proof.append(proofs[f.tb])
[e83f2f2]1180
[cc8d8e9]1181        if self.state_filename: self.write_state()
1182        self.state_lock.release()
1183        return
1184
1185
[895a133]1186    def add_kit(self, e, kit):
1187        """
1188        Add a Software object created from the list of (install, location)
1189        tuples passed as kit  to the software attribute of an object e.  We
1190        do this enough to break out the code, but it's kind of a hack to
1191        avoid changing the old tuple rep.
1192        """
1193
1194        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1195
1196        if isinstance(e.software, list): e.software.extend(s)
1197        else: e.software = s
1198
[913dc7a]1199    def append_experiment_authorization(self, expid, attrs, 
1200            need_state_lock=True):
1201        """
1202        Append the authorization information to system state
1203        """
1204
1205        for p, a in attrs:
[65d3ac1]1206            # PNNL Debug
1207            self.log.debug("Append auth: %s %s" % (p, a))
1208            if p and a:
1209                # Improperly configured overrides can add bad principals.
1210                self.auth.set_attribute(p, a)
1211            else:
1212                self.log.debug("Bad append experiment_authorization %s %s" \
1213                        % (p, a))
[913dc7a]1214        self.auth.save()
1215
1216        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1217        # XXX: really a no op?
1218        #self.state[expid]['auth'].update(attrs)
[913dc7a]1219        if self.state_filename: self.write_state()
1220        if need_state_lock: self.state_lock.release()
1221
[a96d946]1222    def clear_experiment_authorization(self, expid, need_state_lock=True):
[913dc7a]1223        """
1224        Attrs is a set of attribute principal pairs that need to be removed
1225        from the authenticator.  Remove them and save the authenticator.
1226        """
1227
1228        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1229        # XXX: should be a no-op
1230        #if expid in self.state and 'auth' in self.state[expid]:
1231            #for p, a in self.state[expid]['auth']:
1232                #self.auth.unset_attribute(p, a)
1233            #self.state[expid]['auth'] = set()
[913dc7a]1234        if self.state_filename: self.write_state()
1235        if need_state_lock: self.state_lock.release()
[b67fd22]1236        self.auth.save()
[913dc7a]1237
[895a133]1238
[b4b19c7]1239    def create_experiment_state(self, fid, req, expid, expcert,
[a3ad8bd]1240            state='starting'):
[895a133]1241        """
1242        Create the initial entry in the experiment's state.  The expid and
1243        expcert are the experiment's fedid and certifacte that represents that
1244        ID, which are installed in the experiment state.  If the request
1245        includes a suggested local name that is used if possible.  If the local
1246        name is already taken by an experiment owned by this user that has
[a3ad8bd]1247        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1248        valid localname is found.  The generated local name is returned.
1249        """
1250
1251        if req.has_key('experimentID') and \
1252                req['experimentID'].has_key('localname'):
1253            overwrite = False
1254            eid = req['experimentID']['localname']
1255            # If there's an old failed experiment here with the same local name
1256            # and accessible by this user, we'll overwrite it, otherwise we'll
1257            # fall through and do the collision avoidance.
1258            old_expid = self.get_experiment_fedid(eid)
[74572ba]1259            if old_expid:
1260                users_experiment = True
1261                try:
1262                    self.check_experiment_access(fid, old_expid)
1263                except service_error, e:
1264                    if e.code == service_error.access: users_experiment = False
1265                    else: raise e
1266                if users_experiment:
1267                    self.state_lock.acquire()
[29d5f7c]1268                    status = self.state[eid].status
[74572ba]1269                    if status and status == 'failed':
1270                        # remove the old access attributes
1271                        self.clear_experiment_authorization(eid,
1272                                need_state_lock=False)
1273                        overwrite = True
1274                        del self.state[eid]
1275                        del self.state[old_expid]
1276                    self.state_lock.release()
[6031c9d]1277                else:
1278                    self.log.info('Experiment %s exists, ' % eid + \
1279                            'but this user cannot access it')
[895a133]1280            self.state_lock.acquire()
1281            while (self.state.has_key(eid) and not overwrite):
1282                eid += random.choice(string.ascii_letters)
1283            # Initial state
[29d5f7c]1284            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1285                    identity=expcert)
[895a133]1286            self.state[expid] = self.state[eid]
[913dc7a]1287            if self.state_filename: self.write_state()
1288            self.state_lock.release()
[895a133]1289        else:
1290            eid = self.exp_stem
1291            for i in range(0,5):
1292                eid += random.choice(string.ascii_letters)
1293            self.state_lock.acquire()
1294            while (self.state.has_key(eid)):
1295                eid = self.exp_stem
1296                for i in range(0,5):
1297                    eid += random.choice(string.ascii_letters)
1298            # Initial state
[29d5f7c]1299            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1300                    identity=expcert)
[895a133]1301            self.state[expid] = self.state[eid]
[913dc7a]1302            if self.state_filename: self.write_state()
1303            self.state_lock.release()
1304
1305        # Let users touch the state.  Authorize this fid and the expid itself
1306        # to touch the experiment, as well as allowing th eoverrides.
1307        self.append_experiment_authorization(eid, 
1308                set([(fid, expid), (expid,expid)] + \
1309                        [ (o, expid) for o in self.overrides]))
[895a133]1310
1311        return eid
1312
1313
1314    def allocate_ips_to_topo(self, top):
1315        """
[69692a9]1316        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1317        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1318        created and returned as a list of hostfiles entries.  We also return
1319        the allocator, because we may need to allocate IPs to portals
1320        (specifically DRAGON portals).
[895a133]1321        """
1322        subs = sorted(top.substrates, 
1323                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1324                reverse=True)
1325        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1326        ifs = { }
1327        hosts = [ ]
1328
1329        for idx, s in enumerate(subs):
[289ff7e]1330            net_size = len(s.interfaces)+2
1331
1332            a = ips.allocate(net_size)
[895a133]1333            if a :
1334                base, num = a
[289ff7e]1335                if num < net_size: 
[895a133]1336                    raise service_error(service_error.internal,
1337                            "Allocator returned wrong number of IPs??")
1338            else:
1339                raise service_error(service_error.req, 
1340                        "Cannot allocate IP addresses")
[062b991]1341            mask = ips.min_alloc
1342            while mask < net_size:
1343                mask *= 2
[289ff7e]1344
[062b991]1345            netmask = ((2**32-1) ^ (mask-1))
[895a133]1346
1347            base += 1
1348            for i in s.interfaces:
1349                i.attribute.append(
1350                        topdl.Attribute('ip4_address', 
1351                            "%s" % ip_addr(base)))
[289ff7e]1352                i.attribute.append(
1353                        topdl.Attribute('ip4_netmask', 
1354                            "%s" % ip_addr(int(netmask))))
1355
[1e7f268]1356                hname = i.element.name
[895a133]1357                if ifs.has_key(hname):
1358                    hosts.append("%s\t%s-%s %s-%d" % \
1359                            (ip_addr(base), hname, s.name, hname,
1360                                ifs[hname]))
1361                else:
1362                    ifs[hname] = 0
1363                    hosts.append("%s\t%s-%s %s-%d %s" % \
1364                            (ip_addr(base), hname, s.name, hname,
1365                                ifs[hname], hname))
1366
1367                ifs[hname] += 1
1368                base += 1
[69692a9]1369        return hosts, ips
[895a133]1370
[1d73342]1371    def get_access_to_testbeds(self, testbeds, fid, allocated, 
[725c55d]1372            tbparam, masters, tbmap, expid=None, expcert=None):
[6e63513]1373        for tb in testbeds:
[1d73342]1374            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
[c573278]1375                    expcert)
[6e63513]1376            allocated[tb] = 1
1377
[1d73342]1378    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1379            expcert=None):
[6e63513]1380        """
1381        Get access to testbed through fedd and set the parameters for that tb
1382        """
1383        def get_export_project(svcs):
1384            """
1385            Look through for the list of federated_service for this testbed
1386            objects for a project_export service, and extract the project
1387            parameter.
1388            """
1389
1390            pe = [s for s in svcs if s.name=='project_export']
1391            if len(pe) == 1:
1392                return pe[0].params.get('project', None)
1393            elif len(pe) == 0:
1394                return None
1395            else:
1396                raise service_error(service_error.req,
1397                        "More than one project export is not supported")
1398
[d0912be]1399        def add_services(svcs, type, slist, keys):
[63c6664]1400            """
[d0912be]1401            Add the given services to slist.  type is import or export.  Also
1402            add a mapping entry from the assigned id to the original service
1403            record.
[63c6664]1404            """
1405            for i, s in enumerate(svcs):
1406                idx = '%s%d' % (type, i)
[d0912be]1407                keys[idx] = s
[63c6664]1408                sr = {'id': idx, 'name': s.name, 'visibility': type }
1409                if s.params:
1410                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1411                            for k, v in s.params.items()]
1412                slist.append(sr)
1413
[6e63513]1414        uri = tbmap.get(testbed_base(tb), None)
1415        if not uri:
1416            raise service_error(service_error.server_config, 
1417                    "Unknown testbed: %s" % tb)
1418
1419        export_svcs = masters.get(tb,[])
1420        import_svcs = [ s for m in masters.values() \
1421                for s in m \
1422                    if tb in s.importers ]
1423
1424        export_project = get_export_project(export_svcs)
1425        # Compose the credential list so that IDs come before attributes
1426        creds = set()
1427        keys = set()
[c573278]1428        certs = self.auth.get_creds_for_principal(fid)
[a0119a1]1429        # Append credenials about this experiment controller - e.g. that it is
1430        # trusted.
1431        certs.update(self.auth.get_creds_for_principal(
1432            fedid(file=self.cert_file)))
[c573278]1433        if expid:
1434            certs.update(self.auth.get_creds_for_principal(expid))
1435        for c in certs:
[6e63513]1436            keys.add(c.issuer_cert())
1437            creds.add(c.attribute_cert())
1438        creds = list(keys) + list(creds)
1439
[c573278]1440        if expcert: cert, pw = expcert, None
1441        else: cert, pw = self.cert_file, self.cert_pw
1442
[6e63513]1443        # Request credentials
1444        req = {
1445                'abac_credential': creds,
1446            }
1447        # Make the service request from the services we're importing and
1448        # exporting.  Keep track of the export request ids so we can
1449        # collect the resulting info from the access response.
1450        e_keys = { }
1451        if import_svcs or export_svcs:
[63c6664]1452            slist = []
[d0912be]1453            add_services(import_svcs, 'import', slist, e_keys)
1454            add_services(export_svcs, 'export', slist, e_keys)
[63c6664]1455            req['service'] = slist
[6e63513]1456
1457        if self.local_access.has_key(uri):
1458            # Local access call
1459            req = { 'RequestAccessRequestBody' : req }
1460            r = self.local_access[uri].RequestAccess(req, 
1461                    fedid(file=self.cert_file))
1462            r = { 'RequestAccessResponseBody' : r }
1463        else:
[c573278]1464            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
[6e63513]1465
[725c55d]1466        if r.has_key('RequestAccessResponseBody'):
1467            # Through to here we have a valid response, not a fault.
1468            # Access denied is a fault, so something better or worse than
1469            # access denied has happened.
1470            r = r['RequestAccessResponseBody']
1471            self.log.debug("[get_access] Access granted")
1472        else:
1473            raise service_error(service_error.protocol,
1474                        "Bad proxy response")
[e83f2f2]1475        if 'proof' not in r:
1476            raise service_error(service_error.protocol,
1477                        "Bad access response (no access proof)")
[ab3d6c5]1478
[9294673]1479        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
[ab3d6c5]1480                tb=tb, uri=uri, proof=[r['proof']], 
1481                services=masters.get(tb, None))
[6e63513]1482
1483        # Collect the responses corresponding to the services this testbed
1484        # exports.  These will be the service requests that we will include in
1485        # the start segment requests (with appropriate visibility values) to
1486        # import and export the segments.
1487        for s in r.get('service', []):
1488            id = s.get('id', None)
[ab3d6c5]1489            # Note that this attaches the response to the object in the masters
1490            # data structure.  (The e_keys index disappears when this fcn
1491            # returns)
[6e63513]1492            if id and id in e_keys:
1493                e_keys[id].reqs.append(s)
1494
1495        # Add attributes to parameter space.  We don't allow attributes to
1496        # overlay any parameters already installed.
1497        for a in r.get('fedAttr', []):
1498            try:
[9294673]1499                if a['attribute']:
1500                    tbparam[tb].set_attribute(a['attribute'], a['value'])
[6e63513]1501            except KeyError:
1502                self.log.error("Bad attribute in response: %s" % a)
1503
1504
[7fe81be]1505    def split_topology(self, top, topo, testbeds):
[895a133]1506        """
[e02cd14]1507        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1508        """
1509        for tb in testbeds:
1510            topo[tb] = top.clone()
[7fe81be]1511            # copy in for loop allows deletions from the original
1512            for e in [ e for e in topo[tb].elements]:
[895a133]1513                etb = e.get_attribute('testbed')
[7fe81be]1514                # NB: elements without a testbed attribute won't appear in any
1515                # sub topologies. 
1516                if not etb or etb != tb:
[895a133]1517                    for i in e.interface:
1518                        for s in i.subs:
1519                            try:
1520                                s.interfaces.remove(i)
1521                            except ValueError:
1522                                raise service_error(service_error.internal,
1523                                        "Can't remove interface??")
[7fe81be]1524                    topo[tb].elements.remove(e)
[895a133]1525            topo[tb].make_indices()
1526
[2627eb3]1527    def confirm_software(self, top):
1528        """
1529        Make sure that the software to be loaded in the topo is all available
1530        before we begin making access requests, etc.  This is a subset of
1531        wrangle_software.
1532        """
1533        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1534        pkgs.update([x.location for e in top.elements for x in e.software])
1535
1536        for pkg in pkgs:
1537            loc = pkg
1538
1539            scheme, host, path = urlparse(loc)[0:3]
1540            dest = os.path.basename(path)
1541            if not scheme:
1542                if not loc.startswith('/'):
1543                    loc = "/%s" % loc
1544                loc = "file://%s" %loc
1545            # NB: if scheme was found, loc == pkg
1546            try:
1547                u = urlopen(loc)
1548                u.close()
1549            except Exception, e:
1550                raise service_error(service_error.req, 
1551                        "Cannot open %s: %s" % (loc, e))
1552        return True
1553
[895a133]1554    def wrangle_software(self, expid, top, topo, tbparams):
1555        """
1556        Copy software out to the repository directory, allocate permissions and
1557        rewrite the segment topologies to look for the software in local
1558        places.
1559        """
1560
[eac54fa]1561        # XXX: PNNL debug
1562        self.log.debug("calling wrangle software")
[895a133]1563        # Copy the rpms and tarfiles to a distribution directory from
1564        # which the federants can retrieve them
1565        linkpath = "%s/software" %  expid
1566        softdir ="%s/%s" % ( self.repodir, linkpath)
1567        softmap = { }
[2627eb3]1568
1569        # self.fedkit and self.gateway kit are lists of tuples of
1570        # (install_location, download_location) this extracts the download
1571        # locations.
1572        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1573        pkgs.update([x.location for e in top.elements for x in e.software])
[895a133]1574        try:
[eac54fa]1575            # XXX: PNNL debug
1576            self.log.debug("wrangle software: makedirs")
[895a133]1577            os.makedirs(softdir)
[eac54fa]1578            # XXX: PNNL debug
1579            self.log.debug("wrangle software: makedirs success")
[d3c8759]1580        except EnvironmentError, e:
[895a133]1581            raise service_error(
1582                    "Cannot create software directory: %s" % e)
1583        # The actual copying.  Everything's converted into a url for copying.
[913dc7a]1584        auth_attrs = set()
[895a133]1585        for pkg in pkgs:
1586            loc = pkg
1587
1588            scheme, host, path = urlparse(loc)[0:3]
1589            dest = os.path.basename(path)
1590            if not scheme:
1591                if not loc.startswith('/'):
1592                    loc = "/%s" % loc
1593                loc = "file://%s" %loc
[2627eb3]1594            # NB: if scheme was found, loc == pkg
[895a133]1595            try:
1596                u = urlopen(loc)
1597            except Exception, e:
1598                raise service_error(service_error.req, 
1599                        "Cannot open %s: %s" % (loc, e))
1600            try:
1601                f = open("%s/%s" % (softdir, dest) , "w")
1602                self.log.debug("Writing %s/%s" % (softdir,dest) )
1603                data = u.read(4096)
1604                while data:
1605                    f.write(data)
1606                    data = u.read(4096)
1607                f.close()
1608                u.close()
1609            except Exception, e:
1610                raise service_error(service_error.internal,
1611                        "Could not copy %s: %s" % (loc, e))
1612            path = re.sub("/tmp", "", linkpath)
1613            # XXX
1614            softmap[pkg] = \
[7183b48]1615                    "%s/%s/%s" %\
1616                    ( self.repo_url, path, dest)
[895a133]1617
[913dc7a]1618            # Allow the individual segments to access the software by assigning
1619            # an attribute to each testbed allocation that encodes the data to
1620            # be released.  This expression collects the data for each run of
1621            # the loop.
1622            auth_attrs.update([
[9294673]1623                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
[913dc7a]1624                        for tb in tbparams.keys()])
1625
1626        self.append_experiment_authorization(expid, auth_attrs)
[895a133]1627
1628        # Convert the software locations in the segments into the local
1629        # copies on this host
1630        for soft in [ s for tb in topo.values() \
1631                for e in tb.elements \
1632                    if getattr(e, 'software', False) \
1633                        for s in e.software ]:
1634            if softmap.has_key(soft.location):
1635                soft.location = softmap[soft.location]
1636
1637
[a3ad8bd]1638    def new_experiment(self, req, fid):
1639        """
1640        The external interface to empty initial experiment creation called from
1641        the dispatcher.
1642
1643        Creates a working directory, splits the incoming description using the
1644        splitter script and parses out the avrious subsections using the
1645        lcasses above.  Once each sub-experiment is created, use pooled threads
1646        to instantiate them and start it all up.
1647        """
[2bb8b35]1648        self.log.info("New experiment call started for %s" % fid)
[7206e5a]1649        req = req.get('NewRequestBody', None)
1650        if not req:
1651            raise service_error(service_error.req,
1652                    "Bad request format (no NewRequestBody)")
1653
[d58ee5e]1654        # import may partially succeed so always save credentials and warn
1655        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1656            self.log.debug("Failed to import delegation credentials(!)")
[25bf6cc]1657        self.get_grouper_updates(fid)
1658        self.auth.update()
[d58ee5e]1659        self.auth.save()
[c573278]1660
[8cab4c2]1661        try:
1662            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1663                    with_proof=True)
1664        except service_error, e:
1665            self.log.info("New experiment call for %s: access denied" % fid)
1666            raise e
1667
[e83f2f2]1668
1669        if not access_ok:
[2bb8b35]1670            self.log.info("New experiment call for %s: Access denied" % fid)
[e83f2f2]1671            raise service_error(service_error.access, "New access denied",
1672                    proof=[proof])
[a3ad8bd]1673
1674        try:
1675            tmpdir = tempfile.mkdtemp(prefix="split-")
[d3c8759]1676        except EnvironmentError:
[a3ad8bd]1677            raise service_error(service_error.internal, "Cannot create tmp dir")
1678
1679        # Generate an ID for the experiment (slice) and a certificate that the
1680        # allocator can use to prove they own it.  We'll ship it back through
[7206e5a]1681        # the encrypted connection.  If the requester supplied one, use it.
1682        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1683            expcert = req['experimentAccess']['X509']
[962ea25]1684            expid = fedid(certstr=expcert)
[7206e5a]1685            self.state_lock.acquire()
1686            if expid in self.state:
1687                self.state_lock.release()
1688                raise service_error(service_error.req, 
1689                        'fedid %s identifies an existing experiment' % expid)
1690            self.state_lock.release()
1691        else:
1692            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
[a3ad8bd]1693
1694        #now we're done with the tmpdir, and it should be empty
1695        if self.cleanup:
1696            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1697            os.rmdir(tmpdir)
1698        else:
1699            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1700
1701        eid = self.create_experiment_state(fid, req, expid, expcert, 
1702                state='empty')
1703
1704        rv = {
1705                'experimentID': [
1706                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1707                ],
1708                'experimentStatus': 'empty',
[e83f2f2]1709                'experimentAccess': { 'X509' : expcert },
1710                'proof': proof.to_dict(),
[a3ad8bd]1711            }
1712
[2bb8b35]1713        self.log.info("New experiment call succeeded for %s" % fid)
[a3ad8bd]1714        return rv
1715
[cf0ff4f]1716    # create_experiment sub-functions
1717
[5ecb9a3]1718    @staticmethod
[cf0ff4f]1719    def get_experiment_key(req, field='experimentID'):
[5ecb9a3]1720        """
1721        Parse the experiment identifiers out of the request (the request body
1722        tag has been removed).  Specifically this pulls either the fedid or the
1723        localname out of the experimentID field.  A fedid is preferred.  If
1724        neither is present or the request does not contain the fields,
1725        service_errors are raised.
1726        """
1727        # Get the experiment access
[cf0ff4f]1728        exp = req.get(field, None)
[5ecb9a3]1729        if exp:
1730            if exp.has_key('fedid'):
1731                key = exp['fedid']
1732            elif exp.has_key('localname'):
1733                key = exp['localname']
1734            else:
1735                raise service_error(service_error.req, "Unknown lookup type")
1736        else:
1737            raise service_error(service_error.req, "No request?")
1738
1739        return key
1740
1741    def get_experiment_ids_and_start(self, key, tmpdir):
1742        """
1743        Get the experiment name, id and access certificate from the state, and
1744        set the experiment state to 'starting'.  returns a triple (fedid,
1745        localname, access_cert_file). The access_cert_file is a copy of the
1746        contents of the access certificate, created in the tempdir with
1747        restricted permissions.  If things are confused, raise an exception.
1748        """
1749
1750        expid = eid = None
1751        self.state_lock.acquire()
[29d5f7c]1752        if key in self.state:
1753            exp = self.state[key]
1754            exp.status = "starting"
[6e33086]1755            exp.updated()
[29d5f7c]1756            expid = exp.fedid
1757            eid = exp.localname
1758            expcert = exp.identity
[5ecb9a3]1759        self.state_lock.release()
1760
1761        # make a protected copy of the access certificate so the experiment
1762        # controller can act as the experiment principal.
1763        if expcert:
1764            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1765            if not expcert_file:
1766                raise service_error(service_error.internal, 
1767                        "Cannot create temp cert file?")
1768        else:
1769            expcert_file = None
1770
1771        return (eid, expid, expcert_file)
1772
1773    def get_topology(self, req, tmpdir):
1774        """
1775        Get the ns2 content and put it into a file for parsing.  Call the local
1776        or remote parser and return the topdl.Topology.  Errors result in
1777        exceptions.  req is the request and tmpdir is a work directory.
1778        """
1779
1780        # The tcl parser needs to read a file so put the content into that file
1781        descr=req.get('experimentdescription', None)
1782        if descr:
[a7c0bcb]1783            if 'ns2description' in descr:
1784                file_content=descr['ns2description']
1785            elif 'topdldescription' in descr:
1786                return topdl.Topology(**descr['topdldescription'])
1787            else:
1788                raise service_error(service_error.req, 
1789                        'Unknown experiment description type')
[5ecb9a3]1790        else:
1791            raise service_error(service_error.req, "No experiment description")
1792
1793
1794        if self.splitter_url:
1795            self.log.debug("Calling remote topdl translator at %s" % \
1796                    self.splitter_url)
1797            top = self.remote_ns2topdl(self.splitter_url, file_content)
1798        else:
1799            tclfile = os.path.join(tmpdir, "experiment.tcl")
1800            if file_content:
1801                try:
1802                    f = open(tclfile, 'w')
1803                    f.write(file_content)
1804                    f.close()
1805                except EnvironmentError:
1806                    raise service_error(service_error.internal,
1807                            "Cannot write temp experiment description")
1808            else:
1809                raise service_error(service_error.req, 
1810                        "Only ns2descriptions supported")
1811            pid = "dummy"
1812            gid = "dummy"
1813            eid = "dummy"
1814
1815            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1816                str(self.muxmax), '-m', 'dummy']
1817
1818            tclcmd.extend([pid, gid, eid, tclfile])
1819
1820            self.log.debug("running local splitter %s", " ".join(tclcmd))
1821            # This is just fantastic.  As a side effect the parser copies
1822            # tb_compat.tcl into the current directory, so that directory
1823            # must be writable by the fedd user.  Doing this in the
1824            # temporary subdir ensures this is the case.
1825            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1826                    cwd=tmpdir)
1827            split_data = tclparser.stdout
1828
1829            top = topdl.topology_from_xml(file=split_data, top="experiment")
1830            os.remove(tclfile)
1831
1832        return top
1833
[1660f7c]1834    def get_testbed_services(self, req, testbeds):
[5ecb9a3]1835        """
[57facae]1836        Parse the services section of the request into two dicts mapping
1837        testbed to lists of federated_service objects.  The first dict maps all
1838        exporters of services to those service objects, the second maps
1839        testbeds to service objects only for services requiring portals.
[5ecb9a3]1840        """
[f96be61]1841
1842        # Sanity check the services.  Exports or imports from unknown testbeds
1843        # cause an exception.
1844        for s in req.get('service', []):
1845            for t in s.get('import', []):
1846                if t not in testbeds:
1847                    raise service_error(service_error.req, 
1848                            'Service import to unknown testbed: %s' %t)
1849            for t in s.get('export', []):
1850                if t not in testbeds:
1851                    raise service_error(service_error.req, 
1852                            'Service export from unknown testbed: %s' %t)
1853
1854
[57facae]1855        # We construct both dicts here because deriving the second is more
[f96be61]1856        # complex than it looks - both the keys and lists can differ, and it's
[57facae]1857        # much easier to generate both in one pass.
[5ecb9a3]1858        masters = { }
[57facae]1859        pmasters = { }
[5ecb9a3]1860        for s in req.get('service', []):
1861            # If this is a service request with the importall field
1862            # set, fill it out.
1863
1864            if s.get('importall', False):
1865                s['import'] = [ tb for tb in testbeds \
1866                        if tb not in s.get('export',[])]
1867                del s['importall']
1868
1869            # Add the service to masters
1870            for tb in s.get('export', []):
1871                if s.get('name', None):
1872
1873                    params = { }
1874                    for a in s.get('fedAttr', []):
1875                        params[a.get('attribute', '')] = a.get('value','')
1876
1877                    fser = federated_service(name=s['name'],
1878                            exporter=tb, importers=s.get('import',[]),
1879                            params=params)
1880                    if fser.name == 'hide_hosts' \
1881                            and 'hosts' not in fser.params:
1882                        fser.params['hosts'] = \
1883                                ",".join(tb_hosts.get(fser.exporter, []))
1884                    if tb in masters: masters[tb].append(fser)
1885                    else: masters[tb] = [fser]
[57facae]1886
1887                    if fser.portal:
1888                        if tb in pmasters: pmasters[tb].append(fser)
1889                        else: pmasters[tb] = [fser]
[5ecb9a3]1890                else:
1891                    self.log.error('Testbed service does not have name " + \
1892                            "and importers')
[57facae]1893        return masters, pmasters
[5ecb9a3]1894
[cf0ff4f]1895    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1896        """
[3df9b33]1897        Create the ssh keys necessary for interconnecting the portal nodes and
[cf0ff4f]1898        the global hosts file for letting each segment know about the IP
1899        addresses in play.  Save these into the repo.  Add attributes to the
1900        autorizer allowing access controllers to download them and return a set
[3df9b33]1901        of attributes that inform the segments where to find this stuff.  May
[cf0ff4f]1902        raise service_errors in if there are problems.
1903        """
1904        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1905        gw_secretkey_base = "fed.%s" % self.ssh_type
[3df9b33]1906        keydir = os.path.join(tmpdir, 'keys')
1907        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1908        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
[cf0ff4f]1909
1910        try:
1911            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1912        except ValueError:
1913            raise service_error(service_error.server_config, 
1914                    "Bad key type (%s)" % self.ssh_type)
1915
[3df9b33]1916        self.generate_seer_certs(keydir)
[cf0ff4f]1917
1918        # Copy configuration files into the remote file store
1919        # The config urlpath
1920        configpath = "/%s/config" % expid
1921        # The config file system location
1922        configdir ="%s%s" % ( self.repodir, configpath)
1923        try:
1924            os.makedirs(configdir)
1925        except EnvironmentError, e:
1926            raise service_error(service_error.internal,
1927                    "Cannot create config directory: %s" % e)
1928        try:
1929            f = open("%s/hosts" % configdir, "w")
1930            print >> f, string.join(hosts, '\n')
1931            f.close()
1932        except EnvironmentError, e:
1933            raise service_error(service_error.internal, 
1934                    "Cannot write hosts file: %s" % e)
1935        try:
[3df9b33]1936            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1937            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1938            copy_file(os.path.join(keydir, 'ca.pem'), 
1939                    os.path.join(configdir, 'ca.pem'))
1940            copy_file(os.path.join(keydir, 'node.pem'), 
1941                    os.path.join(configdir, 'node.pem'))
[cf0ff4f]1942        except EnvironmentError, e:
1943            raise service_error(service_error.internal, 
1944                    "Cannot copy keyfiles: %s" % e)
1945
[913dc7a]1946        # Allow the individual testbeds to access the configuration files,
1947        # again by setting an attribute for the relevant pathnames on each
1948        # allocation principal.  Yeah, that's a long list comprehension.
1949        self.append_experiment_authorization(expid, set([
[9294673]1950            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
[913dc7a]1951                    for tb in tbparams.keys() \
[3df9b33]1952                        for f in ("hosts", 'ca.pem', 'node.pem', 
1953                            gw_secretkey_base, gw_pubkey_base)]))
[cf0ff4f]1954
1955        attrs = [ 
1956                {
1957                    'attribute': 'ssh_pubkey', 
1958                    'value': '%s/%s/config/%s' % \
1959                            (self.repo_url, expid, gw_pubkey_base)
1960                },
1961                {
1962                    'attribute': 'ssh_secretkey', 
1963                    'value': '%s/%s/config/%s' % \
1964                            (self.repo_url, expid, gw_secretkey_base)
1965                },
1966                {
1967                    'attribute': 'hosts', 
1968                    'value': '%s/%s/config/hosts' % \
1969                            (self.repo_url, expid)
1970                },
[3df9b33]1971                {
1972                    'attribute': 'seer_ca_pem', 
1973                    'value': '%s/%s/config/%s' % \
1974                            (self.repo_url, expid, 'ca.pem')
1975                },
1976                {
1977                    'attribute': 'seer_node_pem', 
1978                    'value': '%s/%s/config/%s' % \
1979                            (self.repo_url, expid, 'node.pem')
1980                },
[cf0ff4f]1981            ]
1982        return attrs
1983
1984
1985    def get_vtopo(self, req, fid):
1986        """
1987        Return the stored virtual topology for this experiment
1988        """
1989        rv = None
1990        state = None
[2bb8b35]1991        self.log.info("vtopo call started for %s" %  fid)
[cf0ff4f]1992
1993        req = req.get('VtopoRequestBody', None)
1994        if not req:
1995            raise service_error(service_error.req,
1996                    "Bad request format (no VtopoRequestBody)")
1997        exp = req.get('experiment', None)
1998        if exp:
1999            if exp.has_key('fedid'):
2000                key = exp['fedid']
2001                keytype = "fedid"
2002            elif exp.has_key('localname'):
2003                key = exp['localname']
2004                keytype = "localname"
2005            else:
2006                raise service_error(service_error.req, "Unknown lookup type")
2007        else:
2008            raise service_error(service_error.req, "No request?")
2009
[8cab4c2]2010        try:
2011            proof = self.check_experiment_access(fid, key)
2012        except service_error, e:
2013            self.log.info("vtopo call failed for %s: access denied" %  fid)
2014            raise e
[cf0ff4f]2015
2016        self.state_lock.acquire()
[29d5f7c]2017        # XXX: this needs to be recalculated
[80b1e82]2018        if key in self.state:
2019            if self.state[key].top is not None:
2020                vtopo = topdl.topology_to_vtopo(self.state[key].top)
[e83f2f2]2021                rv = { 'experiment' : {keytype: key },
[80b1e82]2022                        'vtopo': vtopo,
[e83f2f2]2023                        'proof': proof.to_dict(), 
[cf0ff4f]2024                    }
2025            else:
[80b1e82]2026                state = self.state[key].status
[cf0ff4f]2027        self.state_lock.release()
2028
[2bb8b35]2029        if rv: 
2030            self.log.info("vtopo call completed for %s %s " % \
2031                (key, fid))
2032            return rv
[cf0ff4f]2033        else: 
2034            if state:
[2bb8b35]2035                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2036                    (key, fid))
[cf0ff4f]2037                raise service_error(service_error.partial, 
2038                        "Not ready: %s" % state)
2039            else:
[2bb8b35]2040                self.log.info("vtopo call completed for %s %s (No experiment)"\
2041                        % (key, fid))
[cf0ff4f]2042                raise service_error(service_error.req, "No such experiment")
2043
2044    def get_vis(self, req, fid):
2045        """
2046        Return the stored visualization for this experiment
2047        """
2048        rv = None
2049        state = None
2050
[2bb8b35]2051        self.log.info("vis call started for %s" %  fid)
[cf0ff4f]2052        req = req.get('VisRequestBody', None)
2053        if not req:
2054            raise service_error(service_error.req,
2055                    "Bad request format (no VisRequestBody)")
2056        exp = req.get('experiment', None)
2057        if exp:
2058            if exp.has_key('fedid'):
2059                key = exp['fedid']
2060                keytype = "fedid"
2061            elif exp.has_key('localname'):
2062                key = exp['localname']
2063                keytype = "localname"
2064            else:
2065                raise service_error(service_error.req, "Unknown lookup type")
2066        else:
2067            raise service_error(service_error.req, "No request?")
2068
[8cab4c2]2069        try:
2070            proof = self.check_experiment_access(fid, key)
2071        except service_error, e:
2072            self.log.info("vis call failed for %s: access denied" %  fid)
2073            raise e
[cf0ff4f]2074
2075        self.state_lock.acquire()
[80b1e82]2076        # Generate the visualization
2077        if key in self.state:
2078            if self.state[key].top is not None:
2079                try:
2080                    vis = self.genviz(
[6a50b78]2081                            topdl.topology_to_vtopo(self.state[key].top))
[80b1e82]2082                except service_error, e:
2083                    self.state_lock.release()
2084                    raise e
2085                rv =  { 'experiment' : {keytype: key },
2086                        'vis': vis,
[e83f2f2]2087                        'proof': proof.to_dict(), 
[80b1e82]2088                        }
[cf0ff4f]2089            else:
[80b1e82]2090                state = self.state[key].status
[cf0ff4f]2091        self.state_lock.release()
2092
[2bb8b35]2093        if rv: 
2094            self.log.info("vis call completed for %s %s " % \
2095                (key, fid))
2096            return rv
[cf0ff4f]2097        else:
2098            if state:
[2bb8b35]2099                self.log.info("vis call completed for %s %s (not ready)" % \
2100                    (key, fid))
[cf0ff4f]2101                raise service_error(service_error.partial, 
2102                        "Not ready: %s" % state)
2103            else:
[2bb8b35]2104                self.log.info("vis call completed for %s %s (no experiment)" % \
2105                    (key, fid))
[cf0ff4f]2106                raise service_error(service_error.req, "No such experiment")
2107
2108   
[ec3aa4d]2109    def save_federant_information(self, allocated, tbparams, eid, top):
[cf0ff4f]2110        """
2111        Store the various data that have changed in the experiment state
2112        between when it was started and the beginning of resource allocation.
2113        This is basically the information about each local allocation.  This
[e83f2f2]2114        fills in the values of the placeholder allocation in the state.  It
2115        also collects the access proofs and returns them as dicts for a
2116        response message.
[cf0ff4f]2117        """
[29d5f7c]2118        self.state_lock.acquire()
2119        exp = self.state[eid]
[ec3aa4d]2120        exp.top = top.clone()
[cf0ff4f]2121        # save federant information
2122        for k in allocated.keys():
[9294673]2123            exp.add_allocation(tbparams[k])
[ec3aa4d]2124            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
[ab3d6c5]2125                type="testbed", localname=[k], 
2126                service=[ s.to_topdl() for s in tbparams[k].services]))
[cf0ff4f]2127
[e83f2f2]2128        # Access proofs for the response message
2129        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
[9294673]2130                    for p in tbparams[k].proof]
[6e33086]2131        exp.updated()
[cf0ff4f]2132        if self.state_filename: 
2133            self.write_state()
2134        self.state_lock.release()
[e83f2f2]2135        return proofs
[cf0ff4f]2136
2137    def clear_placeholder(self, eid, expid, tmpdir):
2138        """
2139        Clear the placeholder and remove any allocated temporary dir.
2140        """
2141
2142        self.state_lock.acquire()
2143        del self.state[eid]
2144        del self.state[expid]
2145        if self.state_filename: self.write_state()
2146        self.state_lock.release()
2147        if tmpdir and self.cleanup:
2148            self.remove_dirs(tmpdir)
2149
2150    # end of create_experiment sub-functions
[5ecb9a3]2151
[e19b75c]2152    def create_experiment(self, req, fid):
[db6b092]2153        """
2154        The external interface to experiment creation called from the
2155        dispatcher.
2156
2157        Creates a working directory, splits the incoming description using the
[43197eb]2158        splitter script and parses out the various subsections using the
[1a4ee0f]2159        classes above.  Once each sub-experiment is created, use pooled threads
2160        to instantiate them and start it all up.
[db6b092]2161        """
[7183b48]2162
[2bb8b35]2163        self.log.info("Create experiment call started for %s" % fid)
[7183b48]2164        req = req.get('CreateRequestBody', None)
[5ecb9a3]2165        if req:
[cf0ff4f]2166            key = self.get_experiment_key(req)
[5ecb9a3]2167        else:
[7183b48]2168            raise service_error(service_error.req,
2169                    "Bad request format (no CreateRequestBody)")
2170
[6e63513]2171        # Import information from the requester
[d58ee5e]2172        # import may partially succeed so always save credentials and warn
[25bf6cc]2173        if not self.auth.import_credentials(data_list=req.get('credential',[])):
[cde9b98]2174            self.log.debug("Failed to import delegation credentials(!)")
[25bf6cc]2175        self.get_grouper_updates(fid)
2176        self.auth.update()
[d58ee5e]2177        self.auth.save()
[6e63513]2178
[8cab4c2]2179        try:
2180            # Make sure that the caller can talk to us
2181            proof = self.check_experiment_access(fid, key)
2182        except service_error, e:
2183            self.log.info("Create experiment call failed for %s: access denied"\
2184                    % fid)
2185            raise e
2186
[db6b092]2187
[fd07c48]2188        # Install the testbed map entries supplied with the request into a copy
2189        # of the testbed map.
2190        tbmap = dict(self.tbmap)
2191        for m in req.get('testbedmap', []):
2192            if 'testbed' in m and 'uri' in m:
2193                tbmap[m['testbed']] = m['uri']
2194
[5ecb9a3]2195        # a place to work
[db6b092]2196        try:
2197            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2198            os.mkdir(tmpdir+"/keys")
[d3c8759]2199        except EnvironmentError:
[db6b092]2200            raise service_error(service_error.internal, "Cannot create tmp dir")
2201
2202        tbparams = { }
2203
[5ecb9a3]2204        eid, expid, expcert_file = \
2205                self.get_experiment_ids_and_start(key, tmpdir)
[c573278]2206
[5ecb9a3]2207        # This catches exceptions to clear the placeholder if necessary
[db6b092]2208        try: 
[5ecb9a3]2209            if not (eid and expid):
2210                raise service_error(service_error.internal, 
2211                        "Cannot find local experiment info!?")
[5f6929a]2212
[5ecb9a3]2213            top = self.get_topology(req, tmpdir)
[2627eb3]2214            self.confirm_software(top)
[5ecb9a3]2215            # Assign the IPs
[69692a9]2216            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[1a4ee0f]2217            # Find the testbeds to look up
[5334044]2218            tb_hosts = { }
[5ecb9a3]2219            testbeds = [ ]
2220            for e in top.elements:
2221                if isinstance(e, topdl.Computer):
2222                    tb = e.get_attribute('testbed') or 'default'
2223                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2224                    else: 
2225                        tb_hosts[tb] = [ e.name ]
2226                        testbeds.append(tb)
2227
[57facae]2228            masters, pmasters = self.get_testbed_services(req, testbeds)
[895a133]2229            allocated = { }         # Testbeds we can access
2230            topo ={ }               # Sub topologies
[e02cd14]2231            connInfo = { }          # Connection information
[5334044]2232
[5ecb9a3]2233            self.get_access_to_testbeds(testbeds, fid, allocated, 
2234                    tbparams, masters, tbmap, expid, expcert_file)
[5f96438]2235
[4ffa6f8]2236            # tbactive is the set of testbeds that have NATs in front of their
2237            # portals. They need to initiate connections.
2238            tbactive = set([k for k, v in tbparams.items() \
2239                    if v.get_attribute('nat_portals')])
2240
2241            self.split_topology(top, topo, testbeds)
2242
[cf0ff4f]2243            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
[eac54fa]2244            # XXX: PNNL debug
2245            self.log.debug("Back from generate keys")
[cc8d8e9]2246
[fd07c48]2247            part = experiment_partition(self.auth, self.store_url, tbmap,
[a11eda5]2248                    self.muxmax, self.direct_transit, tbactive)
[5334044]2249            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
[2761484]2250                    connInfo, expid)
[eac54fa]2251            # XXX: PNNL debug
2252            self.log.debug("Back from add portals")
[913dc7a]2253
2254            auth_attrs = set()
[ab847bc]2255            # Now get access to the dynamic testbeds (those added above)
2256            for tb in [ t for t in topo if t not in allocated]:
[eac54fa]2257                # XXX: PNNL debug
2258                self.log.debug("dynamic testbeds %s" %tb)
[cf0ff4f]2259                self.get_access(tb, tbparams, fid, masters, tbmap, 
2260                        expid, expcert_file)
[ab847bc]2261                allocated[tb] = 1
2262                store_keys = topo[tb].get_attribute('store_keys')
2263                # Give the testbed access to keys it exports or imports
2264                if store_keys:
[b16cfc0]2265                    auth_attrs.update(set([
[9294673]2266                        (tbparams[tb].allocID, sk) \
[913dc7a]2267                                for sk in store_keys.split(" ")]))
2268
[eac54fa]2269            # XXX: PNNL debug
2270            self.log.debug("done with dynamic testbeds %s" % auth_attrs)
[913dc7a]2271            if auth_attrs:
2272                self.append_experiment_authorization(expid, auth_attrs)
[69692a9]2273
[cf0ff4f]2274            # transit and disconnected testbeds may not have a connInfo entry.
2275            # Fill in the blanks.
2276            for t in allocated.keys():
2277                if not connInfo.has_key(t):
2278                    connInfo[t] = { }
2279
[895a133]2280            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2281
[e83f2f2]2282            proofs = self.save_federant_information(allocated, tbparams, 
[ec3aa4d]2283                    eid, top)
[866c983]2284        except service_error, e:
2285            # If something goes wrong in the parse (usually an access error)
2286            # clear the placeholder state.  From here on out the code delays
[db6b092]2287            # exceptions.  Failing at this point returns a fault to the remote
2288            # caller.
[2bb8b35]2289
2290            self.log.info("Create experiment call failed for %s %s: %s" % 
2291                    (eid, fid, e))
[cf0ff4f]2292            self.clear_placeholder(eid, expid, tmpdir)
[866c983]2293            raise e
2294
[db6b092]2295        # Start the background swapper and return the starting state.  From
2296        # here on out, the state will stick around a while.
[866c983]2297
[db6b092]2298        # Create a logger that logs to the experiment's state object as well as
2299        # to the main log file.
2300        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[29d5f7c]2301        alloc_collector = self.list_log(self.state[eid].log)
[f07fa49]2302        h = logging.StreamHandler(alloc_collector)
[db6b092]2303        # XXX: there should be a global one of these rather than repeating the
2304        # code.
2305        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2306                    '%d %b %y %H:%M:%S'))
2307        alloc_log.addHandler(h)
[617592b]2308
[db6b092]2309        # Start a thread to do the resource allocation
[e19b75c]2310        t  = Thread(target=self.allocate_resources,
[43197eb]2311                args=(allocated, masters, eid, expid, tbparams, 
[b4b19c7]2312                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
[725c55d]2313                    connInfo, tbmap, expcert_file),
[db6b092]2314                name=eid)
2315        t.start()
2316
2317        rv = {
2318                'experimentID': [
2319                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2320                ],
2321                'experimentStatus': 'starting',
[e83f2f2]2322                'proof': [ proof.to_dict() ] + proofs,
[db6b092]2323            }
[2bb8b35]2324        self.log.info("Create experiment call succeeded for %s %s" % \
2325                (eid, fid))
[db6b092]2326
2327        return rv
[9479343]2328   
2329    def get_experiment_fedid(self, key):
2330        """
[db6b092]2331        find the fedid associated with the localname key in the state database.
[9479343]2332        """
2333
[db6b092]2334        rv = None
2335        self.state_lock.acquire()
[29d5f7c]2336        if key in self.state:
2337            rv = self.state[key].fedid
[db6b092]2338        self.state_lock.release()
2339        return rv
[a97394b]2340
[4064742]2341    def check_experiment_access(self, fid, key):
[866c983]2342        """
2343        Confirm that the fid has access to the experiment.  Though a request
2344        may be made in terms of a local name, the access attribute is always
2345        the experiment's fedid.
2346        """
2347        if not isinstance(key, fedid):
[db6b092]2348            key = self.get_experiment_fedid(key)
[866c983]2349
[e83f2f2]2350        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2351
2352        if access_ok:
2353            return proof
[866c983]2354        else:
[e83f2f2]2355            raise service_error(service_error.access, "Access Denied",
2356                proof)
[4064742]2357
2358
[db6b092]2359    def get_handler(self, path, fid):
[cf0ff4f]2360        """
2361        Perhaps surprisingly named, this function handles HTTP GET requests to
2362        this server (SOAP requests are POSTs).
2363        """
[7183b48]2364        self.log.info("Get handler %s %s" % (path, fid))
[bcc6fd6]2365        if len("%s" % fid) == 0:
2366            return (None, None)
[e83f2f2]2367        # XXX: log proofs?
[6c57fe9]2368        if self.auth.check_attribute(fid, path):
2369            return ("%s/%s" % (self.repodir, path), "application/binary")
2370        else:
2371            return (None, None)
[987aaa1]2372
[6e33086]2373    def update_info(self, key, force=False):
2374        top = None
2375        self.state_lock.acquire()
2376        if key in self.state:
2377            if force or self.state[key].older_than(self.info_cache_limit):
[6a50b78]2378                top = self.state[key].top
[6e33086]2379                if top is not None: top = top.clone()
2380                d1, info_params, cert, d2 = \
2381                        self.get_segment_info(self.state[key], need_lock=False)
2382        self.state_lock.release()
2383
2384        if top is None: return
2385
2386        try:
2387            tmpdir = tempfile.mkdtemp(prefix="info-")
2388        except EnvironmentError:
2389            raise service_error(service_error.internal, 
2390                    "Cannot create tmp dir")
2391        cert_file = self.make_temp_certfile(cert, tmpdir)
2392
2393        data = []
2394        try:
2395            for k, (uri, aid) in info_params.items():
2396                info=self.info_segment(log=self.log, testbed=uri,
2397                            cert_file=cert_file, cert_pwd=None,
2398                            trusted_certs=self.trusted_certs,
2399                            caller=self.call_InfoSegment)
2400                info(uri, aid)
2401                data.append(info)
2402        # Clean up the tmpdir no matter what
2403        finally:
2404            if tmpdir: self.remove_dirs(tmpdir)
2405
2406        self.annotate_topology(top, data)
2407        self.state_lock.acquire()
2408        if key in self.state:
2409            self.state[key].top = top
2410            self.state[key].updated()
2411            if self.state_filename: self.write_state()
2412        self.state_lock.release()
2413
[29d5f7c]2414   
[c52c48d]2415    def get_info(self, req, fid):
[866c983]2416        """
2417        Return all the stored info about this experiment
2418        """
2419        rv = None
2420
[2bb8b35]2421        self.log.info("Info call started for %s" %  fid)
[866c983]2422        req = req.get('InfoRequestBody', None)
2423        if not req:
2424            raise service_error(service_error.req,
[65f3f29]2425                    "Bad request format (no InfoRequestBody)")
[866c983]2426        exp = req.get('experiment', None)
[80b1e82]2427        legacy = req.get('legacy', False)
[6e33086]2428        fresh = req.get('fresh', False)
[866c983]2429        if exp:
2430            if exp.has_key('fedid'):
2431                key = exp['fedid']
2432                keytype = "fedid"
2433            elif exp.has_key('localname'):
2434                key = exp['localname']
2435                keytype = "localname"
2436            else:
2437                raise service_error(service_error.req, "Unknown lookup type")
2438        else:
2439            raise service_error(service_error.req, "No request?")
2440
[8cab4c2]2441        try:
2442            proof = self.check_experiment_access(fid, key)
2443        except service_error, e:
2444            self.log.info("Info call failed for %s: access denied" %  fid)
2445
[866c983]2446
[6e33086]2447        self.update_info(key, fresh)
2448
[866c983]2449        self.state_lock.acquire()
2450        if self.state.has_key(key):
[29d5f7c]2451            rv = self.state[key].get_info()
[6e33086]2452            # Copy the topo if we need legacy annotations
[80b1e82]2453            if legacy:
[6a50b78]2454                top = self.state[key].top
[80b1e82]2455                if top is not None: top = top.clone()
[866c983]2456        self.state_lock.release()
[2bb8b35]2457        self.log.info("Gathered Info for %s %s" % (key, fid))
[866c983]2458
[80b1e82]2459        # If the legacy visualization and topology representations are
[6e33086]2460        # requested, calculate them and add them to the return.
[80b1e82]2461        if legacy and rv is not None:
[2bb8b35]2462            self.log.info("Generating legacy Info for %s %s" % (key, fid))
[80b1e82]2463            if top is not None:
2464                vtopo = topdl.topology_to_vtopo(top)
2465                if vtopo is not None:
2466                    rv['vtopo'] = vtopo
2467                    try:
2468                        vis = self.genviz(vtopo)
2469                    except service_error, e:
2470                        self.log.debug('Problem generating visualization: %s' \
2471                                % e)
2472                        vis = None
2473                    if vis is not None:
2474                        rv['vis'] = vis
[db6b092]2475        if rv:
[2bb8b35]2476            self.log.info("Info succeded for %s %s" % (key, fid))
[29d5f7c]2477            rv['proof'] = proof.to_dict()
2478            return rv
[2bb8b35]2479        else: 
2480            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
[db6b092]2481            raise service_error(service_error.req, "No such experiment")
[7a8d667]2482
[22a1a77]2483    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2484            results):
2485        """
2486        Call OperateSegment on multiple testbeds and gather the results.
2487        op_params contains the parameters needed to contact that testbed, cert
2488        is a certificate containing the fedid to use, op is the operation,
2489        testbeds is a dict mapping testbed name to targets in that testbed,
2490        params are the parameters to include a,d results is a growing list of
2491        the results of the calls.
2492        """
2493        try:
2494            tmpdir = tempfile.mkdtemp(prefix="info-")
2495        except EnvironmentError:
2496            raise service_error(service_error.internal, 
2497                    "Cannot create tmp dir")
2498        cert_file = self.make_temp_certfile(cert, tmpdir)
2499
2500        try:
2501            for tb, targets in testbeds.items():
2502                if tb in op_params:
2503                    uri, aid = op_params[tb]
2504                    operate=self.operation_segment(log=self.log, testbed=uri,
2505                                cert_file=cert_file, cert_pwd=None,
2506                                trusted_certs=self.trusted_certs,
2507                                caller=self.call_OperationSegment)
2508                    if operate(uri, aid, op, targets, params):
2509                        if operate.status is not None:
2510                            results.extend(operate.status)
2511                            continue
2512                # Something went wrong in a weird way.  Add statuses
2513                # that reflect that to results
2514                for t in targets:
2515                    results.append(operation_status(t, 
2516                        operation_status.federant,
[b709861]2517                        'Unexpected error on %s' % tb))
[22a1a77]2518        # Clean up the tmpdir no matter what
2519        finally:
2520            if tmpdir: self.remove_dirs(tmpdir)
2521
2522    def do_operation(self, req, fid):
2523        """
2524        Find the testbeds holding each target and ask them to carry out the
2525        operation.  Return the statuses.
2526        """
2527        # Map an element to the testbed containing it
2528        def element_to_tb(e):
2529            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2530            elif isinstance(e, topdl.Testbed): return e.name
2531            else: return None
2532        # If d is an operation_status object, make it a dict
2533        def make_dict(d):
2534            if isinstance(d, dict): return d
2535            elif isinstance(d, operation_status): return d.to_dict()
2536            else: return { }
2537
[b709861]2538        def element_name(e):
2539            if isinstance(e, topdl.Computer): return e.name
2540            elif isinstance(e, topdl.Testbed): 
2541                if e.localname: return e.localname[0]
2542                else: return None
2543            else: return None
2544
[8cab4c2]2545        self.log.info("Operation call started for %s" %  fid)
[22a1a77]2546        req = req.get('OperationRequestBody', None)
2547        if not req:
2548            raise service_error(service_error.req,
2549                    "Bad request format (no OperationRequestBody)")
2550        exp = req.get('experiment', None)
2551        op = req.get('operation', None)
[b709861]2552        targets = set(req.get('target', []))
[22a1a77]2553        params = req.get('parameter', None)
2554
2555        if exp:
2556            if 'fedid' in exp:
2557                key = exp['fedid']
2558                keytype = "fedid"
2559            elif 'localname' in exp:
2560                key = exp['localname']
2561                keytype = "localname"
2562            else:
2563                raise service_error(service_error.req, "Unknown lookup type")
2564        else:
2565            raise service_error(service_error.req, "No request?")
2566
[b709861]2567        if op is None or not targets:
[22a1a77]2568            raise service_error(service_error.req, "No request?")
2569
[8cab4c2]2570        try:
2571            proof = self.check_experiment_access(fid, key)
2572        except service_error, e:
2573            self.log.info("Operation call failed for %s: access denied" %  fid)
2574            raise e
2575
[22a1a77]2576        self.state_lock.acquire()
2577        if key in self.state:
2578            d1, op_params, cert, d2 = \
[b709861]2579                    self.get_segment_info(self.state[key], need_lock=False,
2580                            key='tb')
[22a1a77]2581            top = self.state[key].top
2582            if top is not None:
2583                top = top.clone()
2584        self.state_lock.release()
2585
2586        if top is None:
[8cab4c2]2587            self.log.info("Operation call failed for %s: not active" %  fid)
[22a1a77]2588            raise service_error(service_error.partial, "No topology yet", 
2589                    proof=proof)
2590
2591        testbeds = { }
2592        results = []
2593        for e in top.elements:
[b709861]2594            ename = element_name(e)
2595            if ename in targets:
[22a1a77]2596                tb = element_to_tb(e)
[b709861]2597                targets.remove(ename)
[22a1a77]2598                if tb is not None:
[b709861]2599                    if tb in testbeds: testbeds[tb].append(ename)
2600                    else: testbeds[tb] = [ ename ]
[22a1a77]2601                else:
2602                    results.append(operation_status(e.name, 
2603                        code=operation_status.no_target, 
2604                        description='Cannot map target to testbed'))
2605
[b709861]2606        for t in targets:
2607            results.append(operation_status(t, operation_status.no_target))
2608
[22a1a77]2609        self.operate_on_segments(op_params, cert, op, testbeds, params,
2610                results)
2611
[8cab4c2]2612        self.log.info("Operation call succeeded for %s" %  fid)
[22a1a77]2613        return { 
2614                'experiment': exp, 
[b709861]2615                'status': [make_dict(r) for r in results],
[22a1a77]2616                'proof': proof.to_dict()
2617                }
2618
2619
[65f3f29]2620    def get_multi_info(self, req, fid):
2621        """
2622        Return all the stored info that this fedid can access
2623        """
[e83f2f2]2624        rv = { 'info': [ ], 'proof': [ ] }
[65f3f29]2625
[2bb8b35]2626        self.log.info("Multi Info call started for %s" %  fid)
[25bf6cc]2627        self.get_grouper_updates(fid)
2628        self.auth.update()
2629        self.auth.save()
[db6b092]2630        self.state_lock.acquire()
2631        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2632            try:
[e83f2f2]2633                proof = self.check_experiment_access(fid, key)
[829246e]2634            except service_error, e:
2635                if e.code == service_error.access:
2636                    continue
2637                else:
[2bb8b35]2638                    self.log.info("Multi Info call failed for %s: %s" %  \
2639                            (e,fid))
[829246e]2640                    self.state_lock.release()
2641                    raise e
[65f3f29]2642
[db6b092]2643            if self.state.has_key(key):
[29d5f7c]2644                e = self.state[key].get_info()
2645                e['proof'] = proof.to_dict()
[db6b092]2646                rv['info'].append(e)
[e83f2f2]2647                rv['proof'].append(proof.to_dict())
[65f3f29]2648        self.state_lock.release()
[2bb8b35]2649        self.log.info("Multi Info call succeeded for %s" %  fid)
[db6b092]2650        return rv
[65f3f29]2651
[cf0ff4f]2652    def check_termination_status(self, fed_exp, force):
[e07c8f3]2653        """
[cf0ff4f]2654        Confirm that the experiment is sin a valid state to stop (or force it)
2655        return the state - invalid states for deletion and force settings cause
2656        exceptions.
[e07c8f3]2657        """
[cf0ff4f]2658        self.state_lock.acquire()
[29d5f7c]2659        status = fed_exp.status
[e07c8f3]2660
[cf0ff4f]2661        if status:
2662            if status in ('starting', 'terminating'):
2663                if not force:
2664                    self.state_lock.release()
2665                    raise service_error(service_error.partial, 
2666                            'Experiment still being created or destroyed')
2667                else:
2668                    self.log.warning('Experiment in %s state ' % status + \
2669                            'being terminated by force.')
2670            self.state_lock.release()
2671            return status
[725c55d]2672        else:
[cf0ff4f]2673            # No status??? trouble
2674            self.state_lock.release()
2675            raise service_error(service_error.internal,
2676                    "Experiment has no status!?")
2677
[b709861]2678    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
[cf0ff4f]2679        ids = []
2680        term_params = { }
[6e33086]2681        if need_lock: self.state_lock.acquire()
[29d5f7c]2682        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2683        expcert = fed_exp.identity
2684        repo = "%s" % fed_exp.fedid
[cf0ff4f]2685
2686        # Collect the allocation/segment ids into a dict keyed by the fedid
[29d5f7c]2687        # of the allocation that contains a tuple of uri, aid
2688        for i, fed in enumerate(fed_exp.get_all_allocations()):
2689            uri = fed.uri
2690            aid = fed.allocID
[b709861]2691            if key == 'aid': term_params[aid] = (uri, aid)
2692            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2693
[6e33086]2694        if need_lock: self.state_lock.release()
2695        return ids, term_params, expcert, repo
2696
2697
2698    def get_termination_info(self, fed_exp):
2699        self.state_lock.acquire()
2700        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
[cf0ff4f]2701        # Change the experiment state
[29d5f7c]2702        fed_exp.status = 'terminating'
[6e33086]2703        fed_exp.updated()
[cf0ff4f]2704        if self.state_filename: self.write_state()
2705        self.state_lock.release()
2706
2707        return ids, term_params, expcert, repo
2708
2709
2710    def deallocate_resources(self, term_params, expcert, status, force, 
2711            dealloc_log):
2712        tmpdir = None
2713        # This try block makes sure the tempdir is cleared
2714        try:
2715            # If no expcert, try the deallocation as the experiment
2716            # controller instance.
2717            if expcert and self.auth_type != 'legacy': 
2718                try:
2719                    tmpdir = tempfile.mkdtemp(prefix="term-")
2720                except EnvironmentError:
2721                    raise service_error(service_error.internal, 
2722                            "Cannot create tmp dir")
2723                cert_file = self.make_temp_certfile(expcert, tmpdir)
2724                pw = None
2725            else: 
2726                cert_file = self.cert_file
2727                pw = self.cert_pwd
2728
2729            # Stop everyone.  NB, wait_for_all waits until a thread starts
2730            # and then completes, so we can't wait if nothing starts.  So,
2731            # no tbparams, no start.
2732            if len(term_params) > 0:
2733                tp = thread_pool(self.nthreads)
2734                for k, (uri, aid) in term_params.items():
2735                    # Create and start a thread to stop the segment
2736                    tp.wait_for_slot()
2737                    t  = pooled_thread(\
2738                            target=self.terminate_segment(log=dealloc_log,
2739                                testbed=uri,
2740                                cert_file=cert_file, 
2741                                cert_pwd=pw,
2742                                trusted_certs=self.trusted_certs,
2743                                caller=self.call_TerminateSegment),
2744                            args=(uri, aid), name=k,
2745                            pdata=tp, trace_file=self.trace_file)
2746                    t.start()
2747                # Wait for completions
2748                tp.wait_for_all_done()
2749
2750            # release the allocations (failed experiments have done this
2751            # already, and starting experiments may be in odd states, so we
2752            # ignore errors releasing those allocations
2753            try: 
2754                for k, (uri, aid)  in term_params.items():
2755                    self.release_access(None, aid, uri=uri,
2756                            cert_file=cert_file, cert_pwd=pw)
2757            except service_error, e:
2758                if status != 'failed' and not force:
2759                    raise e
2760
2761        # Clean up the tmpdir no matter what
2762        finally:
2763            if tmpdir: self.remove_dirs(tmpdir)
2764
[7a8d667]2765    def terminate_experiment(self, req, fid):
[866c983]2766        """
2767        Swap this experiment out on the federants and delete the shared
2768        information
2769        """
[2bb8b35]2770        self.log.info("Terminate experiment call started for %s" % fid)
[866c983]2771        tbparams = { }
2772        req = req.get('TerminateRequestBody', None)
2773        if not req:
2774            raise service_error(service_error.req,
2775                    "Bad request format (no TerminateRequestBody)")
2776
[cf0ff4f]2777        key = self.get_experiment_key(req, 'experiment')
[8cab4c2]2778        try:
2779            proof = self.check_experiment_access(fid, key)
2780        except service_error, e:
2781            self.log.info(
2782                    "Terminate experiment call failed for %s: access denied" \
2783                            % fid)
2784            raise e
[cf0ff4f]2785        exp = req.get('experiment', False)
2786        force = req.get('force', False)
[866c983]2787
[db6b092]2788        dealloc_list = [ ]
[46e4682]2789
2790
[5ae3857]2791        # Create a logger that logs to the dealloc_list as well as to the main
2792        # log file.
2793        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
[a69de97]2794        dealloc_log.info("Terminating %s " %key)
[5ae3857]2795        h = logging.StreamHandler(self.list_log(dealloc_list))
2796        # XXX: there should be a global one of these rather than repeating the
2797        # code.
2798        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2799                    '%d %b %y %H:%M:%S'))
2800        dealloc_log.addHandler(h)
2801
2802        self.state_lock.acquire()
2803        fed_exp = self.state.get(key, None)
[cf0ff4f]2804        self.state_lock.release()
[e07c8f3]2805        repo = None
[5ae3857]2806
2807        if fed_exp:
[cf0ff4f]2808            status = self.check_termination_status(fed_exp, force)
[6e33086]2809            # get_termination_info updates the experiment state
[cf0ff4f]2810            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2811            self.deallocate_resources(term_params, expcert, status, force, 
2812                    dealloc_log)
[5ae3857]2813
2814            # Remove the terminated experiment
2815            self.state_lock.acquire()
2816            for id in ids:
[a96d946]2817                self.clear_experiment_authorization(id, need_state_lock=False)
[cf0ff4f]2818                if id in self.state: del self.state[id]
[5ae3857]2819
2820            if self.state_filename: self.write_state()
2821            self.state_lock.release()
2822
[2761484]2823            # Delete any synch points associated with this experiment.  All
2824            # synch points begin with the fedid of the experiment.
2825            fedid_keys = set(["fedid:%s" % f for f in ids \
2826                    if isinstance(f, fedid)])
2827            for k in self.synch_store.all_keys():
2828                try:
2829                    if len(k) > 45 and k[0:46] in fedid_keys:
2830                        self.synch_store.del_value(k)
[dadc4da]2831                except synch_store.BadDeletionError:
[2761484]2832                    pass
2833            self.write_store()
[e07c8f3]2834
2835            # Remove software and other cached stuff from the filesystem.
2836            if repo:
2837                self.remove_dirs("%s/%s" % (self.repodir, repo))
[e83f2f2]2838       
[2bb8b35]2839            self.log.info("Terminate experiment succeeded for %s %s" % \
2840                    (key, fid))
[5ae3857]2841            return { 
2842                    'experiment': exp , 
[cf0ff4f]2843                    'deallocationLog': string.join(dealloc_list, ''),
[e83f2f2]2844                    'proof': [proof.to_dict()],
[5ae3857]2845                    }
2846        else:
[2bb8b35]2847            self.log.info("Terminate experiment failed for %s %s: no state" % \
2848                    (key, fid))
[5ae3857]2849            raise service_error(service_error.req, "No saved state")
[2761484]2850
2851
2852    def GetValue(self, req, fid):
2853        """
2854        Get a value from the synchronized store
2855        """
2856        req = req.get('GetValueRequestBody', None)
2857        if not req:
2858            raise service_error(service_error.req,
2859                    "Bad request format (no GetValueRequestBody)")
2860       
[cf0ff4f]2861        name = req.get('name', None)
2862        wait = req.get('wait', False)
[2761484]2863        rv = { 'name': name }
2864
[e83f2f2]2865        if not name:
2866            raise service_error(service_error.req, "No name?")
2867
2868        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2869
2870        if access_ok:
[d8442da]2871            self.log.debug("[GetValue] asking for %s " % name)
[dadc4da]2872            try:
2873                v = self.synch_store.get_value(name, wait)
2874            except synch_store.RevokedKeyError:
2875                # No more synch on this key
2876                raise service_error(service_error.federant, 
2877                        "Synch key %s revoked" % name)
[2761484]2878            if v is not None:
2879                rv['value'] = v
[e83f2f2]2880            rv['proof'] = proof.to_dict()
[109a32a]2881            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2882            return rv
2883        else:
[e83f2f2]2884            raise service_error(service_error.access, "Access Denied",
2885                    proof=proof)
[2761484]2886       
2887
2888    def SetValue(self, req, fid):
2889        """
2890        Set a value in the synchronized store
2891        """
2892        req = req.get('SetValueRequestBody', None)
2893        if not req:
2894            raise service_error(service_error.req,
2895                    "Bad request format (no SetValueRequestBody)")
2896       
[cf0ff4f]2897        name = req.get('name', None)
2898        v = req.get('value', '')
[2761484]2899
[e83f2f2]2900        if not name:
2901            raise service_error(service_error.req, "No name?")
2902
2903        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2904
2905        if access_ok:
[2761484]2906            try:
2907                self.synch_store.set_value(name, v)
2908                self.write_store()
[109a32a]2909                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2910            except synch_store.CollisionError:
2911                # Translate into a service_error
2912                raise service_error(service_error.req,
2913                        "Value already set: %s" %name)
[dadc4da]2914            except synch_store.RevokedKeyError:
2915                # No more synch on this key
2916                raise service_error(service_error.federant, 
2917                        "Synch key %s revoked" % name)
[e83f2f2]2918                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
[2761484]2919        else:
[e83f2f2]2920            raise service_error(service_error.access, "Access Denied",
2921                    proof=proof)
Note: See TracBrowser for help on using the repository browser.