source: fedd/federation/experiment_control.py @ e5a595e

compt_changes
Last change on this file since e5a595e was f96be61, checked in by Ted Faber <faber@…>, 12 years ago

Send an error back on service requests to unknown testbeds

  • Property mode set to 100644
File size: 94.8 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[25bf6cc]15import shutil
16import zipfile
[6679c122]17
[3df9b33]18import os.path
19
[3441fe3]20import traceback
[c971895]21# For parsing visualization output and splitter output
22import xml.parsers.expat
[3441fe3]23
[6c57fe9]24from threading import Lock, Thread, Condition
25from subprocess import call, Popen, PIPE
[c573278]26from string import join
[6679c122]27
[db6b092]28from urlparse import urlparse
[4708875]29from urllib2 import urlopen, URLError
[db6b092]30
[ec4fb42]31from util import *
[6bedbdba]32from deter import fedid, generate_fedid
[9460b1e]33from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]34from service_error import service_error
[2761484]35from synch_store import synch_store
[73e7f5c]36from experiment_partition import experiment_partition
[1d73342]37from experiment_control_legacy import experiment_control_legacy
[7206e5a]38from authorizer import abac_authorizer
[faea607]39from thread_pool import thread_pool, pooled_thread
[ab3d6c5]40from experiment_info import experiment_info, allocation_info, federated_service
[22a1a77]41from operation_status import operation_status
[6679c122]42
[6bedbdba]43from deter import topdl
[044dd20]44from deter import ip_allocator
45from deter import ip_addr
[f07fa49]46import list_log
[db6b092]47
[11a08b0]48
49class nullHandler(logging.Handler):
50    def emit(self, record): pass
51
52fl = logging.getLogger("fedd.experiment_control")
53fl.addHandler(nullHandler())
54
[1d73342]55class experiment_control_local(experiment_control_legacy):
[0ea11af]56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
[79b6596]62
63    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]64   
[f069052]65    call_RequestAccess = service_caller('RequestAccess')
66    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]67    call_StartSegment = service_caller('StartSegment')
[db974ed]68    call_TerminateSegment = service_caller('TerminateSegment')
[6e33086]69    call_InfoSegment = service_caller('InfoSegment')
[22a1a77]70    call_OperationSegment = service_caller('OperationSegment')
[5f6929a]71    call_Ns2Topdl = service_caller('Ns2Topdl')
[058f58e]72
[3f6bc5f]73    def __init__(self, config=None, auth=None):
[866c983]74        """
75        Intialize the various attributes, most from the config object
76        """
77
78        def parse_tarfile_list(tf):
79            """
80            Parse a tarfile list from the configuration.  This is a set of
81            paths and tarfiles separated by spaces.
82            """
83            rv = [ ]
84            if tf is not None:
85                tl = tf.split()
86                while len(tl) > 1:
87                    p, t = tl[0:2]
88                    del tl[0:2]
89                    rv.append((p, t))
90            return rv
91
[f07fa49]92        self.list_log = list_log.list_log
[866c983]93
94        self.cert_file = config.get("experiment_control", "cert_file")
95        if self.cert_file:
96            self.cert_pwd = config.get("experiment_control", "cert_pwd")
97        else:
98            self.cert_file = config.get("globals", "cert_file")
99            self.cert_pwd = config.get("globals", "cert_pwd")
100
101        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
102                or config.get("globals", "trusted_certs")
103
[6c57fe9]104        self.repodir = config.get("experiment_control", "repodir")
[7183b48]105        self.repo_url = config.get("experiment_control", "repo_url", 
106                "https://users.isi.deterlab.net:23235");
[cc8d8e9]107
[866c983]108        self.exp_stem = "fed-stem"
109        self.log = logging.getLogger("fedd.experiment_control")
110        set_log_level(config, "experiment_control", self.log)
111        self.muxmax = 2
[35a4c01]112        self.nthreads = 10
[866c983]113        self.randomize_experiments = False
114
115        self.splitter = None
116        self.ssh_keygen = "/usr/bin/ssh-keygen"
117        self.ssh_identity_file = None
118
119
120        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]121        self.cleanup = not config.getboolean("experiment_control", 
122                "leave_tmpfiles")
[866c983]123        self.state_filename = config.get("experiment_control", 
124                "experiment_state")
[2761484]125        self.store_filename = config.get("experiment_control", 
126                "synch_store")
127        self.store_url = config.get("experiment_control", "store_url")
[5f6929a]128        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
[25bf6cc]129        self.grouper_url = config.get("experiment_control", "grouper_url")
[866c983]130        self.fedkit = parse_tarfile_list(\
131                config.get("experiment_control", "fedkit"))
132        self.gatewaykit = parse_tarfile_list(\
133                config.get("experiment_control", "gatewaykit"))
134
[175b444]135        dt = config.get("experiment_control", "direct_transit")
[7206e5a]136        self.auth_type = config.get('experiment_control', 'auth_type') \
137                or 'legacy'
138        self.auth_dir = config.get('experiment_control', 'auth_dir')
[6e33086]139        # XXX: document this!
140        self.info_cache_limit = \
141                config.getint('experiment_control', 'info_cache', 600)
[139e2e2]142        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
143        else: self.direct_transit = [ ]
[866c983]144        # NB for internal master/slave ops, not experiment setup
145        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]146
[db6b092]147        self.overrides = set([])
148        ovr = config.get('experiment_control', 'overrides')
149        if ovr:
150            for o in ovr.split(","):
151                o = o.strip()
152                if o.startswith('fedid:'): o = o[len('fedid:'):]
153                self.overrides.add(fedid(hexstr=o))
[ca489e8]154
[866c983]155        self.state = { }
156        self.state_lock = Lock()
157        self.tclsh = "/usr/local/bin/otclsh"
[5f6929a]158        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
[866c983]159                config.get("experiment_control", "tcl_splitter",
160                        "/usr/testbed/lib/ns2ir/parse.tcl")
161        mapdb_file = config.get("experiment_control", "mapdb")
162        self.trace_file = sys.stderr
163
164        self.def_expstart = \
165                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
166                "/tmp/federate";
167        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
168                "FEDDIR/hosts";
169        self.def_gwstart = \
170                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
171                "/tmp/bridge.log";
172        self.def_mgwstart = \
173                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
174                "/tmp/bridge.log";
175        self.def_gwimage = "FBSD61-TUNNEL2";
176        self.def_gwtype = "pc";
177        self.local_access = { }
178
[7206e5a]179        if self.auth_type == 'legacy':
180            if auth:
181                self.auth = auth
182            else:
183                self.log.error( "[access]: No authorizer initialized, " +\
184                        "creating local one.")
185                auth = authorizer()
[5ecb9a3]186            self.get_access = self.legacy_get_access
[7206e5a]187        elif self.auth_type == 'abac':
[25bf6cc]188            self.auth = abac_authorizer(load=self.auth_dir, 
189                    update=os.path.join(self.auth_dir,'update'))
[7206e5a]190        else:
191            raise service_error(service_error.internal, 
192                    "Unknown auth_type: %s" % self.auth_type)
[866c983]193
194        if mapdb_file:
195            self.read_mapdb(mapdb_file)
196        else:
197            self.log.warn("[experiment_control] No testbed map, using defaults")
198            self.tbmap = { 
199                    'deter':'https://users.isi.deterlab.net:23235',
200                    'emulab':'https://users.isi.deterlab.net:23236',
201                    'ucb':'https://users.isi.deterlab.net:23237',
202                    }
203
204        # Grab saved state.  OK to do this w/o locking because it's read only
205        # and only one thread should be in existence that can see self.state at
206        # this point.
207        if self.state_filename:
208            self.read_state()
209
[2761484]210        if self.store_filename:
211            self.read_store()
212        else:
213            self.log.warning("No saved synch store")
214            self.synch_store = synch_store
215
[866c983]216        # Dispatch tables
217        self.soap_services = {\
[a3ad8bd]218                'New': soap_handler('New', self.new_experiment),
[e19b75c]219                'Create': soap_handler('Create', self.create_experiment),
[866c983]220                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
221                'Vis': soap_handler('Vis', self.get_vis),
222                'Info': soap_handler('Info', self.get_info),
[65f3f29]223                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[22a1a77]224                'Operation': soap_handler('Operation', self.do_operation),
[866c983]225                'Terminate': soap_handler('Terminate', 
[e19b75c]226                    self.terminate_experiment),
[2761484]227                'GetValue': soap_handler('GetValue', self.GetValue),
228                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]229        }
230
231        self.xmlrpc_services = {\
[a3ad8bd]232                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]233                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]234                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
235                'Vis': xmlrpc_handler('Vis', self.get_vis),
236                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]237                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]238                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]239                    self.terminate_experiment),
[22a1a77]240                'Operation': xmlrpc_handler('Operation', self.do_operation),
[2761484]241                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
242                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]243        }
[19cc408]244
[a97394b]245    # Call while holding self.state_lock
[eee2b2e]246    def write_state(self):
[866c983]247        """
248        Write a new copy of experiment state after copying the existing state
249        to a backup.
250
251        State format is a simple pickling of the state dictionary.
252        """
253        if os.access(self.state_filename, os.W_OK):
[40dd8c1]254            copy_file(self.state_filename, \
255                    "%s.bak" % self.state_filename)
[866c983]256        try:
257            f = open(self.state_filename, 'w')
258            pickle.dump(self.state, f)
[d3c8759]259        except EnvironmentError, e:
[866c983]260            self.log.error("Can't write file %s: %s" % \
261                    (self.state_filename, e))
262        except pickle.PicklingError, e:
263            self.log.error("Pickling problem: %s" % e)
264        except TypeError, e:
265            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]266
[2761484]267    @staticmethod
[29d5f7c]268    def get_alloc_ids(exp):
[2761484]269        """
[29d5f7c]270        Used by read_store and read state.  This used to be worse.
[2761484]271        """
272
[29d5f7c]273        return [ a.allocID for a in exp.get_all_allocations() ]
274
[2761484]275
[a97394b]276    # Call while holding self.state_lock
[eee2b2e]277    def read_state(self):
[866c983]278        """
279        Read a new copy of experiment state.  Old state is overwritten.
280
281        State format is a simple pickling of the state dictionary.
282        """
[cc8d8e9]283       
[866c983]284        try:
285            f = open(self.state_filename, "r")
286            self.state = pickle.load(f)
287            self.log.debug("[read_state]: Read state from %s" % \
288                    self.state_filename)
[d3c8759]289        except EnvironmentError, e:
[866c983]290            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
291                    % (self.state_filename, e))
292        except pickle.UnpicklingError, e:
293            self.log.warning(("[read_state]: No saved state: " + \
294                    "Unpickling failed: %s") % e)
295       
[cc8d8e9]296        for s in self.state.values():
[866c983]297            try:
[cc8d8e9]298
[29d5f7c]299                eid = s.fedid
[cc8d8e9]300                if eid : 
[7206e5a]301                    if self.auth_type == 'legacy':
302                        # XXX: legacy
303                        # Give the owner rights to the experiment
[29d5f7c]304                        #self.auth.set_attribute(s['owner'], eid)
[7206e5a]305                        # And holders of the eid as well
306                        self.auth.set_attribute(eid, eid)
307                        # allow overrides to control experiments as well
308                        for o in self.overrides:
309                            self.auth.set_attribute(o, eid)
310                        # Set permissions to allow reading of the software
311                        # repo, if any, as well.
312                        for a in self.get_alloc_ids(s):
313                            self.auth.set_attribute(a, 'repo/%s' % eid)
[cc8d8e9]314                else:
315                    raise KeyError("No experiment id")
[866c983]316            except KeyError, e:
317                self.log.warning("[read_state]: State ownership or identity " +\
318                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]319
[34bc05c]320    def read_mapdb(self, file):
[866c983]321        """
322        Read a simple colon separated list of mappings for the
323        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
[a11eda5]324        also adds testbeds to active if they include , active after
325        their name.
[866c983]326        """
327
328        self.tbmap = { }
329        lineno =0
330        try:
331            f = open(file, "r")
332            for line in f:
333                lineno += 1
334                line = line.strip()
335                if line.startswith('#') or len(line) == 0:
336                    continue
337                try:
338                    label, url = line.split(':', 1)
339                    self.tbmap[label] = url
340                except ValueError, e:
341                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
342                            "map db: %s %s" % (lineno, line, e))
[d3c8759]343        except EnvironmentError, e:
[866c983]344            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
345                    "open %s: %s" % (file, e))
[1ec5d4a]346        else:
347            f.close()
[2761484]348
349    def read_store(self):
350        try:
351            self.synch_store = synch_store()
352            self.synch_store.load(self.store_filename)
353            self.log.debug("[read_store]: Read store from %s" % \
354                    self.store_filename)
[d3c8759]355        except EnvironmentError, e:
[2761484]356            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
357                    % (self.state_filename, e))
358            self.synch_store = synch_store()
359
360        # Set the initial permissions on data in the store.  XXX: This ad hoc
361        # authorization attribute initialization is getting out of hand.
[7206e5a]362        # XXX: legacy
363        if self.auth_type == 'legacy':
364            for k in self.synch_store.all_keys():
365                try:
366                    if k.startswith('fedid:'):
367                        fid = fedid(hexstr=k[6:46])
368                        if self.state.has_key(fid):
369                            for a in self.get_alloc_ids(self.state[fid]):
370                                self.auth.set_attribute(a, k)
371                except ValueError, e:
372                    self.log.warn("Cannot deduce permissions for %s" % k)
[2761484]373
374
375    def write_store(self):
376        """
377        Write a new copy of synch_store after writing current state
378        to a backup.  We use the internal synch_store pickle method to avoid
379        incinsistent data.
380
381        State format is a simple pickling of the store.
382        """
383        if os.access(self.store_filename, os.W_OK):
384            copy_file(self.store_filename, \
385                    "%s.bak" % self.store_filename)
386        try:
387            self.synch_store.save(self.store_filename)
[d3c8759]388        except EnvironmentError, e:
[2761484]389            self.log.error("Can't write file %s: %s" % \
390                    (self.store_filename, e))
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
[25bf6cc]394    # XXX this may belong somewhere else
395
396    def get_grouper_updates(self, fid):
397        if self.grouper_url is None: return
398        d = tempfile.mkdtemp()
399        try:
400            fstr = "%s" % fid
401            # XXX locking
402            zipname = os.path.join(d, 'grouper.zip')
403            dest = os.path.join(self.auth_dir, 'update')
404            resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr))
405            f = open(zipname, 'w')
406            f.write(resp.read())
407            f.close()
408            zf = zipfile.ZipFile(zipname, 'r')
409            zf.extractall(dest)
410            zf.close()
[4708875]411        except URLError, e:
412            self.log.error("Cannot connect to grouper: %s" % e)
413            pass
[25bf6cc]414        finally:
415            shutil.rmtree(d)
416
[cf0ff4f]417
418    def remove_dirs(self, dir):
419        """
420        Remove the directory tree and all files rooted at dir.  Log any errors,
421        but continue.
422        """
423        self.log.debug("[removedirs]: removing %s" % dir)
424        try:
425            for path, dirs, files in os.walk(dir, topdown=False):
426                for f in files:
427                    os.remove(os.path.join(path, f))
428                for d in dirs:
429                    os.rmdir(os.path.join(path, d))
430            os.rmdir(dir)
431        except EnvironmentError, e:
432            self.log.error("Error deleting directory tree in %s" % e);
433
434    @staticmethod
435    def make_temp_certfile(expcert, tmpdir):
436        """
437        make a protected copy of the access certificate so the experiment
438        controller can act as the experiment principal.  mkstemp is the most
439        secure way to do that. The directory should be created by
440        mkdtemp.  Return the filename.
441        """
442        if expcert and tmpdir:
443            try:
444                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
445                f = os.fdopen(certf, 'w')
446                print >> f, expcert
447                f.close()
448            except EnvironmentError, e:
449                raise service_error(service_error.internal, 
450                        "Cannot create temp cert file?")
451            return certfn
452        else:
453            return None
454
[866c983]455       
[6679c122]456    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]457        """
458        Generate a set of keys for the gateways to use to talk.
459
460        Keys are of type type and are stored in the required dest file.
461        """
462        valid_types = ("rsa", "dsa")
463        t = type.lower();
464        if t not in valid_types: raise ValueError
465        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
466
467        try:
468            trace = open("/dev/null", "w")
[d3c8759]469        except EnvironmentError:
[866c983]470            raise service_error(service_error.internal,
471                    "Cannot open /dev/null??");
472
473        # May raise CalledProcessError
474        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]475        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]476        if rv != 0:
477            raise service_error(service_error.internal, 
478                    "Cannot generate nonce ssh keys.  %s return code %d" \
479                            % (self.ssh_keygen, rv))
[6679c122]480
[3df9b33]481    def generate_seer_certs(self, destdir):
482        '''
483        Create a SEER ca cert and a node cert in destdir/ca.pem and
484        destdir/node.pem respectively.  These will be distributed throughout
485        the federated experiment.  This routine reports errors via
486        service_errors.
487        '''
488        openssl = '/usr/bin/openssl'
489        # All the filenames and parameters we need for openssl calls below
490        ca_key =os.path.join(destdir, 'ca.key') 
491        ca_pem = os.path.join(destdir, 'ca.pem')
492        node_key =os.path.join(destdir, 'node.key') 
493        node_pem = os.path.join(destdir, 'node.pem')
494        node_req = os.path.join(destdir, 'node.req')
495        node_signed = os.path.join(destdir, 'node.signed')
[9bde415]496        days = '%s' % (365 * 10)
[95be336]497        serial = '%s' % random.randint(0, 1<<16)
[3df9b33]498
499        try:
500            # Sequence of calls to create a CA key, create a ca cert, create a
501            # node key, node signing request, and finally a signed node
502            # certificate.
503            sequence = (
504                    (openssl, 'genrsa', '-out', ca_key, '1024'),
505                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
506                        ca_pem, '-days', days, '-subj', 
507                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
508                    (openssl, 'genrsa', '-out', node_key, '1024'),
509                    (openssl, 'req', '-new', '-key', node_key, '-out', 
510                        node_req, '-days', days, '-subj', 
511                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
512                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
[95be336]513                        '-set_serial', serial, '-req', '-in', node_req, 
[3df9b33]514                        '-out', node_signed, '-days', days),
515                )
516            # Do all that stuff; bail if there's an error, and push all the
517            # output to dev/null.
518            for cmd in sequence:
519                trace = open("/dev/null", "w")
520                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
521                if rv != 0:
522                    raise service_error(service_error.internal, 
523                            "Cannot generate SEER certs.  %s return code %d" \
524                                    % (' '.join(cmd), rv))
525            # Concatinate the node key and signed certificate into node.pem
526            f = open(node_pem, 'w')
527            for comp in (node_signed, node_key):
528                g = open(comp, 'r')
[9bde415]529                f.write(g.read())
[3df9b33]530                g.close()
531            f.close()
532
533            # Throw out intermediaries.
[95be336]534            for fn in (ca_key, node_key, node_req, node_signed):
[3df9b33]535                os.unlink(fn)
536
537        except EnvironmentError, e:
538            # Any difficulties with the file system wind up here
539            raise service_error(service_error.internal,
540                    "File error on  %s while creating SEER certs: %s" % \
541                            (e.filename, e.strerror))
542
543
544
[0d830de]545    def gentopo(self, str):
[866c983]546        """
[1d73342]547        Generate the topology data structure from the splitter's XML
[866c983]548        representation of it.
549
550        The topology XML looks like:
551            <experiment>
552                <nodes>
553                    <node><vname></vname><ips>ip1:ip2</ips></node>
554                </nodes>
555                <lans>
556                    <lan>
557                        <vname></vname><vnode></vnode><ip></ip>
558                        <bandwidth></bandwidth><member>node:port</member>
559                    </lan>
560                </lans>
561        """
562        class topo_parse:
563            """
564            Parse the topology XML and create the dats structure.
565            """
566            def __init__(self):
567                # Typing of the subelements for data conversion
568                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
569                self.int_subelements = ( 'bandwidth',)
570                self.float_subelements = ( 'delay',)
571                # The final data structure
572                self.nodes = [ ]
573                self.lans =  [ ]
574                self.topo = { \
575                        'node': self.nodes,\
576                        'lan' : self.lans,\
577                    }
578                self.element = { }  # Current element being created
579                self.chars = ""     # Last text seen
580
581            def end_element(self, name):
582                # After each sub element the contents is added to the current
583                # element or to the appropriate list.
584                if name == 'node':
585                    self.nodes.append(self.element)
586                    self.element = { }
587                elif name == 'lan':
588                    self.lans.append(self.element)
589                    self.element = { }
590                elif name in self.str_subelements:
591                    self.element[name] = self.chars
592                    self.chars = ""
593                elif name in self.int_subelements:
594                    self.element[name] = int(self.chars)
595                    self.chars = ""
596                elif name in self.float_subelements:
597                    self.element[name] = float(self.chars)
598                    self.chars = ""
599
600            def found_chars(self, data):
601                self.chars += data.rstrip()
602
603
604        tp = topo_parse();
605        parser = xml.parsers.expat.ParserCreate()
606        parser.EndElementHandler = tp.end_element
607        parser.CharacterDataHandler = tp.found_chars
608
609        parser.Parse(str)
610
611        return tp.topo
612       
[0d830de]613
614    def genviz(self, topo):
[866c983]615        """
616        Generate the visualization the virtual topology
617        """
618
619        neato = "/usr/local/bin/neato"
620        # These are used to parse neato output and to create the visualization
621        # file.
[0ac1934]622        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]623        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
624                "%s</type></node>"
625
626        try:
627            # Node names
628            nodes = [ n['vname'] for n in topo['node'] ]
629            topo_lans = topo['lan']
[cc8d8e9]630        except KeyError, e:
631            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]632
633        lans = { }
634        links = { }
635
636        # Walk through the virtual topology, organizing the connections into
637        # 2-node connections (links) and more-than-2-node connections (lans).
638        # When a lan is created, it's added to the list of nodes (there's a
639        # node in the visualization for the lan).
640        for l in topo_lans:
641            if links.has_key(l['vname']):
642                if len(links[l['vname']]) < 2:
643                    links[l['vname']].append(l['vnode'])
644                else:
645                    nodes.append(l['vname'])
646                    lans[l['vname']] = links[l['vname']]
647                    del links[l['vname']]
648                    lans[l['vname']].append(l['vnode'])
649            elif lans.has_key(l['vname']):
650                lans[l['vname']].append(l['vnode'])
651            else:
652                links[l['vname']] = [ l['vnode'] ]
653
654
655        # Open up a temporary file for dot to turn into a visualization
656        try:
657            df, dotname = tempfile.mkstemp()
658            dotfile = os.fdopen(df, 'w')
[d3c8759]659        except EnvironmentError:
[866c983]660            raise service_error(service_error.internal,
661                    "Failed to open file in genviz")
662
[db6b092]663        try:
664            dnull = open('/dev/null', 'w')
[d3c8759]665        except EnvironmentError:
[db6b092]666            service_error(service_error.internal,
[886307f]667                    "Failed to open /dev/null in genviz")
668
[866c983]669        # Generate a dot/neato input file from the links, nodes and lans
670        try:
671            print >>dotfile, "graph G {"
672            for n in nodes:
673                print >>dotfile, '\t"%s"' % n
674            for l in links.keys():
675                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
676            for l in lans.keys():
677                for n in lans[l]:
678                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
679            print >>dotfile, "}"
680            dotfile.close()
681        except TypeError:
682            raise service_error(service_error.internal,
683                    "Single endpoint link in vtopo")
[d3c8759]684        except EnvironmentError:
[866c983]685            raise service_error(service_error.internal, "Cannot write dot file")
686
687        # Use dot to create a visualization
[5954004]688        try:
689            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
690                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
691                stderr=dnull, close_fds=True)
692        except EnvironmentError:
693            raise service_error(service_error.internal, 
694                    "Cannot generate visualization: is graphviz available?")
[db6b092]695        dnull.close()
[866c983]696
697        # Translate dot to vis format
698        vis_nodes = [ ]
699        vis = { 'node': vis_nodes }
700        for line in dot.stdout:
701            m = vis_re.match(line)
702            if m:
703                vn = m.group(1)
704                vis_node = {'name': vn, \
705                        'x': float(m.group(2)),\
706                        'y' : float(m.group(3)),\
707                    }
708                if vn in links.keys() or vn in lans.keys():
709                    vis_node['type'] = 'lan'
710                else:
711                    vis_node['type'] = 'node'
712                vis_nodes.append(vis_node)
713        rv = dot.wait()
714
715        os.remove(dotname)
[1fed67b]716        # XXX: graphviz seems to use low return codes for warnings, like
717        # "couldn't find font"
718        if rv < 2 : return vis
[866c983]719        else: return None
[d0ae12d]720
[db6b092]721
[725c55d]722    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
723            cert_pwd=None):
[e19b75c]724        """
725        Release access to testbed through fedd
726        """
[db6b092]727
[fd07c48]728        if not uri and tbmap:
729            uri = tbmap.get(tb, None)
[e19b75c]730        if not uri:
[69692a9]731            raise service_error(service_error.server_config, 
[e19b75c]732                    "Unknown testbed: %s" % tb)
[db6b092]733
[e19b75c]734        if self.local_access.has_key(uri):
735            resp = self.local_access[uri].ReleaseAccess(\
[29d5f7c]736                    { 'ReleaseAccessRequestBody' : 
737                        {'allocID': {'fedid': aid}},}, 
[725c55d]738                    fedid(file=cert_file))
[e19b75c]739            resp = { 'ReleaseAccessResponseBody': resp } 
740        else:
[29d5f7c]741            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
[725c55d]742                    cert_file, cert_pwd, self.trusted_certs)
[db6b092]743
[e19b75c]744        # better error coding
[db6b092]745
[5f6929a]746    def remote_ns2topdl(self, uri, desc):
[db6b092]747
[e19b75c]748        req = {
749                'description' : { 'ns2description': desc },
[db6b092]750            }
751
[5f6929a]752        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
[e19b75c]753                self.trusted_certs)
754
[5f6929a]755        if r.has_key('Ns2TopdlResponseBody'):
756            r = r['Ns2TopdlResponseBody']
[1dcaff4]757            ed = r.get('experimentdescription', None)
758            if ed.has_key('topdldescription'):
759                return topdl.Topology(**ed['topdldescription'])
[e19b75c]760            else:
761                raise service_error(service_error.protocol, 
762                        "Bad splitter response (no output)")
763        else:
764            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]765
[e19b75c]766    class start_segment:
[fd556d1]767        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]768                cert_pwd=None, trusted_certs=None, caller=None,
769                log_collector=None):
[cc8d8e9]770            self.log = log
771            self.debug = debug
772            self.cert_file = cert_file
773            self.cert_pwd = cert_pwd
774            self.trusted_certs = None
775            self.caller = caller
[fd556d1]776            self.testbed = testbed
[f07fa49]777            self.log_collector = log_collector
[69692a9]778            self.response = None
[b4b19c7]779            self.node = { }
[9e5e251]780            self.subs = { }
[934dd99]781            self.tb = { }
[e83f2f2]782            self.proof = None
[b4b19c7]783
784        def make_map(self, resp):
[29d5f7c]785            if 'segmentdescription' not in resp  or \
786                    'topdldescription' not in resp['segmentdescription']:
787                self.log.warn('No topology returned from startsegment')
788                return 
789
790            top = topdl.Topology(
791                    **resp['segmentdescription']['topdldescription'])
792
793            for e in top.elements:
794                if isinstance(e, topdl.Computer):
[1ae1aa2]795                    self.node[e.name] = e
[934dd99]796                elif isinstance(e, topdl.Testbed):
797                    self.tb[e.uri] = e
[9e5e251]798            for s in top.substrates:
799                self.subs[s.name] = s
[cc8d8e9]800
[43197eb]801        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
[cc8d8e9]802            req = {
803                    'allocID': { 'fedid' : aid }, 
804                    'segmentdescription': { 
805                        'topdldescription': topo.to_dict(),
806                    },
807                }
[e02cd14]808
809            if connInfo:
810                req['connection'] = connInfo
[43197eb]811
812            import_svcs = [ s for m in masters.values() \
813                    for s in m if self.testbed in s.importers]
814
815            if import_svcs or self.testbed in masters:
816                req['service'] = []
817
818            for s in import_svcs:
819                for r in s.reqs:
820                    sr = copy.deepcopy(r)
821                    sr['visibility'] = 'import';
822                    req['service'].append(sr)
823
824            for s in masters.get(self.testbed, []):
825                for r in s.reqs:
826                    sr = copy.deepcopy(r)
827                    sr['visibility'] = 'export';
828                    req['service'].append(sr)
829
[6c57fe9]830            if attrs:
831                req['fedAttr'] = attrs
[cc8d8e9]832
[fd556d1]833            try:
[13e3dd2]834                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]835                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
836                        self.trusted_certs)
[f07fa49]837                if r.has_key('StartSegmentResponseBody'):
838                    lval = r['StartSegmentResponseBody'].get('allocationLog',
839                            None)
840                    if lval and self.log_collector:
841                        for line in  lval.splitlines(True):
842                            self.log_collector.write(line)
[b4b19c7]843                    self.make_map(r['StartSegmentResponseBody'])
[e83f2f2]844                    if 'proof' in r: self.proof = r['proof']
[69692a9]845                    self.response = r
[f07fa49]846                else:
847                    raise service_error(service_error.internal, 
848                            "Bad response!?: %s" %r)
[fd556d1]849                return True
850            except service_error, e:
851                self.log.error("Start segment failed on %s: %s" % \
852                        (self.testbed, e))
853                return False
[cc8d8e9]854
855
[5ae3857]856
[e19b75c]857    class terminate_segment:
[fd556d1]858        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]859                cert_pwd=None, trusted_certs=None, caller=None):
860            self.log = log
861            self.debug = debug
862            self.cert_file = cert_file
863            self.cert_pwd = cert_pwd
864            self.trusted_certs = None
865            self.caller = caller
[fd556d1]866            self.testbed = testbed
[5ae3857]867
868        def __call__(self, uri, aid ):
869            req = {
[29d5f7c]870                    'allocID': {'fedid': aid }, 
[5ae3857]871                }
[a69de97]872            self.log.info("Calling terminate segment")
[fd556d1]873            try:
874                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
875                        self.trusted_certs)
[a69de97]876                self.log.info("Terminate segment succeeded")
[fd556d1]877                return True
878            except service_error, e:
879                self.log.error("Terminate segment failed on %s: %s" % \
880                        (self.testbed, e))
881                return False
[db6b092]882
[6e33086]883    class info_segment(start_segment):
884        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
885                cert_pwd=None, trusted_certs=None, caller=None,
886                log_collector=None):
887            experiment_control_local.start_segment.__init__(self, debug, 
888                    log, testbed, cert_file, cert_pwd, trusted_certs, 
889                    caller, log_collector)
890
891        def __call__(self, uri, aid):
892            req = { 'allocID': { 'fedid' : aid } }
893
894            try:
895                self.log.debug("Calling InfoSegment at %s " % uri)
896                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
897                        self.trusted_certs)
898                if r.has_key('InfoSegmentResponseBody'):
899                    self.make_map(r['InfoSegmentResponseBody'])
900                    if 'proof' in r: self.proof = r['proof']
901                    self.response = r
902                else:
903                    raise service_error(service_error.internal, 
904                            "Bad response!?: %s" %r)
905                return True
906            except service_error, e:
907                self.log.error("Info segment failed on %s: %s" % \
908                        (self.testbed, e))
909                return False
910
[22a1a77]911    class operation_segment:
912        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
913                cert_pwd=None, trusted_certs=None, caller=None,
914                log_collector=None):
915            self.log = log
916            self.debug = debug
917            self.cert_file = cert_file
918            self.cert_pwd = cert_pwd
919            self.trusted_certs = None
920            self.caller = caller
921            self.testbed = testbed
922            self.status = None
923
924        def __call__(self, uri, aid, op, targets, params):
925            req = { 
926                    'allocID': { 'fedid' : aid },
927                    'operation': op,
928                    'target': targets,
929                    }
930            if params: req['parameter'] = params
931
932
933            try:
934                self.log.debug("Calling OperationSegment at %s " % uri)
935                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
936                        self.trusted_certs)
937                if 'OperationSegmentResponseBody' in r:
938                    r = r['OperationSegmentResponseBody']
939                    if 'status' in r:
940                        self.status = r['status']
941                else:
942                    raise service_error(service_error.internal, 
943                            "Bad response!?: %s" %r)
944                return True
945            except service_error, e:
946                self.log.error("Operation segment failed on %s: %s" % \
947                        (self.testbed, e))
948                return False
949
[6e33086]950    def annotate_topology(self, top, data):
[f671ef7]951        # These routines do various parts of the annotation
952        def add_new_names(nl, l):
953            """ add any names in nl to the list in l """
954            for n in nl:
955                if n not in l: l.append(n)
956       
957        def merge_services(ne, e):
958            for ns in ne.service:
959                # NB: the else is on the for
960                for s in e.service:
961                    if ns.name == s.name:
962                        s.importer = ns.importer
963                        s.param = ns.param
964                        s.description = ns.description
965                        s.status = ns.status
966                        break
967                else:
968                    e.service.append(ns)
969       
970        def merge_oses(ne, e):
971            """
972            Merge the operating system entries of ne into e
973            """
974            for nos in ne.os:
975                # NB: the else is on the for
976                for os in e.os:
[7aaa8dc]977                    if nos.name == os.name:
978                        os.version = nos.version
979                        os.version = nos.distribution
980                        os.version = nos.distributionversion
[f671ef7]981                        for a in nos.attribute:
[db3da0b]982                            if os.get_attribute(a.attribute):
983                                os.remove_attribute(a.attribute)
[7aaa8dc]984                            os.set_attribute(a.attribute, a.value)
[f671ef7]985                        break
986                else:
[db3da0b]987                    # If both nodes have one OS, this is a replacement
988                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
989                    else: e.os.append(nos)
[6e33086]990
991        # Annotate the topology with embedding info
992        for e in top.elements:
993            if isinstance(e, topdl.Computer):
994                for s in data:
[f671ef7]995                    ne = s.node.get(e.name, None)
996                    if ne is not None:
997                        add_new_names(ne.localname, e.localname)
998                        e.status = ne.status
999                        merge_services(ne, e)
1000                        add_new_names(ne.operation, e.operation)
1001                        if ne.os: merge_oses(ne, e)
[6e33086]1002                        break
[934dd99]1003            elif isinstance(e,topdl.Testbed):
1004                for s in data:
1005                    ne = s.tb.get(e.uri, None)
1006                    if ne is not None:
1007                        add_new_names(ne.localname, e.localname)
1008                        add_new_names(ne.operation, e.operation)
1009                        merge_services(ne, e)
1010                        for a in ne.attribute:
1011                            e.set_attribute(a.attribute, a.value)
[9e5e251]1012        # Annotate substrates
1013        for s in top.substrates:
1014            for d in data:
1015                ss = d.subs.get(s.name, None)
1016                if ss is not None:
1017                    if ss.capacity is not None:
1018                        s.capacity = ss.capacity
1019                    if s.latency is not None:
1020                        s.latency = ss.latency
[6e33086]1021
1022
1023
[43197eb]1024    def allocate_resources(self, allocated, masters, eid, expid, 
[b4b19c7]1025            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
[c573278]1026            attrs=None, connInfo={}, tbmap=None, expcert=None):
[69692a9]1027
[cc8d8e9]1028        started = { }           # Testbeds where a sub-experiment started
1029                                # successfully
1030
1031        # XXX
1032        fail_soft = False
1033
[fd07c48]1034        if tbmap is None: tbmap = { }
1035
[cc8d8e9]1036        log = alloc_log or self.log
1037
[faea607]1038        tp = thread_pool(self.nthreads)
[cc8d8e9]1039        threads = [ ]
[b4b19c7]1040        starters = [ ]
[cc8d8e9]1041
[c573278]1042        if expcert:
1043            cert = expcert
1044            pw = None
1045        else:
1046            cert = self.cert_file
[822d31b]1047            pw = self.cert_pwd
[c573278]1048
[109a32a]1049        for tb in allocated.keys():
1050            # Create and start a thread to start the segment, and save it
1051            # to get the return value later
[ab847bc]1052            tb_attrs = copy.copy(attrs)
[faea607]1053            tp.wait_for_slot()
[9294673]1054            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
[ab847bc]1055            base, suffix = split_testbed(tb)
1056            if suffix:
1057                tb_attrs.append({'attribute': 'experiment_name', 
[175b444]1058                    'value': "%s-%s" % (eid, suffix)})
[ab847bc]1059            else:
1060                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
[109a32a]1061            if not uri:
1062                raise service_error(service_error.internal, 
1063                        "Unknown testbed %s !?" % tb)
1064
[9294673]1065            aid = tbparams[tb].allocID
1066            if not aid:
[cc8d8e9]1067                raise service_error(service_error.internal, 
1068                        "No alloc id for testbed %s !?" % tb)
1069
[b4b19c7]1070            s = self.start_segment(log=log, debug=self.debug,
[c573278]1071                    testbed=tb, cert_file=cert,
1072                    cert_pwd=pw, trusted_certs=self.trusted_certs,
[b4b19c7]1073                    caller=self.call_StartSegment,
1074                    log_collector=log_collector)
1075            starters.append(s)
[faea607]1076            t  = pooled_thread(\
[b4b19c7]1077                    target=s, name=tb,
[ab847bc]1078                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
[faea607]1079                    pdata=tp, trace_file=self.trace_file)
[69692a9]1080            threads.append(t)
1081            t.start()
[cc8d8e9]1082
[109a32a]1083        # Wait until all finish (keep pinging the log, though)
1084        mins = 0
[dadc4da]1085        revoked = False
[faea607]1086        while not tp.wait_for_all_done(60.0):
[109a32a]1087            mins += 1
1088            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1089                    % mins)
[dadc4da]1090            if not revoked and \
[f52f5df]1091                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1092                # a testbed has failed.  Revoke this experiment's
1093                # synchronizarion values so that sub experiments will not
1094                # deadlock waiting for synchronization that will never happen
1095                self.log.info("A subexperiment has failed to swap in, " + \
1096                        "revoking synch keys")
1097                var_key = "fedid:%s" % expid
1098                for k in self.synch_store.all_keys():
1099                    if len(k) > 45 and k[0:46] == var_key:
1100                        self.synch_store.revoke_key(k)
1101                revoked = True
[69692a9]1102
[cc8d8e9]1103        failed = [ t.getName() for t in threads if not t.rv ]
1104        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1105
[cc8d8e9]1106        # If one failed clean up, unless fail_soft is set
[32e7d93]1107        if failed:
[cc8d8e9]1108            if not fail_soft:
[faea607]1109                tp.clear()
[cc8d8e9]1110                for tb in succeeded:
1111                    # Create and start a thread to stop the segment
[faea607]1112                    tp.wait_for_slot()
[9294673]1113                    uri = tbparams[tb].uri
[faea607]1114                    t  = pooled_thread(\
[32e7d93]1115                            target=self.terminate_segment(log=log,
[fd556d1]1116                                testbed=tb,
[696a689]1117                                cert_file=cert, 
1118                                cert_pwd=pw,
[32e7d93]1119                                trusted_certs=self.trusted_certs,
1120                                caller=self.call_TerminateSegment),
[9294673]1121                            args=(uri, tbparams[tb].allocID),
[32e7d93]1122                            name=tb,
[faea607]1123                            pdata=tp, trace_file=self.trace_file)
[cc8d8e9]1124                    t.start()
[f52f5df]1125                # Wait until all finish (if any are being stopped)
1126                if succeeded:
[faea607]1127                    tp.wait_for_all_done()
[cc8d8e9]1128
1129                # release the allocations
1130                for tb in tbparams.keys():
[725c55d]1131                    try:
[9294673]1132                        self.release_access(tb, tbparams[tb].allocID, 
1133                                tbmap=tbmap, uri=tbparams[tb].uri,
[696a689]1134                                cert_file=cert, cert_pwd=pw)
[725c55d]1135                    except service_error, e:
1136                        self.log.warn("Error releasing access: %s" % e.desc)
[cc8d8e9]1137                # Remove the placeholder
1138                self.state_lock.acquire()
[29d5f7c]1139                self.state[eid].status = 'failed'
[6e33086]1140                self.state[eid].updated()
[cc8d8e9]1141                if self.state_filename: self.write_state()
1142                self.state_lock.release()
[05e8da8]1143                # Remove the repo dir
1144                self.remove_dirs("%s/%s" %(self.repodir, expid))
1145                # Walk up tmpdir, deleting as we go
1146                if self.cleanup:
1147                    self.remove_dirs(tmpdir)
1148                else:
1149                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1150
[cc8d8e9]1151
1152                log.error("Swap in failed on %s" % ",".join(failed))
1153                return
1154        else:
[29d5f7c]1155            # Walk through the successes and gather the proofs
[e83f2f2]1156            proofs = { }
[b4b19c7]1157            for s in starters:
[e83f2f2]1158                if s.proof:
1159                    proofs[s.testbed] = s.proof
[6e33086]1160            self.annotate_topology(top, starters)
[cc8d8e9]1161            log.info("[start_segment]: Experiment %s active" % eid)
1162
1163
1164        # Walk up tmpdir, deleting as we go
[69692a9]1165        if self.cleanup:
[05e8da8]1166            self.remove_dirs(tmpdir)
[69692a9]1167        else:
1168            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1169
[b4b19c7]1170        # Insert the experiment into our state and update the disk copy.
[cc8d8e9]1171        self.state_lock.acquire()
[29d5f7c]1172        self.state[expid].status = 'active'
[cc8d8e9]1173        self.state[eid] = self.state[expid]
[29d5f7c]1174        self.state[eid].top = top
[6e33086]1175        self.state[eid].updated()
[e83f2f2]1176        # Append startup proofs
[29d5f7c]1177        for f in self.state[eid].get_all_allocations():
1178            if f.tb in proofs:
1179                f.proof.append(proofs[f.tb])
[e83f2f2]1180
[cc8d8e9]1181        if self.state_filename: self.write_state()
1182        self.state_lock.release()
1183        return
1184
1185
[895a133]1186    def add_kit(self, e, kit):
1187        """
1188        Add a Software object created from the list of (install, location)
1189        tuples passed as kit  to the software attribute of an object e.  We
1190        do this enough to break out the code, but it's kind of a hack to
1191        avoid changing the old tuple rep.
1192        """
1193
1194        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1195
1196        if isinstance(e.software, list): e.software.extend(s)
1197        else: e.software = s
1198
[913dc7a]1199    def append_experiment_authorization(self, expid, attrs, 
1200            need_state_lock=True):
1201        """
1202        Append the authorization information to system state
1203        """
1204
1205        for p, a in attrs:
1206            self.auth.set_attribute(p, a)
1207        self.auth.save()
1208
1209        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1210        # XXX: really a no op?
1211        #self.state[expid]['auth'].update(attrs)
[913dc7a]1212        if self.state_filename: self.write_state()
1213        if need_state_lock: self.state_lock.release()
1214
[a96d946]1215    def clear_experiment_authorization(self, expid, need_state_lock=True):
[913dc7a]1216        """
1217        Attrs is a set of attribute principal pairs that need to be removed
1218        from the authenticator.  Remove them and save the authenticator.
1219        """
1220
1221        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1222        # XXX: should be a no-op
1223        #if expid in self.state and 'auth' in self.state[expid]:
1224            #for p, a in self.state[expid]['auth']:
1225                #self.auth.unset_attribute(p, a)
1226            #self.state[expid]['auth'] = set()
[913dc7a]1227        if self.state_filename: self.write_state()
1228        if need_state_lock: self.state_lock.release()
[b67fd22]1229        self.auth.save()
[913dc7a]1230
[895a133]1231
[b4b19c7]1232    def create_experiment_state(self, fid, req, expid, expcert,
[a3ad8bd]1233            state='starting'):
[895a133]1234        """
1235        Create the initial entry in the experiment's state.  The expid and
1236        expcert are the experiment's fedid and certifacte that represents that
1237        ID, which are installed in the experiment state.  If the request
1238        includes a suggested local name that is used if possible.  If the local
1239        name is already taken by an experiment owned by this user that has
[a3ad8bd]1240        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1241        valid localname is found.  The generated local name is returned.
1242        """
1243
1244        if req.has_key('experimentID') and \
1245                req['experimentID'].has_key('localname'):
1246            overwrite = False
1247            eid = req['experimentID']['localname']
1248            # If there's an old failed experiment here with the same local name
1249            # and accessible by this user, we'll overwrite it, otherwise we'll
1250            # fall through and do the collision avoidance.
1251            old_expid = self.get_experiment_fedid(eid)
[74572ba]1252            if old_expid:
1253                users_experiment = True
1254                try:
1255                    self.check_experiment_access(fid, old_expid)
1256                except service_error, e:
1257                    if e.code == service_error.access: users_experiment = False
1258                    else: raise e
1259                if users_experiment:
1260                    self.state_lock.acquire()
[29d5f7c]1261                    status = self.state[eid].status
[74572ba]1262                    if status and status == 'failed':
1263                        # remove the old access attributes
1264                        self.clear_experiment_authorization(eid,
1265                                need_state_lock=False)
1266                        overwrite = True
1267                        del self.state[eid]
1268                        del self.state[old_expid]
1269                    self.state_lock.release()
[6031c9d]1270                else:
1271                    self.log.info('Experiment %s exists, ' % eid + \
1272                            'but this user cannot access it')
[895a133]1273            self.state_lock.acquire()
1274            while (self.state.has_key(eid) and not overwrite):
1275                eid += random.choice(string.ascii_letters)
1276            # Initial state
[29d5f7c]1277            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1278                    identity=expcert)
[895a133]1279            self.state[expid] = self.state[eid]
[913dc7a]1280            if self.state_filename: self.write_state()
1281            self.state_lock.release()
[895a133]1282        else:
1283            eid = self.exp_stem
1284            for i in range(0,5):
1285                eid += random.choice(string.ascii_letters)
1286            self.state_lock.acquire()
1287            while (self.state.has_key(eid)):
1288                eid = self.exp_stem
1289                for i in range(0,5):
1290                    eid += random.choice(string.ascii_letters)
1291            # Initial state
[29d5f7c]1292            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1293                    identity=expcert)
[895a133]1294            self.state[expid] = self.state[eid]
[913dc7a]1295            if self.state_filename: self.write_state()
1296            self.state_lock.release()
1297
1298        # Let users touch the state.  Authorize this fid and the expid itself
1299        # to touch the experiment, as well as allowing th eoverrides.
1300        self.append_experiment_authorization(eid, 
1301                set([(fid, expid), (expid,expid)] + \
1302                        [ (o, expid) for o in self.overrides]))
[895a133]1303
1304        return eid
1305
1306
1307    def allocate_ips_to_topo(self, top):
1308        """
[69692a9]1309        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1310        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1311        created and returned as a list of hostfiles entries.  We also return
1312        the allocator, because we may need to allocate IPs to portals
1313        (specifically DRAGON portals).
[895a133]1314        """
1315        subs = sorted(top.substrates, 
1316                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1317                reverse=True)
1318        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1319        ifs = { }
1320        hosts = [ ]
1321
1322        for idx, s in enumerate(subs):
[289ff7e]1323            net_size = len(s.interfaces)+2
1324
1325            a = ips.allocate(net_size)
[895a133]1326            if a :
1327                base, num = a
[289ff7e]1328                if num < net_size: 
[895a133]1329                    raise service_error(service_error.internal,
1330                            "Allocator returned wrong number of IPs??")
1331            else:
1332                raise service_error(service_error.req, 
1333                        "Cannot allocate IP addresses")
[062b991]1334            mask = ips.min_alloc
1335            while mask < net_size:
1336                mask *= 2
[289ff7e]1337
[062b991]1338            netmask = ((2**32-1) ^ (mask-1))
[895a133]1339
1340            base += 1
1341            for i in s.interfaces:
1342                i.attribute.append(
1343                        topdl.Attribute('ip4_address', 
1344                            "%s" % ip_addr(base)))
[289ff7e]1345                i.attribute.append(
1346                        topdl.Attribute('ip4_netmask', 
1347                            "%s" % ip_addr(int(netmask))))
1348
[1e7f268]1349                hname = i.element.name
[895a133]1350                if ifs.has_key(hname):
1351                    hosts.append("%s\t%s-%s %s-%d" % \
1352                            (ip_addr(base), hname, s.name, hname,
1353                                ifs[hname]))
1354                else:
1355                    ifs[hname] = 0
1356                    hosts.append("%s\t%s-%s %s-%d %s" % \
1357                            (ip_addr(base), hname, s.name, hname,
1358                                ifs[hname], hname))
1359
1360                ifs[hname] += 1
1361                base += 1
[69692a9]1362        return hosts, ips
[895a133]1363
[1d73342]1364    def get_access_to_testbeds(self, testbeds, fid, allocated, 
[725c55d]1365            tbparam, masters, tbmap, expid=None, expcert=None):
[6e63513]1366        for tb in testbeds:
[1d73342]1367            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
[c573278]1368                    expcert)
[6e63513]1369            allocated[tb] = 1
1370
[1d73342]1371    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1372            expcert=None):
[6e63513]1373        """
1374        Get access to testbed through fedd and set the parameters for that tb
1375        """
1376        def get_export_project(svcs):
1377            """
1378            Look through for the list of federated_service for this testbed
1379            objects for a project_export service, and extract the project
1380            parameter.
1381            """
1382
1383            pe = [s for s in svcs if s.name=='project_export']
1384            if len(pe) == 1:
1385                return pe[0].params.get('project', None)
1386            elif len(pe) == 0:
1387                return None
1388            else:
1389                raise service_error(service_error.req,
1390                        "More than one project export is not supported")
1391
[d0912be]1392        def add_services(svcs, type, slist, keys):
[63c6664]1393            """
[d0912be]1394            Add the given services to slist.  type is import or export.  Also
1395            add a mapping entry from the assigned id to the original service
1396            record.
[63c6664]1397            """
1398            for i, s in enumerate(svcs):
1399                idx = '%s%d' % (type, i)
[d0912be]1400                keys[idx] = s
[63c6664]1401                sr = {'id': idx, 'name': s.name, 'visibility': type }
1402                if s.params:
1403                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1404                            for k, v in s.params.items()]
1405                slist.append(sr)
1406
[6e63513]1407        uri = tbmap.get(testbed_base(tb), None)
1408        if not uri:
1409            raise service_error(service_error.server_config, 
1410                    "Unknown testbed: %s" % tb)
1411
1412        export_svcs = masters.get(tb,[])
1413        import_svcs = [ s for m in masters.values() \
1414                for s in m \
1415                    if tb in s.importers ]
1416
1417        export_project = get_export_project(export_svcs)
1418        # Compose the credential list so that IDs come before attributes
1419        creds = set()
1420        keys = set()
[c573278]1421        certs = self.auth.get_creds_for_principal(fid)
[a0119a1]1422        # Append credenials about this experiment controller - e.g. that it is
1423        # trusted.
1424        certs.update(self.auth.get_creds_for_principal(
1425            fedid(file=self.cert_file)))
[c573278]1426        if expid:
1427            certs.update(self.auth.get_creds_for_principal(expid))
1428        for c in certs:
[6e63513]1429            keys.add(c.issuer_cert())
1430            creds.add(c.attribute_cert())
1431        creds = list(keys) + list(creds)
1432
[c573278]1433        if expcert: cert, pw = expcert, None
1434        else: cert, pw = self.cert_file, self.cert_pw
1435
[6e63513]1436        # Request credentials
1437        req = {
1438                'abac_credential': creds,
1439            }
1440        # Make the service request from the services we're importing and
1441        # exporting.  Keep track of the export request ids so we can
1442        # collect the resulting info from the access response.
1443        e_keys = { }
1444        if import_svcs or export_svcs:
[63c6664]1445            slist = []
[d0912be]1446            add_services(import_svcs, 'import', slist, e_keys)
1447            add_services(export_svcs, 'export', slist, e_keys)
[63c6664]1448            req['service'] = slist
[6e63513]1449
1450        if self.local_access.has_key(uri):
1451            # Local access call
1452            req = { 'RequestAccessRequestBody' : req }
1453            r = self.local_access[uri].RequestAccess(req, 
1454                    fedid(file=self.cert_file))
1455            r = { 'RequestAccessResponseBody' : r }
1456        else:
[c573278]1457            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
[6e63513]1458
[725c55d]1459        if r.has_key('RequestAccessResponseBody'):
1460            # Through to here we have a valid response, not a fault.
1461            # Access denied is a fault, so something better or worse than
1462            # access denied has happened.
1463            r = r['RequestAccessResponseBody']
1464            self.log.debug("[get_access] Access granted")
1465        else:
1466            raise service_error(service_error.protocol,
1467                        "Bad proxy response")
[e83f2f2]1468        if 'proof' not in r:
1469            raise service_error(service_error.protocol,
1470                        "Bad access response (no access proof)")
[ab3d6c5]1471
[9294673]1472        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
[ab3d6c5]1473                tb=tb, uri=uri, proof=[r['proof']], 
1474                services=masters.get(tb, None))
[6e63513]1475
1476        # Collect the responses corresponding to the services this testbed
1477        # exports.  These will be the service requests that we will include in
1478        # the start segment requests (with appropriate visibility values) to
1479        # import and export the segments.
1480        for s in r.get('service', []):
1481            id = s.get('id', None)
[ab3d6c5]1482            # Note that this attaches the response to the object in the masters
1483            # data structure.  (The e_keys index disappears when this fcn
1484            # returns)
[6e63513]1485            if id and id in e_keys:
1486                e_keys[id].reqs.append(s)
1487
1488        # Add attributes to parameter space.  We don't allow attributes to
1489        # overlay any parameters already installed.
1490        for a in r.get('fedAttr', []):
1491            try:
[9294673]1492                if a['attribute']:
1493                    tbparam[tb].set_attribute(a['attribute'], a['value'])
[6e63513]1494            except KeyError:
1495                self.log.error("Bad attribute in response: %s" % a)
1496
1497
[7fe81be]1498    def split_topology(self, top, topo, testbeds):
[895a133]1499        """
[e02cd14]1500        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1501        """
1502        for tb in testbeds:
1503            topo[tb] = top.clone()
[7fe81be]1504            # copy in for loop allows deletions from the original
1505            for e in [ e for e in topo[tb].elements]:
[895a133]1506                etb = e.get_attribute('testbed')
[7fe81be]1507                # NB: elements without a testbed attribute won't appear in any
1508                # sub topologies. 
1509                if not etb or etb != tb:
[895a133]1510                    for i in e.interface:
1511                        for s in i.subs:
1512                            try:
1513                                s.interfaces.remove(i)
1514                            except ValueError:
1515                                raise service_error(service_error.internal,
1516                                        "Can't remove interface??")
[7fe81be]1517                    topo[tb].elements.remove(e)
[895a133]1518            topo[tb].make_indices()
1519
[2627eb3]1520    def confirm_software(self, top):
1521        """
1522        Make sure that the software to be loaded in the topo is all available
1523        before we begin making access requests, etc.  This is a subset of
1524        wrangle_software.
1525        """
1526        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1527        pkgs.update([x.location for e in top.elements for x in e.software])
1528
1529        for pkg in pkgs:
1530            loc = pkg
1531
1532            scheme, host, path = urlparse(loc)[0:3]
1533            dest = os.path.basename(path)
1534            if not scheme:
1535                if not loc.startswith('/'):
1536                    loc = "/%s" % loc
1537                loc = "file://%s" %loc
1538            # NB: if scheme was found, loc == pkg
1539            try:
1540                u = urlopen(loc)
1541                u.close()
1542            except Exception, e:
1543                raise service_error(service_error.req, 
1544                        "Cannot open %s: %s" % (loc, e))
1545        return True
1546
[895a133]1547    def wrangle_software(self, expid, top, topo, tbparams):
1548        """
1549        Copy software out to the repository directory, allocate permissions and
1550        rewrite the segment topologies to look for the software in local
1551        places.
1552        """
1553
1554        # Copy the rpms and tarfiles to a distribution directory from
1555        # which the federants can retrieve them
1556        linkpath = "%s/software" %  expid
1557        softdir ="%s/%s" % ( self.repodir, linkpath)
1558        softmap = { }
[2627eb3]1559
1560        # self.fedkit and self.gateway kit are lists of tuples of
1561        # (install_location, download_location) this extracts the download
1562        # locations.
1563        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1564        pkgs.update([x.location for e in top.elements for x in e.software])
[895a133]1565        try:
1566            os.makedirs(softdir)
[d3c8759]1567        except EnvironmentError, e:
[895a133]1568            raise service_error(
1569                    "Cannot create software directory: %s" % e)
1570        # The actual copying.  Everything's converted into a url for copying.
[913dc7a]1571        auth_attrs = set()
[895a133]1572        for pkg in pkgs:
1573            loc = pkg
1574
1575            scheme, host, path = urlparse(loc)[0:3]
1576            dest = os.path.basename(path)
1577            if not scheme:
1578                if not loc.startswith('/'):
1579                    loc = "/%s" % loc
1580                loc = "file://%s" %loc
[2627eb3]1581            # NB: if scheme was found, loc == pkg
[895a133]1582            try:
1583                u = urlopen(loc)
1584            except Exception, e:
1585                raise service_error(service_error.req, 
1586                        "Cannot open %s: %s" % (loc, e))
1587            try:
1588                f = open("%s/%s" % (softdir, dest) , "w")
1589                self.log.debug("Writing %s/%s" % (softdir,dest) )
1590                data = u.read(4096)
1591                while data:
1592                    f.write(data)
1593                    data = u.read(4096)
1594                f.close()
1595                u.close()
1596            except Exception, e:
1597                raise service_error(service_error.internal,
1598                        "Could not copy %s: %s" % (loc, e))
1599            path = re.sub("/tmp", "", linkpath)
1600            # XXX
1601            softmap[pkg] = \
[7183b48]1602                    "%s/%s/%s" %\
1603                    ( self.repo_url, path, dest)
[895a133]1604
[913dc7a]1605            # Allow the individual segments to access the software by assigning
1606            # an attribute to each testbed allocation that encodes the data to
1607            # be released.  This expression collects the data for each run of
1608            # the loop.
1609            auth_attrs.update([
[9294673]1610                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
[913dc7a]1611                        for tb in tbparams.keys()])
1612
1613        self.append_experiment_authorization(expid, auth_attrs)
[895a133]1614
1615        # Convert the software locations in the segments into the local
1616        # copies on this host
1617        for soft in [ s for tb in topo.values() \
1618                for e in tb.elements \
1619                    if getattr(e, 'software', False) \
1620                        for s in e.software ]:
1621            if softmap.has_key(soft.location):
1622                soft.location = softmap[soft.location]
1623
1624
[a3ad8bd]1625    def new_experiment(self, req, fid):
1626        """
1627        The external interface to empty initial experiment creation called from
1628        the dispatcher.
1629
1630        Creates a working directory, splits the incoming description using the
1631        splitter script and parses out the avrious subsections using the
1632        lcasses above.  Once each sub-experiment is created, use pooled threads
1633        to instantiate them and start it all up.
1634        """
[2bb8b35]1635        self.log.info("New experiment call started for %s" % fid)
[7206e5a]1636        req = req.get('NewRequestBody', None)
1637        if not req:
1638            raise service_error(service_error.req,
1639                    "Bad request format (no NewRequestBody)")
1640
[d58ee5e]1641        # import may partially succeed so always save credentials and warn
1642        if not self.auth.import_credentials(data_list=req.get('credential', [])):
1643            self.log.debug("Failed to import delegation credentials(!)")
[25bf6cc]1644        self.get_grouper_updates(fid)
1645        self.auth.update()
[d58ee5e]1646        self.auth.save()
[c573278]1647
[8cab4c2]1648        try:
1649            access_ok, proof = self.auth.check_attribute(fid, 'new', 
1650                    with_proof=True)
1651        except service_error, e:
1652            self.log.info("New experiment call for %s: access denied" % fid)
1653            raise e
1654
[e83f2f2]1655
1656        if not access_ok:
[2bb8b35]1657            self.log.info("New experiment call for %s: Access denied" % fid)
[e83f2f2]1658            raise service_error(service_error.access, "New access denied",
1659                    proof=[proof])
[a3ad8bd]1660
1661        try:
1662            tmpdir = tempfile.mkdtemp(prefix="split-")
[d3c8759]1663        except EnvironmentError:
[a3ad8bd]1664            raise service_error(service_error.internal, "Cannot create tmp dir")
1665
1666        # Generate an ID for the experiment (slice) and a certificate that the
1667        # allocator can use to prove they own it.  We'll ship it back through
[7206e5a]1668        # the encrypted connection.  If the requester supplied one, use it.
1669        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1670            expcert = req['experimentAccess']['X509']
[962ea25]1671            expid = fedid(certstr=expcert)
[7206e5a]1672            self.state_lock.acquire()
1673            if expid in self.state:
1674                self.state_lock.release()
1675                raise service_error(service_error.req, 
1676                        'fedid %s identifies an existing experiment' % expid)
1677            self.state_lock.release()
1678        else:
1679            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
[a3ad8bd]1680
1681        #now we're done with the tmpdir, and it should be empty
1682        if self.cleanup:
1683            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1684            os.rmdir(tmpdir)
1685        else:
1686            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1687
1688        eid = self.create_experiment_state(fid, req, expid, expcert, 
1689                state='empty')
1690
1691        rv = {
1692                'experimentID': [
1693                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1694                ],
1695                'experimentStatus': 'empty',
[e83f2f2]1696                'experimentAccess': { 'X509' : expcert },
1697                'proof': proof.to_dict(),
[a3ad8bd]1698            }
1699
[2bb8b35]1700        self.log.info("New experiment call succeeded for %s" % fid)
[a3ad8bd]1701        return rv
1702
[cf0ff4f]1703    # create_experiment sub-functions
1704
[5ecb9a3]1705    @staticmethod
[cf0ff4f]1706    def get_experiment_key(req, field='experimentID'):
[5ecb9a3]1707        """
1708        Parse the experiment identifiers out of the request (the request body
1709        tag has been removed).  Specifically this pulls either the fedid or the
1710        localname out of the experimentID field.  A fedid is preferred.  If
1711        neither is present or the request does not contain the fields,
1712        service_errors are raised.
1713        """
1714        # Get the experiment access
[cf0ff4f]1715        exp = req.get(field, None)
[5ecb9a3]1716        if exp:
1717            if exp.has_key('fedid'):
1718                key = exp['fedid']
1719            elif exp.has_key('localname'):
1720                key = exp['localname']
1721            else:
1722                raise service_error(service_error.req, "Unknown lookup type")
1723        else:
1724            raise service_error(service_error.req, "No request?")
1725
1726        return key
1727
1728    def get_experiment_ids_and_start(self, key, tmpdir):
1729        """
1730        Get the experiment name, id and access certificate from the state, and
1731        set the experiment state to 'starting'.  returns a triple (fedid,
1732        localname, access_cert_file). The access_cert_file is a copy of the
1733        contents of the access certificate, created in the tempdir with
1734        restricted permissions.  If things are confused, raise an exception.
1735        """
1736
1737        expid = eid = None
1738        self.state_lock.acquire()
[29d5f7c]1739        if key in self.state:
1740            exp = self.state[key]
1741            exp.status = "starting"
[6e33086]1742            exp.updated()
[29d5f7c]1743            expid = exp.fedid
1744            eid = exp.localname
1745            expcert = exp.identity
[5ecb9a3]1746        self.state_lock.release()
1747
1748        # make a protected copy of the access certificate so the experiment
1749        # controller can act as the experiment principal.
1750        if expcert:
1751            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1752            if not expcert_file:
1753                raise service_error(service_error.internal, 
1754                        "Cannot create temp cert file?")
1755        else:
1756            expcert_file = None
1757
1758        return (eid, expid, expcert_file)
1759
1760    def get_topology(self, req, tmpdir):
1761        """
1762        Get the ns2 content and put it into a file for parsing.  Call the local
1763        or remote parser and return the topdl.Topology.  Errors result in
1764        exceptions.  req is the request and tmpdir is a work directory.
1765        """
1766
1767        # The tcl parser needs to read a file so put the content into that file
1768        descr=req.get('experimentdescription', None)
1769        if descr:
[a7c0bcb]1770            if 'ns2description' in descr:
1771                file_content=descr['ns2description']
1772            elif 'topdldescription' in descr:
1773                return topdl.Topology(**descr['topdldescription'])
1774            else:
1775                raise service_error(service_error.req, 
1776                        'Unknown experiment description type')
[5ecb9a3]1777        else:
1778            raise service_error(service_error.req, "No experiment description")
1779
1780
1781        if self.splitter_url:
1782            self.log.debug("Calling remote topdl translator at %s" % \
1783                    self.splitter_url)
1784            top = self.remote_ns2topdl(self.splitter_url, file_content)
1785        else:
1786            tclfile = os.path.join(tmpdir, "experiment.tcl")
1787            if file_content:
1788                try:
1789                    f = open(tclfile, 'w')
1790                    f.write(file_content)
1791                    f.close()
1792                except EnvironmentError:
1793                    raise service_error(service_error.internal,
1794                            "Cannot write temp experiment description")
1795            else:
1796                raise service_error(service_error.req, 
1797                        "Only ns2descriptions supported")
1798            pid = "dummy"
1799            gid = "dummy"
1800            eid = "dummy"
1801
1802            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1803                str(self.muxmax), '-m', 'dummy']
1804
1805            tclcmd.extend([pid, gid, eid, tclfile])
1806
1807            self.log.debug("running local splitter %s", " ".join(tclcmd))
1808            # This is just fantastic.  As a side effect the parser copies
1809            # tb_compat.tcl into the current directory, so that directory
1810            # must be writable by the fedd user.  Doing this in the
1811            # temporary subdir ensures this is the case.
1812            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1813                    cwd=tmpdir)
1814            split_data = tclparser.stdout
1815
1816            top = topdl.topology_from_xml(file=split_data, top="experiment")
1817            os.remove(tclfile)
1818
1819        return top
1820
[1660f7c]1821    def get_testbed_services(self, req, testbeds):
[5ecb9a3]1822        """
[57facae]1823        Parse the services section of the request into two dicts mapping
1824        testbed to lists of federated_service objects.  The first dict maps all
1825        exporters of services to those service objects, the second maps
1826        testbeds to service objects only for services requiring portals.
[5ecb9a3]1827        """
[f96be61]1828
1829        # Sanity check the services.  Exports or imports from unknown testbeds
1830        # cause an exception.
1831        for s in req.get('service', []):
1832            for t in s.get('import', []):
1833                if t not in testbeds:
1834                    raise service_error(service_error.req, 
1835                            'Service import to unknown testbed: %s' %t)
1836            for t in s.get('export', []):
1837                if t not in testbeds:
1838                    raise service_error(service_error.req, 
1839                            'Service export from unknown testbed: %s' %t)
1840
1841
[57facae]1842        # We construct both dicts here because deriving the second is more
[f96be61]1843        # complex than it looks - both the keys and lists can differ, and it's
[57facae]1844        # much easier to generate both in one pass.
[5ecb9a3]1845        masters = { }
[57facae]1846        pmasters = { }
[5ecb9a3]1847        for s in req.get('service', []):
1848            # If this is a service request with the importall field
1849            # set, fill it out.
1850
1851            if s.get('importall', False):
1852                s['import'] = [ tb for tb in testbeds \
1853                        if tb not in s.get('export',[])]
1854                del s['importall']
1855
1856            # Add the service to masters
1857            for tb in s.get('export', []):
1858                if s.get('name', None):
1859
1860                    params = { }
1861                    for a in s.get('fedAttr', []):
1862                        params[a.get('attribute', '')] = a.get('value','')
1863
1864                    fser = federated_service(name=s['name'],
1865                            exporter=tb, importers=s.get('import',[]),
1866                            params=params)
1867                    if fser.name == 'hide_hosts' \
1868                            and 'hosts' not in fser.params:
1869                        fser.params['hosts'] = \
1870                                ",".join(tb_hosts.get(fser.exporter, []))
1871                    if tb in masters: masters[tb].append(fser)
1872                    else: masters[tb] = [fser]
[57facae]1873
1874                    if fser.portal:
1875                        if tb in pmasters: pmasters[tb].append(fser)
1876                        else: pmasters[tb] = [fser]
[5ecb9a3]1877                else:
1878                    self.log.error('Testbed service does not have name " + \
1879                            "and importers')
[57facae]1880        return masters, pmasters
[5ecb9a3]1881
[cf0ff4f]1882    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1883        """
[3df9b33]1884        Create the ssh keys necessary for interconnecting the portal nodes and
[cf0ff4f]1885        the global hosts file for letting each segment know about the IP
1886        addresses in play.  Save these into the repo.  Add attributes to the
1887        autorizer allowing access controllers to download them and return a set
[3df9b33]1888        of attributes that inform the segments where to find this stuff.  May
[cf0ff4f]1889        raise service_errors in if there are problems.
1890        """
1891        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1892        gw_secretkey_base = "fed.%s" % self.ssh_type
[3df9b33]1893        keydir = os.path.join(tmpdir, 'keys')
1894        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1895        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
[cf0ff4f]1896
1897        try:
1898            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1899        except ValueError:
1900            raise service_error(service_error.server_config, 
1901                    "Bad key type (%s)" % self.ssh_type)
1902
[3df9b33]1903        self.generate_seer_certs(keydir)
[cf0ff4f]1904
1905        # Copy configuration files into the remote file store
1906        # The config urlpath
1907        configpath = "/%s/config" % expid
1908        # The config file system location
1909        configdir ="%s%s" % ( self.repodir, configpath)
1910        try:
1911            os.makedirs(configdir)
1912        except EnvironmentError, e:
1913            raise service_error(service_error.internal,
1914                    "Cannot create config directory: %s" % e)
1915        try:
1916            f = open("%s/hosts" % configdir, "w")
1917            print >> f, string.join(hosts, '\n')
1918            f.close()
1919        except EnvironmentError, e:
1920            raise service_error(service_error.internal, 
1921                    "Cannot write hosts file: %s" % e)
1922        try:
[3df9b33]1923            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1924            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1925            copy_file(os.path.join(keydir, 'ca.pem'), 
1926                    os.path.join(configdir, 'ca.pem'))
1927            copy_file(os.path.join(keydir, 'node.pem'), 
1928                    os.path.join(configdir, 'node.pem'))
[cf0ff4f]1929        except EnvironmentError, e:
1930            raise service_error(service_error.internal, 
1931                    "Cannot copy keyfiles: %s" % e)
1932
[913dc7a]1933        # Allow the individual testbeds to access the configuration files,
1934        # again by setting an attribute for the relevant pathnames on each
1935        # allocation principal.  Yeah, that's a long list comprehension.
1936        self.append_experiment_authorization(expid, set([
[9294673]1937            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
[913dc7a]1938                    for tb in tbparams.keys() \
[3df9b33]1939                        for f in ("hosts", 'ca.pem', 'node.pem', 
1940                            gw_secretkey_base, gw_pubkey_base)]))
[cf0ff4f]1941
1942        attrs = [ 
1943                {
1944                    'attribute': 'ssh_pubkey', 
1945                    'value': '%s/%s/config/%s' % \
1946                            (self.repo_url, expid, gw_pubkey_base)
1947                },
1948                {
1949                    'attribute': 'ssh_secretkey', 
1950                    'value': '%s/%s/config/%s' % \
1951                            (self.repo_url, expid, gw_secretkey_base)
1952                },
1953                {
1954                    'attribute': 'hosts', 
1955                    'value': '%s/%s/config/hosts' % \
1956                            (self.repo_url, expid)
1957                },
[3df9b33]1958                {
1959                    'attribute': 'seer_ca_pem', 
1960                    'value': '%s/%s/config/%s' % \
1961                            (self.repo_url, expid, 'ca.pem')
1962                },
1963                {
1964                    'attribute': 'seer_node_pem', 
1965                    'value': '%s/%s/config/%s' % \
1966                            (self.repo_url, expid, 'node.pem')
1967                },
[cf0ff4f]1968            ]
1969        return attrs
1970
1971
1972    def get_vtopo(self, req, fid):
1973        """
1974        Return the stored virtual topology for this experiment
1975        """
1976        rv = None
1977        state = None
[2bb8b35]1978        self.log.info("vtopo call started for %s" %  fid)
[cf0ff4f]1979
1980        req = req.get('VtopoRequestBody', None)
1981        if not req:
1982            raise service_error(service_error.req,
1983                    "Bad request format (no VtopoRequestBody)")
1984        exp = req.get('experiment', None)
1985        if exp:
1986            if exp.has_key('fedid'):
1987                key = exp['fedid']
1988                keytype = "fedid"
1989            elif exp.has_key('localname'):
1990                key = exp['localname']
1991                keytype = "localname"
1992            else:
1993                raise service_error(service_error.req, "Unknown lookup type")
1994        else:
1995            raise service_error(service_error.req, "No request?")
1996
[8cab4c2]1997        try:
1998            proof = self.check_experiment_access(fid, key)
1999        except service_error, e:
2000            self.log.info("vtopo call failed for %s: access denied" %  fid)
2001            raise e
[cf0ff4f]2002
2003        self.state_lock.acquire()
[29d5f7c]2004        # XXX: this needs to be recalculated
[80b1e82]2005        if key in self.state:
2006            if self.state[key].top is not None:
2007                vtopo = topdl.topology_to_vtopo(self.state[key].top)
[e83f2f2]2008                rv = { 'experiment' : {keytype: key },
[80b1e82]2009                        'vtopo': vtopo,
[e83f2f2]2010                        'proof': proof.to_dict(), 
[cf0ff4f]2011                    }
2012            else:
[80b1e82]2013                state = self.state[key].status
[cf0ff4f]2014        self.state_lock.release()
2015
[2bb8b35]2016        if rv: 
2017            self.log.info("vtopo call completed for %s %s " % \
2018                (key, fid))
2019            return rv
[cf0ff4f]2020        else: 
2021            if state:
[2bb8b35]2022                self.log.info("vtopo call completed for %s %s (Not ready)" % \
2023                    (key, fid))
[cf0ff4f]2024                raise service_error(service_error.partial, 
2025                        "Not ready: %s" % state)
2026            else:
[2bb8b35]2027                self.log.info("vtopo call completed for %s %s (No experiment)"\
2028                        % (key, fid))
[cf0ff4f]2029                raise service_error(service_error.req, "No such experiment")
2030
2031    def get_vis(self, req, fid):
2032        """
2033        Return the stored visualization for this experiment
2034        """
2035        rv = None
2036        state = None
2037
[2bb8b35]2038        self.log.info("vis call started for %s" %  fid)
[cf0ff4f]2039        req = req.get('VisRequestBody', None)
2040        if not req:
2041            raise service_error(service_error.req,
2042                    "Bad request format (no VisRequestBody)")
2043        exp = req.get('experiment', None)
2044        if exp:
2045            if exp.has_key('fedid'):
2046                key = exp['fedid']
2047                keytype = "fedid"
2048            elif exp.has_key('localname'):
2049                key = exp['localname']
2050                keytype = "localname"
2051            else:
2052                raise service_error(service_error.req, "Unknown lookup type")
2053        else:
2054            raise service_error(service_error.req, "No request?")
2055
[8cab4c2]2056        try:
2057            proof = self.check_experiment_access(fid, key)
2058        except service_error, e:
2059            self.log.info("vis call failed for %s: access denied" %  fid)
2060            raise e
[cf0ff4f]2061
2062        self.state_lock.acquire()
[80b1e82]2063        # Generate the visualization
2064        if key in self.state:
2065            if self.state[key].top is not None:
2066                try:
2067                    vis = self.genviz(
[6a50b78]2068                            topdl.topology_to_vtopo(self.state[key].top))
[80b1e82]2069                except service_error, e:
2070                    self.state_lock.release()
2071                    raise e
2072                rv =  { 'experiment' : {keytype: key },
2073                        'vis': vis,
[e83f2f2]2074                        'proof': proof.to_dict(), 
[80b1e82]2075                        }
[cf0ff4f]2076            else:
[80b1e82]2077                state = self.state[key].status
[cf0ff4f]2078        self.state_lock.release()
2079
[2bb8b35]2080        if rv: 
2081            self.log.info("vis call completed for %s %s " % \
2082                (key, fid))
2083            return rv
[cf0ff4f]2084        else:
2085            if state:
[2bb8b35]2086                self.log.info("vis call completed for %s %s (not ready)" % \
2087                    (key, fid))
[cf0ff4f]2088                raise service_error(service_error.partial, 
2089                        "Not ready: %s" % state)
2090            else:
[2bb8b35]2091                self.log.info("vis call completed for %s %s (no experiment)" % \
2092                    (key, fid))
[cf0ff4f]2093                raise service_error(service_error.req, "No such experiment")
2094
2095   
[ec3aa4d]2096    def save_federant_information(self, allocated, tbparams, eid, top):
[cf0ff4f]2097        """
2098        Store the various data that have changed in the experiment state
2099        between when it was started and the beginning of resource allocation.
2100        This is basically the information about each local allocation.  This
[e83f2f2]2101        fills in the values of the placeholder allocation in the state.  It
2102        also collects the access proofs and returns them as dicts for a
2103        response message.
[cf0ff4f]2104        """
[29d5f7c]2105        self.state_lock.acquire()
2106        exp = self.state[eid]
[ec3aa4d]2107        exp.top = top.clone()
[cf0ff4f]2108        # save federant information
2109        for k in allocated.keys():
[9294673]2110            exp.add_allocation(tbparams[k])
[ec3aa4d]2111            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
[ab3d6c5]2112                type="testbed", localname=[k], 
2113                service=[ s.to_topdl() for s in tbparams[k].services]))
[cf0ff4f]2114
[e83f2f2]2115        # Access proofs for the response message
2116        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
[9294673]2117                    for p in tbparams[k].proof]
[6e33086]2118        exp.updated()
[cf0ff4f]2119        if self.state_filename: 
2120            self.write_state()
2121        self.state_lock.release()
[e83f2f2]2122        return proofs
[cf0ff4f]2123
2124    def clear_placeholder(self, eid, expid, tmpdir):
2125        """
2126        Clear the placeholder and remove any allocated temporary dir.
2127        """
2128
2129        self.state_lock.acquire()
2130        del self.state[eid]
2131        del self.state[expid]
2132        if self.state_filename: self.write_state()
2133        self.state_lock.release()
2134        if tmpdir and self.cleanup:
2135            self.remove_dirs(tmpdir)
2136
2137    # end of create_experiment sub-functions
[5ecb9a3]2138
[e19b75c]2139    def create_experiment(self, req, fid):
[db6b092]2140        """
2141        The external interface to experiment creation called from the
2142        dispatcher.
2143
2144        Creates a working directory, splits the incoming description using the
[43197eb]2145        splitter script and parses out the various subsections using the
[1a4ee0f]2146        classes above.  Once each sub-experiment is created, use pooled threads
2147        to instantiate them and start it all up.
[db6b092]2148        """
[7183b48]2149
[2bb8b35]2150        self.log.info("Create experiment call started for %s" % fid)
[7183b48]2151        req = req.get('CreateRequestBody', None)
[5ecb9a3]2152        if req:
[cf0ff4f]2153            key = self.get_experiment_key(req)
[5ecb9a3]2154        else:
[7183b48]2155            raise service_error(service_error.req,
2156                    "Bad request format (no CreateRequestBody)")
2157
[6e63513]2158        # Import information from the requester
[d58ee5e]2159        # import may partially succeed so always save credentials and warn
[25bf6cc]2160        if not self.auth.import_credentials(data_list=req.get('credential',[])):
[cde9b98]2161            self.log.debug("Failed to import delegation credentials(!)")
[25bf6cc]2162        self.get_grouper_updates(fid)
2163        self.auth.update()
[d58ee5e]2164        self.auth.save()
[6e63513]2165
[8cab4c2]2166        try:
2167            # Make sure that the caller can talk to us
2168            proof = self.check_experiment_access(fid, key)
2169        except service_error, e:
2170            self.log.info("Create experiment call failed for %s: access denied"\
2171                    % fid)
2172            raise e
2173
[db6b092]2174
[fd07c48]2175        # Install the testbed map entries supplied with the request into a copy
2176        # of the testbed map.
2177        tbmap = dict(self.tbmap)
2178        for m in req.get('testbedmap', []):
2179            if 'testbed' in m and 'uri' in m:
2180                tbmap[m['testbed']] = m['uri']
2181
[5ecb9a3]2182        # a place to work
[db6b092]2183        try:
2184            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2185            os.mkdir(tmpdir+"/keys")
[d3c8759]2186        except EnvironmentError:
[db6b092]2187            raise service_error(service_error.internal, "Cannot create tmp dir")
2188
2189        tbparams = { }
2190
[5ecb9a3]2191        eid, expid, expcert_file = \
2192                self.get_experiment_ids_and_start(key, tmpdir)
[c573278]2193
[5ecb9a3]2194        # This catches exceptions to clear the placeholder if necessary
[db6b092]2195        try: 
[5ecb9a3]2196            if not (eid and expid):
2197                raise service_error(service_error.internal, 
2198                        "Cannot find local experiment info!?")
[5f6929a]2199
[5ecb9a3]2200            top = self.get_topology(req, tmpdir)
[2627eb3]2201            self.confirm_software(top)
[5ecb9a3]2202            # Assign the IPs
[69692a9]2203            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[1a4ee0f]2204            # Find the testbeds to look up
[5334044]2205            tb_hosts = { }
[5ecb9a3]2206            testbeds = [ ]
2207            for e in top.elements:
2208                if isinstance(e, topdl.Computer):
2209                    tb = e.get_attribute('testbed') or 'default'
2210                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2211                    else: 
2212                        tb_hosts[tb] = [ e.name ]
2213                        testbeds.append(tb)
2214
[57facae]2215            masters, pmasters = self.get_testbed_services(req, testbeds)
[895a133]2216            allocated = { }         # Testbeds we can access
2217            topo ={ }               # Sub topologies
[e02cd14]2218            connInfo = { }          # Connection information
[5334044]2219
[5ecb9a3]2220            self.get_access_to_testbeds(testbeds, fid, allocated, 
2221                    tbparams, masters, tbmap, expid, expcert_file)
[5f96438]2222
[4ffa6f8]2223            # tbactive is the set of testbeds that have NATs in front of their
2224            # portals. They need to initiate connections.
2225            tbactive = set([k for k, v in tbparams.items() \
2226                    if v.get_attribute('nat_portals')])
2227
2228            self.split_topology(top, topo, testbeds)
2229
[cf0ff4f]2230            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
[cc8d8e9]2231
[fd07c48]2232            part = experiment_partition(self.auth, self.store_url, tbmap,
[a11eda5]2233                    self.muxmax, self.direct_transit, tbactive)
[5334044]2234            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
[2761484]2235                    connInfo, expid)
[913dc7a]2236
2237            auth_attrs = set()
[ab847bc]2238            # Now get access to the dynamic testbeds (those added above)
2239            for tb in [ t for t in topo if t not in allocated]:
[cf0ff4f]2240                self.get_access(tb, tbparams, fid, masters, tbmap, 
2241                        expid, expcert_file)
[ab847bc]2242                allocated[tb] = 1
2243                store_keys = topo[tb].get_attribute('store_keys')
2244                # Give the testbed access to keys it exports or imports
2245                if store_keys:
[b16cfc0]2246                    auth_attrs.update(set([
[9294673]2247                        (tbparams[tb].allocID, sk) \
[913dc7a]2248                                for sk in store_keys.split(" ")]))
2249
2250            if auth_attrs:
2251                self.append_experiment_authorization(expid, auth_attrs)
[69692a9]2252
[cf0ff4f]2253            # transit and disconnected testbeds may not have a connInfo entry.
2254            # Fill in the blanks.
2255            for t in allocated.keys():
2256                if not connInfo.has_key(t):
2257                    connInfo[t] = { }
2258
[895a133]2259            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2260
[e83f2f2]2261            proofs = self.save_federant_information(allocated, tbparams, 
[ec3aa4d]2262                    eid, top)
[866c983]2263        except service_error, e:
2264            # If something goes wrong in the parse (usually an access error)
2265            # clear the placeholder state.  From here on out the code delays
[db6b092]2266            # exceptions.  Failing at this point returns a fault to the remote
2267            # caller.
[2bb8b35]2268
2269            self.log.info("Create experiment call failed for %s %s: %s" % 
2270                    (eid, fid, e))
[cf0ff4f]2271            self.clear_placeholder(eid, expid, tmpdir)
[866c983]2272            raise e
2273
[db6b092]2274        # Start the background swapper and return the starting state.  From
2275        # here on out, the state will stick around a while.
[866c983]2276
[db6b092]2277        # Create a logger that logs to the experiment's state object as well as
2278        # to the main log file.
2279        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[29d5f7c]2280        alloc_collector = self.list_log(self.state[eid].log)
[f07fa49]2281        h = logging.StreamHandler(alloc_collector)
[db6b092]2282        # XXX: there should be a global one of these rather than repeating the
2283        # code.
2284        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2285                    '%d %b %y %H:%M:%S'))
2286        alloc_log.addHandler(h)
[617592b]2287
[db6b092]2288        # Start a thread to do the resource allocation
[e19b75c]2289        t  = Thread(target=self.allocate_resources,
[43197eb]2290                args=(allocated, masters, eid, expid, tbparams, 
[b4b19c7]2291                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
[725c55d]2292                    connInfo, tbmap, expcert_file),
[db6b092]2293                name=eid)
2294        t.start()
2295
2296        rv = {
2297                'experimentID': [
2298                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2299                ],
2300                'experimentStatus': 'starting',
[e83f2f2]2301                'proof': [ proof.to_dict() ] + proofs,
[db6b092]2302            }
[2bb8b35]2303        self.log.info("Create experiment call succeeded for %s %s" % \
2304                (eid, fid))
[db6b092]2305
2306        return rv
[9479343]2307   
2308    def get_experiment_fedid(self, key):
2309        """
[db6b092]2310        find the fedid associated with the localname key in the state database.
[9479343]2311        """
2312
[db6b092]2313        rv = None
2314        self.state_lock.acquire()
[29d5f7c]2315        if key in self.state:
2316            rv = self.state[key].fedid
[db6b092]2317        self.state_lock.release()
2318        return rv
[a97394b]2319
[4064742]2320    def check_experiment_access(self, fid, key):
[866c983]2321        """
2322        Confirm that the fid has access to the experiment.  Though a request
2323        may be made in terms of a local name, the access attribute is always
2324        the experiment's fedid.
2325        """
2326        if not isinstance(key, fedid):
[db6b092]2327            key = self.get_experiment_fedid(key)
[866c983]2328
[e83f2f2]2329        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2330
2331        if access_ok:
2332            return proof
[866c983]2333        else:
[e83f2f2]2334            raise service_error(service_error.access, "Access Denied",
2335                proof)
[4064742]2336
2337
[db6b092]2338    def get_handler(self, path, fid):
[cf0ff4f]2339        """
2340        Perhaps surprisingly named, this function handles HTTP GET requests to
2341        this server (SOAP requests are POSTs).
2342        """
[7183b48]2343        self.log.info("Get handler %s %s" % (path, fid))
[bcc6fd6]2344        if len("%s" % fid) == 0:
2345            return (None, None)
[e83f2f2]2346        # XXX: log proofs?
[6c57fe9]2347        if self.auth.check_attribute(fid, path):
2348            return ("%s/%s" % (self.repodir, path), "application/binary")
2349        else:
2350            return (None, None)
[987aaa1]2351
[6e33086]2352    def update_info(self, key, force=False):
2353        top = None
2354        self.state_lock.acquire()
2355        if key in self.state:
2356            if force or self.state[key].older_than(self.info_cache_limit):
[6a50b78]2357                top = self.state[key].top
[6e33086]2358                if top is not None: top = top.clone()
2359                d1, info_params, cert, d2 = \
2360                        self.get_segment_info(self.state[key], need_lock=False)
2361        self.state_lock.release()
2362
2363        if top is None: return
2364
2365        try:
2366            tmpdir = tempfile.mkdtemp(prefix="info-")
2367        except EnvironmentError:
2368            raise service_error(service_error.internal, 
2369                    "Cannot create tmp dir")
2370        cert_file = self.make_temp_certfile(cert, tmpdir)
2371
2372        data = []
2373        try:
2374            for k, (uri, aid) in info_params.items():
2375                info=self.info_segment(log=self.log, testbed=uri,
2376                            cert_file=cert_file, cert_pwd=None,
2377                            trusted_certs=self.trusted_certs,
2378                            caller=self.call_InfoSegment)
2379                info(uri, aid)
2380                data.append(info)
2381        # Clean up the tmpdir no matter what
2382        finally:
2383            if tmpdir: self.remove_dirs(tmpdir)
2384
2385        self.annotate_topology(top, data)
2386        self.state_lock.acquire()
2387        if key in self.state:
2388            self.state[key].top = top
2389            self.state[key].updated()
2390            if self.state_filename: self.write_state()
2391        self.state_lock.release()
2392
[29d5f7c]2393   
[c52c48d]2394    def get_info(self, req, fid):
[866c983]2395        """
2396        Return all the stored info about this experiment
2397        """
2398        rv = None
2399
[2bb8b35]2400        self.log.info("Info call started for %s" %  fid)
[866c983]2401        req = req.get('InfoRequestBody', None)
2402        if not req:
2403            raise service_error(service_error.req,
[65f3f29]2404                    "Bad request format (no InfoRequestBody)")
[866c983]2405        exp = req.get('experiment', None)
[80b1e82]2406        legacy = req.get('legacy', False)
[6e33086]2407        fresh = req.get('fresh', False)
[866c983]2408        if exp:
2409            if exp.has_key('fedid'):
2410                key = exp['fedid']
2411                keytype = "fedid"
2412            elif exp.has_key('localname'):
2413                key = exp['localname']
2414                keytype = "localname"
2415            else:
2416                raise service_error(service_error.req, "Unknown lookup type")
2417        else:
2418            raise service_error(service_error.req, "No request?")
2419
[8cab4c2]2420        try:
2421            proof = self.check_experiment_access(fid, key)
2422        except service_error, e:
2423            self.log.info("Info call failed for %s: access denied" %  fid)
2424
[866c983]2425
[6e33086]2426        self.update_info(key, fresh)
2427
[866c983]2428        self.state_lock.acquire()
2429        if self.state.has_key(key):
[29d5f7c]2430            rv = self.state[key].get_info()
[6e33086]2431            # Copy the topo if we need legacy annotations
[80b1e82]2432            if legacy:
[6a50b78]2433                top = self.state[key].top
[80b1e82]2434                if top is not None: top = top.clone()
[866c983]2435        self.state_lock.release()
[2bb8b35]2436        self.log.info("Gathered Info for %s %s" % (key, fid))
[866c983]2437
[80b1e82]2438        # If the legacy visualization and topology representations are
[6e33086]2439        # requested, calculate them and add them to the return.
[80b1e82]2440        if legacy and rv is not None:
[2bb8b35]2441            self.log.info("Generating legacy Info for %s %s" % (key, fid))
[80b1e82]2442            if top is not None:
2443                vtopo = topdl.topology_to_vtopo(top)
2444                if vtopo is not None:
2445                    rv['vtopo'] = vtopo
2446                    try:
2447                        vis = self.genviz(vtopo)
2448                    except service_error, e:
2449                        self.log.debug('Problem generating visualization: %s' \
2450                                % e)
2451                        vis = None
2452                    if vis is not None:
2453                        rv['vis'] = vis
[db6b092]2454        if rv:
[2bb8b35]2455            self.log.info("Info succeded for %s %s" % (key, fid))
[29d5f7c]2456            rv['proof'] = proof.to_dict()
2457            return rv
[2bb8b35]2458        else: 
2459            self.log.info("Info failed for %s %s: no experiment" % (key, fid))
[db6b092]2460            raise service_error(service_error.req, "No such experiment")
[7a8d667]2461
[22a1a77]2462    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2463            results):
2464        """
2465        Call OperateSegment on multiple testbeds and gather the results.
2466        op_params contains the parameters needed to contact that testbed, cert
2467        is a certificate containing the fedid to use, op is the operation,
2468        testbeds is a dict mapping testbed name to targets in that testbed,
2469        params are the parameters to include a,d results is a growing list of
2470        the results of the calls.
2471        """
2472        try:
2473            tmpdir = tempfile.mkdtemp(prefix="info-")
2474        except EnvironmentError:
2475            raise service_error(service_error.internal, 
2476                    "Cannot create tmp dir")
2477        cert_file = self.make_temp_certfile(cert, tmpdir)
2478
2479        try:
2480            for tb, targets in testbeds.items():
2481                if tb in op_params:
2482                    uri, aid = op_params[tb]
2483                    operate=self.operation_segment(log=self.log, testbed=uri,
2484                                cert_file=cert_file, cert_pwd=None,
2485                                trusted_certs=self.trusted_certs,
2486                                caller=self.call_OperationSegment)
2487                    if operate(uri, aid, op, targets, params):
2488                        if operate.status is not None:
2489                            results.extend(operate.status)
2490                            continue
2491                # Something went wrong in a weird way.  Add statuses
2492                # that reflect that to results
2493                for t in targets:
2494                    results.append(operation_status(t, 
2495                        operation_status.federant,
[b709861]2496                        'Unexpected error on %s' % tb))
[22a1a77]2497        # Clean up the tmpdir no matter what
2498        finally:
2499            if tmpdir: self.remove_dirs(tmpdir)
2500
2501    def do_operation(self, req, fid):
2502        """
2503        Find the testbeds holding each target and ask them to carry out the
2504        operation.  Return the statuses.
2505        """
2506        # Map an element to the testbed containing it
2507        def element_to_tb(e):
2508            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2509            elif isinstance(e, topdl.Testbed): return e.name
2510            else: return None
2511        # If d is an operation_status object, make it a dict
2512        def make_dict(d):
2513            if isinstance(d, dict): return d
2514            elif isinstance(d, operation_status): return d.to_dict()
2515            else: return { }
2516
[b709861]2517        def element_name(e):
2518            if isinstance(e, topdl.Computer): return e.name
2519            elif isinstance(e, topdl.Testbed): 
2520                if e.localname: return e.localname[0]
2521                else: return None
2522            else: return None
2523
[8cab4c2]2524        self.log.info("Operation call started for %s" %  fid)
[22a1a77]2525        req = req.get('OperationRequestBody', None)
2526        if not req:
2527            raise service_error(service_error.req,
2528                    "Bad request format (no OperationRequestBody)")
2529        exp = req.get('experiment', None)
2530        op = req.get('operation', None)
[b709861]2531        targets = set(req.get('target', []))
[22a1a77]2532        params = req.get('parameter', None)
2533
2534        if exp:
2535            if 'fedid' in exp:
2536                key = exp['fedid']
2537                keytype = "fedid"
2538            elif 'localname' in exp:
2539                key = exp['localname']
2540                keytype = "localname"
2541            else:
2542                raise service_error(service_error.req, "Unknown lookup type")
2543        else:
2544            raise service_error(service_error.req, "No request?")
2545
[b709861]2546        if op is None or not targets:
[22a1a77]2547            raise service_error(service_error.req, "No request?")
2548
[8cab4c2]2549        try:
2550            proof = self.check_experiment_access(fid, key)
2551        except service_error, e:
2552            self.log.info("Operation call failed for %s: access denied" %  fid)
2553            raise e
2554
[22a1a77]2555        self.state_lock.acquire()
2556        if key in self.state:
2557            d1, op_params, cert, d2 = \
[b709861]2558                    self.get_segment_info(self.state[key], need_lock=False,
2559                            key='tb')
[22a1a77]2560            top = self.state[key].top
2561            if top is not None:
2562                top = top.clone()
2563        self.state_lock.release()
2564
2565        if top is None:
[8cab4c2]2566            self.log.info("Operation call failed for %s: not active" %  fid)
[22a1a77]2567            raise service_error(service_error.partial, "No topology yet", 
2568                    proof=proof)
2569
2570        testbeds = { }
2571        results = []
2572        for e in top.elements:
[b709861]2573            ename = element_name(e)
2574            if ename in targets:
[22a1a77]2575                tb = element_to_tb(e)
[b709861]2576                targets.remove(ename)
[22a1a77]2577                if tb is not None:
[b709861]2578                    if tb in testbeds: testbeds[tb].append(ename)
2579                    else: testbeds[tb] = [ ename ]
[22a1a77]2580                else:
2581                    results.append(operation_status(e.name, 
2582                        code=operation_status.no_target, 
2583                        description='Cannot map target to testbed'))
2584
[b709861]2585        for t in targets:
2586            results.append(operation_status(t, operation_status.no_target))
2587
[22a1a77]2588        self.operate_on_segments(op_params, cert, op, testbeds, params,
2589                results)
2590
[8cab4c2]2591        self.log.info("Operation call succeeded for %s" %  fid)
[22a1a77]2592        return { 
2593                'experiment': exp, 
[b709861]2594                'status': [make_dict(r) for r in results],
[22a1a77]2595                'proof': proof.to_dict()
2596                }
2597
2598
[65f3f29]2599    def get_multi_info(self, req, fid):
2600        """
2601        Return all the stored info that this fedid can access
2602        """
[e83f2f2]2603        rv = { 'info': [ ], 'proof': [ ] }
[65f3f29]2604
[2bb8b35]2605        self.log.info("Multi Info call started for %s" %  fid)
[25bf6cc]2606        self.get_grouper_updates(fid)
2607        self.auth.update()
2608        self.auth.save()
[db6b092]2609        self.state_lock.acquire()
2610        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2611            try:
[e83f2f2]2612                proof = self.check_experiment_access(fid, key)
[829246e]2613            except service_error, e:
2614                if e.code == service_error.access:
2615                    continue
2616                else:
[2bb8b35]2617                    self.log.info("Multi Info call failed for %s: %s" %  \
2618                            (e,fid))
[829246e]2619                    self.state_lock.release()
2620                    raise e
[65f3f29]2621
[db6b092]2622            if self.state.has_key(key):
[29d5f7c]2623                e = self.state[key].get_info()
2624                e['proof'] = proof.to_dict()
[db6b092]2625                rv['info'].append(e)
[e83f2f2]2626                rv['proof'].append(proof.to_dict())
[65f3f29]2627        self.state_lock.release()
[2bb8b35]2628        self.log.info("Multi Info call succeeded for %s" %  fid)
[db6b092]2629        return rv
[65f3f29]2630
[cf0ff4f]2631    def check_termination_status(self, fed_exp, force):
[e07c8f3]2632        """
[cf0ff4f]2633        Confirm that the experiment is sin a valid state to stop (or force it)
2634        return the state - invalid states for deletion and force settings cause
2635        exceptions.
[e07c8f3]2636        """
[cf0ff4f]2637        self.state_lock.acquire()
[29d5f7c]2638        status = fed_exp.status
[e07c8f3]2639
[cf0ff4f]2640        if status:
2641            if status in ('starting', 'terminating'):
2642                if not force:
2643                    self.state_lock.release()
2644                    raise service_error(service_error.partial, 
2645                            'Experiment still being created or destroyed')
2646                else:
2647                    self.log.warning('Experiment in %s state ' % status + \
2648                            'being terminated by force.')
2649            self.state_lock.release()
2650            return status
[725c55d]2651        else:
[cf0ff4f]2652            # No status??? trouble
2653            self.state_lock.release()
2654            raise service_error(service_error.internal,
2655                    "Experiment has no status!?")
2656
[b709861]2657    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
[cf0ff4f]2658        ids = []
2659        term_params = { }
[6e33086]2660        if need_lock: self.state_lock.acquire()
[29d5f7c]2661        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2662        expcert = fed_exp.identity
2663        repo = "%s" % fed_exp.fedid
[cf0ff4f]2664
2665        # Collect the allocation/segment ids into a dict keyed by the fedid
[29d5f7c]2666        # of the allocation that contains a tuple of uri, aid
2667        for i, fed in enumerate(fed_exp.get_all_allocations()):
2668            uri = fed.uri
2669            aid = fed.allocID
[b709861]2670            if key == 'aid': term_params[aid] = (uri, aid)
2671            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2672
[6e33086]2673        if need_lock: self.state_lock.release()
2674        return ids, term_params, expcert, repo
2675
2676
2677    def get_termination_info(self, fed_exp):
2678        self.state_lock.acquire()
2679        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
[cf0ff4f]2680        # Change the experiment state
[29d5f7c]2681        fed_exp.status = 'terminating'
[6e33086]2682        fed_exp.updated()
[cf0ff4f]2683        if self.state_filename: self.write_state()
2684        self.state_lock.release()
2685
2686        return ids, term_params, expcert, repo
2687
2688
2689    def deallocate_resources(self, term_params, expcert, status, force, 
2690            dealloc_log):
2691        tmpdir = None
2692        # This try block makes sure the tempdir is cleared
2693        try:
2694            # If no expcert, try the deallocation as the experiment
2695            # controller instance.
2696            if expcert and self.auth_type != 'legacy': 
2697                try:
2698                    tmpdir = tempfile.mkdtemp(prefix="term-")
2699                except EnvironmentError:
2700                    raise service_error(service_error.internal, 
2701                            "Cannot create tmp dir")
2702                cert_file = self.make_temp_certfile(expcert, tmpdir)
2703                pw = None
2704            else: 
2705                cert_file = self.cert_file
2706                pw = self.cert_pwd
2707
2708            # Stop everyone.  NB, wait_for_all waits until a thread starts
2709            # and then completes, so we can't wait if nothing starts.  So,
2710            # no tbparams, no start.
2711            if len(term_params) > 0:
2712                tp = thread_pool(self.nthreads)
2713                for k, (uri, aid) in term_params.items():
2714                    # Create and start a thread to stop the segment
2715                    tp.wait_for_slot()
2716                    t  = pooled_thread(\
2717                            target=self.terminate_segment(log=dealloc_log,
2718                                testbed=uri,
2719                                cert_file=cert_file, 
2720                                cert_pwd=pw,
2721                                trusted_certs=self.trusted_certs,
2722                                caller=self.call_TerminateSegment),
2723                            args=(uri, aid), name=k,
2724                            pdata=tp, trace_file=self.trace_file)
2725                    t.start()
2726                # Wait for completions
2727                tp.wait_for_all_done()
2728
2729            # release the allocations (failed experiments have done this
2730            # already, and starting experiments may be in odd states, so we
2731            # ignore errors releasing those allocations
2732            try: 
2733                for k, (uri, aid)  in term_params.items():
2734                    self.release_access(None, aid, uri=uri,
2735                            cert_file=cert_file, cert_pwd=pw)
2736            except service_error, e:
2737                if status != 'failed' and not force:
2738                    raise e
2739
2740        # Clean up the tmpdir no matter what
2741        finally:
2742            if tmpdir: self.remove_dirs(tmpdir)
2743
[7a8d667]2744    def terminate_experiment(self, req, fid):
[866c983]2745        """
2746        Swap this experiment out on the federants and delete the shared
2747        information
2748        """
[2bb8b35]2749        self.log.info("Terminate experiment call started for %s" % fid)
[866c983]2750        tbparams = { }
2751        req = req.get('TerminateRequestBody', None)
2752        if not req:
2753            raise service_error(service_error.req,
2754                    "Bad request format (no TerminateRequestBody)")
2755
[cf0ff4f]2756        key = self.get_experiment_key(req, 'experiment')
[8cab4c2]2757        try:
2758            proof = self.check_experiment_access(fid, key)
2759        except service_error, e:
2760            self.log.info(
2761                    "Terminate experiment call failed for %s: access denied" \
2762                            % fid)
2763            raise e
[cf0ff4f]2764        exp = req.get('experiment', False)
2765        force = req.get('force', False)
[866c983]2766
[db6b092]2767        dealloc_list = [ ]
[46e4682]2768
2769
[5ae3857]2770        # Create a logger that logs to the dealloc_list as well as to the main
2771        # log file.
2772        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
[a69de97]2773        dealloc_log.info("Terminating %s " %key)
[5ae3857]2774        h = logging.StreamHandler(self.list_log(dealloc_list))
2775        # XXX: there should be a global one of these rather than repeating the
2776        # code.
2777        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2778                    '%d %b %y %H:%M:%S'))
2779        dealloc_log.addHandler(h)
2780
2781        self.state_lock.acquire()
2782        fed_exp = self.state.get(key, None)
[cf0ff4f]2783        self.state_lock.release()
[e07c8f3]2784        repo = None
[5ae3857]2785
2786        if fed_exp:
[cf0ff4f]2787            status = self.check_termination_status(fed_exp, force)
[6e33086]2788            # get_termination_info updates the experiment state
[cf0ff4f]2789            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2790            self.deallocate_resources(term_params, expcert, status, force, 
2791                    dealloc_log)
[5ae3857]2792
2793            # Remove the terminated experiment
2794            self.state_lock.acquire()
2795            for id in ids:
[a96d946]2796                self.clear_experiment_authorization(id, need_state_lock=False)
[cf0ff4f]2797                if id in self.state: del self.state[id]
[5ae3857]2798
2799            if self.state_filename: self.write_state()
2800            self.state_lock.release()
2801
[2761484]2802            # Delete any synch points associated with this experiment.  All
2803            # synch points begin with the fedid of the experiment.
2804            fedid_keys = set(["fedid:%s" % f for f in ids \
2805                    if isinstance(f, fedid)])
2806            for k in self.synch_store.all_keys():
2807                try:
2808                    if len(k) > 45 and k[0:46] in fedid_keys:
2809                        self.synch_store.del_value(k)
[dadc4da]2810                except synch_store.BadDeletionError:
[2761484]2811                    pass
2812            self.write_store()
[e07c8f3]2813
2814            # Remove software and other cached stuff from the filesystem.
2815            if repo:
2816                self.remove_dirs("%s/%s" % (self.repodir, repo))
[e83f2f2]2817       
[2bb8b35]2818            self.log.info("Terminate experiment succeeded for %s %s" % \
2819                    (key, fid))
[5ae3857]2820            return { 
2821                    'experiment': exp , 
[cf0ff4f]2822                    'deallocationLog': string.join(dealloc_list, ''),
[e83f2f2]2823                    'proof': [proof.to_dict()],
[5ae3857]2824                    }
2825        else:
[2bb8b35]2826            self.log.info("Terminate experiment failed for %s %s: no state" % \
2827                    (key, fid))
[5ae3857]2828            raise service_error(service_error.req, "No saved state")
[2761484]2829
2830
2831    def GetValue(self, req, fid):
2832        """
2833        Get a value from the synchronized store
2834        """
2835        req = req.get('GetValueRequestBody', None)
2836        if not req:
2837            raise service_error(service_error.req,
2838                    "Bad request format (no GetValueRequestBody)")
2839       
[cf0ff4f]2840        name = req.get('name', None)
2841        wait = req.get('wait', False)
[2761484]2842        rv = { 'name': name }
2843
[e83f2f2]2844        if not name:
2845            raise service_error(service_error.req, "No name?")
2846
2847        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2848
2849        if access_ok:
[d8442da]2850            self.log.debug("[GetValue] asking for %s " % name)
[dadc4da]2851            try:
2852                v = self.synch_store.get_value(name, wait)
2853            except synch_store.RevokedKeyError:
2854                # No more synch on this key
2855                raise service_error(service_error.federant, 
2856                        "Synch key %s revoked" % name)
[2761484]2857            if v is not None:
2858                rv['value'] = v
[e83f2f2]2859            rv['proof'] = proof.to_dict()
[109a32a]2860            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2861            return rv
2862        else:
[e83f2f2]2863            raise service_error(service_error.access, "Access Denied",
2864                    proof=proof)
[2761484]2865       
2866
2867    def SetValue(self, req, fid):
2868        """
2869        Set a value in the synchronized store
2870        """
2871        req = req.get('SetValueRequestBody', None)
2872        if not req:
2873            raise service_error(service_error.req,
2874                    "Bad request format (no SetValueRequestBody)")
2875       
[cf0ff4f]2876        name = req.get('name', None)
2877        v = req.get('value', '')
[2761484]2878
[e83f2f2]2879        if not name:
2880            raise service_error(service_error.req, "No name?")
2881
2882        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2883
2884        if access_ok:
[2761484]2885            try:
2886                self.synch_store.set_value(name, v)
2887                self.write_store()
[109a32a]2888                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2889            except synch_store.CollisionError:
2890                # Translate into a service_error
2891                raise service_error(service_error.req,
2892                        "Value already set: %s" %name)
[dadc4da]2893            except synch_store.RevokedKeyError:
2894                # No more synch on this key
2895                raise service_error(service_error.federant, 
2896                        "Synch key %s revoked" % name)
[e83f2f2]2897                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
[2761484]2898        else:
[e83f2f2]2899            raise service_error(service_error.access, "Access Denied",
2900                    proof=proof)
Note: See TracBrowser for help on using the repository browser.