source: fedd/federation/experiment_control.py @ 40c599f

compt_changesinfo-ops
Last change on this file since 40c599f was db3da0b, checked in by Ted Faber <faber@…>, 13 years ago

More rational OS structure updates

  • Property mode set to 100644
File size: 92.2 KB
RevLine 
[6679c122]1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
[eee2b2e]11import pickle
[c971895]12import logging
[79b6596]13import signal
14import time
[6679c122]15
[3df9b33]16import os.path
17
[3441fe3]18import traceback
[c971895]19# For parsing visualization output and splitter output
20import xml.parsers.expat
[3441fe3]21
[6c57fe9]22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
[c573278]24from string import join
[6679c122]25
[db6b092]26from urlparse import urlparse
27from urllib2 import urlopen
28
[ec4fb42]29from util import *
[51cc9df]30from fedid import fedid, generate_fedid
[9460b1e]31from remote_service import xmlrpc_handler, soap_handler, service_caller
[c971895]32from service_error import service_error
[2761484]33from synch_store import synch_store
[73e7f5c]34from experiment_partition import experiment_partition
[1d73342]35from experiment_control_legacy import experiment_control_legacy
[7206e5a]36from authorizer import abac_authorizer
[faea607]37from thread_pool import thread_pool, pooled_thread
[ab3d6c5]38from experiment_info import experiment_info, allocation_info, federated_service
[22a1a77]39from operation_status import operation_status
[6679c122]40
[db6b092]41import topdl
[f07fa49]42import list_log
[db6b092]43from ip_allocator import ip_allocator
44from ip_addr import ip_addr
45
[11a08b0]46
47class nullHandler(logging.Handler):
48    def emit(self, record): pass
49
50fl = logging.getLogger("fedd.experiment_control")
51fl.addHandler(nullHandler())
52
[1d73342]53class experiment_control_local(experiment_control_legacy):
[0ea11af]54    """
55    Control of experiments that this system can directly access.
56
57    Includes experiment creation, termination and information dissemination.
58    Thred safe.
59    """
[79b6596]60
61    class ssh_cmd_timeout(RuntimeError): pass
[6679c122]62   
[f069052]63    call_RequestAccess = service_caller('RequestAccess')
64    call_ReleaseAccess = service_caller('ReleaseAccess')
[cc8d8e9]65    call_StartSegment = service_caller('StartSegment')
[db974ed]66    call_TerminateSegment = service_caller('TerminateSegment')
[6e33086]67    call_InfoSegment = service_caller('InfoSegment')
[22a1a77]68    call_OperationSegment = service_caller('OperationSegment')
[5f6929a]69    call_Ns2Topdl = service_caller('Ns2Topdl')
[058f58e]70
[3f6bc5f]71    def __init__(self, config=None, auth=None):
[866c983]72        """
73        Intialize the various attributes, most from the config object
74        """
75
76        def parse_tarfile_list(tf):
77            """
78            Parse a tarfile list from the configuration.  This is a set of
79            paths and tarfiles separated by spaces.
80            """
81            rv = [ ]
82            if tf is not None:
83                tl = tf.split()
84                while len(tl) > 1:
85                    p, t = tl[0:2]
86                    del tl[0:2]
87                    rv.append((p, t))
88            return rv
89
[f07fa49]90        self.list_log = list_log.list_log
[866c983]91
92        self.cert_file = config.get("experiment_control", "cert_file")
93        if self.cert_file:
94            self.cert_pwd = config.get("experiment_control", "cert_pwd")
95        else:
96            self.cert_file = config.get("globals", "cert_file")
97            self.cert_pwd = config.get("globals", "cert_pwd")
98
99        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
100                or config.get("globals", "trusted_certs")
101
[6c57fe9]102        self.repodir = config.get("experiment_control", "repodir")
[7183b48]103        self.repo_url = config.get("experiment_control", "repo_url", 
104                "https://users.isi.deterlab.net:23235");
[cc8d8e9]105
[866c983]106        self.exp_stem = "fed-stem"
107        self.log = logging.getLogger("fedd.experiment_control")
108        set_log_level(config, "experiment_control", self.log)
109        self.muxmax = 2
[35a4c01]110        self.nthreads = 10
[866c983]111        self.randomize_experiments = False
112
113        self.splitter = None
114        self.ssh_keygen = "/usr/bin/ssh-keygen"
115        self.ssh_identity_file = None
116
117
118        self.debug = config.getboolean("experiment_control", "create_debug")
[69692a9]119        self.cleanup = not config.getboolean("experiment_control", 
120                "leave_tmpfiles")
[866c983]121        self.state_filename = config.get("experiment_control", 
122                "experiment_state")
[2761484]123        self.store_filename = config.get("experiment_control", 
124                "synch_store")
125        self.store_url = config.get("experiment_control", "store_url")
[5f6929a]126        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
[866c983]127        self.fedkit = parse_tarfile_list(\
128                config.get("experiment_control", "fedkit"))
129        self.gatewaykit = parse_tarfile_list(\
130                config.get("experiment_control", "gatewaykit"))
131        accessdb_file = config.get("experiment_control", "accessdb")
132
[175b444]133        dt = config.get("experiment_control", "direct_transit")
[7206e5a]134        self.auth_type = config.get('experiment_control', 'auth_type') \
135                or 'legacy'
136        self.auth_dir = config.get('experiment_control', 'auth_dir')
[6e33086]137        # XXX: document this!
138        self.info_cache_limit = \
139                config.getint('experiment_control', 'info_cache', 600)
[139e2e2]140        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
141        else: self.direct_transit = [ ]
[866c983]142        # NB for internal master/slave ops, not experiment setup
143        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
[ca489e8]144
[db6b092]145        self.overrides = set([])
146        ovr = config.get('experiment_control', 'overrides')
147        if ovr:
148            for o in ovr.split(","):
149                o = o.strip()
150                if o.startswith('fedid:'): o = o[len('fedid:'):]
151                self.overrides.add(fedid(hexstr=o))
[ca489e8]152
[866c983]153        self.state = { }
154        self.state_lock = Lock()
155        self.tclsh = "/usr/local/bin/otclsh"
[5f6929a]156        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
[866c983]157                config.get("experiment_control", "tcl_splitter",
158                        "/usr/testbed/lib/ns2ir/parse.tcl")
159        mapdb_file = config.get("experiment_control", "mapdb")
160        self.trace_file = sys.stderr
161
162        self.def_expstart = \
163                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
164                "/tmp/federate";
165        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
166                "FEDDIR/hosts";
167        self.def_gwstart = \
168                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
169                "/tmp/bridge.log";
170        self.def_mgwstart = \
171                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
172                "/tmp/bridge.log";
173        self.def_gwimage = "FBSD61-TUNNEL2";
174        self.def_gwtype = "pc";
175        self.local_access = { }
176
[7206e5a]177        if self.auth_type == 'legacy':
178            if auth:
179                self.auth = auth
180            else:
181                self.log.error( "[access]: No authorizer initialized, " +\
182                        "creating local one.")
183                auth = authorizer()
[5ecb9a3]184            self.get_access = self.legacy_get_access
[7206e5a]185        elif self.auth_type == 'abac':
186            self.auth = abac_authorizer(load=self.auth_dir)
187        else:
188            raise service_error(service_error.internal, 
189                    "Unknown auth_type: %s" % self.auth_type)
[866c983]190
191        if mapdb_file:
192            self.read_mapdb(mapdb_file)
193        else:
194            self.log.warn("[experiment_control] No testbed map, using defaults")
195            self.tbmap = { 
196                    'deter':'https://users.isi.deterlab.net:23235',
197                    'emulab':'https://users.isi.deterlab.net:23236',
198                    'ucb':'https://users.isi.deterlab.net:23237',
199                    }
200
201        if accessdb_file:
202                self.read_accessdb(accessdb_file)
203        else:
204            raise service_error(service_error.internal,
205                    "No accessdb specified in config")
206
207        # Grab saved state.  OK to do this w/o locking because it's read only
208        # and only one thread should be in existence that can see self.state at
209        # this point.
210        if self.state_filename:
211            self.read_state()
212
[2761484]213        if self.store_filename:
214            self.read_store()
215        else:
216            self.log.warning("No saved synch store")
217            self.synch_store = synch_store
218
[866c983]219        # Dispatch tables
220        self.soap_services = {\
[a3ad8bd]221                'New': soap_handler('New', self.new_experiment),
[e19b75c]222                'Create': soap_handler('Create', self.create_experiment),
[866c983]223                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
224                'Vis': soap_handler('Vis', self.get_vis),
225                'Info': soap_handler('Info', self.get_info),
[65f3f29]226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
[22a1a77]227                'Operation': soap_handler('Operation', self.do_operation),
[866c983]228                'Terminate': soap_handler('Terminate', 
[e19b75c]229                    self.terminate_experiment),
[2761484]230                'GetValue': soap_handler('GetValue', self.GetValue),
231                'SetValue': soap_handler('SetValue', self.SetValue),
[866c983]232        }
233
234        self.xmlrpc_services = {\
[a3ad8bd]235                'New': xmlrpc_handler('New', self.new_experiment),
[e19b75c]236                'Create': xmlrpc_handler('Create', self.create_experiment),
[866c983]237                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
238                'Vis': xmlrpc_handler('Vis', self.get_vis),
239                'Info': xmlrpc_handler('Info', self.get_info),
[65f3f29]240                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
[866c983]241                'Terminate': xmlrpc_handler('Terminate',
[e19b75c]242                    self.terminate_experiment),
[22a1a77]243                'Operation': xmlrpc_handler('Operation', self.do_operation),
[2761484]244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
[866c983]246        }
[19cc408]247
[a97394b]248    # Call while holding self.state_lock
[eee2b2e]249    def write_state(self):
[866c983]250        """
251        Write a new copy of experiment state after copying the existing state
252        to a backup.
253
254        State format is a simple pickling of the state dictionary.
255        """
256        if os.access(self.state_filename, os.W_OK):
[40dd8c1]257            copy_file(self.state_filename, \
258                    "%s.bak" % self.state_filename)
[866c983]259        try:
260            f = open(self.state_filename, 'w')
261            pickle.dump(self.state, f)
[d3c8759]262        except EnvironmentError, e:
[866c983]263            self.log.error("Can't write file %s: %s" % \
264                    (self.state_filename, e))
265        except pickle.PicklingError, e:
266            self.log.error("Pickling problem: %s" % e)
267        except TypeError, e:
268            self.log.error("Pickling problem (TypeError): %s" % e)
[eee2b2e]269
[2761484]270    @staticmethod
[29d5f7c]271    def get_alloc_ids(exp):
[2761484]272        """
[29d5f7c]273        Used by read_store and read state.  This used to be worse.
[2761484]274        """
275
[29d5f7c]276        return [ a.allocID for a in exp.get_all_allocations() ]
277
[2761484]278
[a97394b]279    # Call while holding self.state_lock
[eee2b2e]280    def read_state(self):
[866c983]281        """
282        Read a new copy of experiment state.  Old state is overwritten.
283
284        State format is a simple pickling of the state dictionary.
285        """
[cc8d8e9]286       
[866c983]287        try:
288            f = open(self.state_filename, "r")
289            self.state = pickle.load(f)
290            self.log.debug("[read_state]: Read state from %s" % \
291                    self.state_filename)
[d3c8759]292        except EnvironmentError, e:
[866c983]293            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
294                    % (self.state_filename, e))
295        except pickle.UnpicklingError, e:
296            self.log.warning(("[read_state]: No saved state: " + \
297                    "Unpickling failed: %s") % e)
298       
[cc8d8e9]299        for s in self.state.values():
[866c983]300            try:
[cc8d8e9]301
[29d5f7c]302                eid = s.fedid
[cc8d8e9]303                if eid : 
[7206e5a]304                    if self.auth_type == 'legacy':
305                        # XXX: legacy
306                        # Give the owner rights to the experiment
[29d5f7c]307                        #self.auth.set_attribute(s['owner'], eid)
[7206e5a]308                        # And holders of the eid as well
309                        self.auth.set_attribute(eid, eid)
310                        # allow overrides to control experiments as well
311                        for o in self.overrides:
312                            self.auth.set_attribute(o, eid)
313                        # Set permissions to allow reading of the software
314                        # repo, if any, as well.
315                        for a in self.get_alloc_ids(s):
316                            self.auth.set_attribute(a, 'repo/%s' % eid)
[cc8d8e9]317                else:
318                    raise KeyError("No experiment id")
[866c983]319            except KeyError, e:
320                self.log.warning("[read_state]: State ownership or identity " +\
321                        "misformatted in %s: %s" % (self.state_filename, e))
[4064742]322
323
324    def read_accessdb(self, accessdb_file):
[866c983]325        """
326        Read the mapping from fedids that can create experiments to their name
327        in the 3-level access namespace.  All will be asserted from this
328        testbed and can include the local username and porject that will be
329        asserted on their behalf by this fedd.  Each fedid is also added to the
330        authorization system with the "create" attribute.
331        """
332        self.accessdb = {}
333        # These are the regexps for parsing the db
334        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
335        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
336                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
337        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
338                "\s*->\s*(" + name_expr + ")\s*$")
339        lineno = 0
340
341        # Parse the mappings and store in self.authdb, a dict of
342        # fedid -> (proj, user)
343        try:
344            f = open(accessdb_file, "r")
345            for line in f:
346                lineno += 1
347                line = line.strip()
348                if len(line) == 0 or line.startswith('#'):
349                    continue
350                m = project_line.match(line)
351                if m:
352                    fid = fedid(hexstr=m.group(1))
353                    project, user = m.group(2,3)
354                    if not self.accessdb.has_key(fid):
355                        self.accessdb[fid] = []
356                    self.accessdb[fid].append((project, user))
357                    continue
358
359                m = user_line.match(line)
360                if m:
361                    fid = fedid(hexstr=m.group(1))
362                    project = None
363                    user = m.group(2)
364                    if not self.accessdb.has_key(fid):
365                        self.accessdb[fid] = []
366                    self.accessdb[fid].append((project, user))
367                    continue
368                self.log.warn("[experiment_control] Error parsing access " +\
369                        "db %s at line %d" %  (accessdb_file, lineno))
[d3c8759]370        except EnvironmentError:
[866c983]371            raise service_error(service_error.internal,
[05fceef]372                    ("Error opening/reading %s as experiment " +\
373                            "control accessdb") %  accessdb_file)
[866c983]374        f.close()
375
376        # Initialize the authorization attributes
[7206e5a]377        # XXX: legacy
378        if self.auth_type == 'legacy':
379            for fid in self.accessdb.keys():
380                self.auth.set_attribute(fid, 'create')
381                self.auth.set_attribute(fid, 'new')
[34bc05c]382
383    def read_mapdb(self, file):
[866c983]384        """
385        Read a simple colon separated list of mappings for the
386        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
387        """
388
389        self.tbmap = { }
390        lineno =0
391        try:
392            f = open(file, "r")
393            for line in f:
394                lineno += 1
395                line = line.strip()
396                if line.startswith('#') or len(line) == 0:
397                    continue
398                try:
399                    label, url = line.split(':', 1)
400                    self.tbmap[label] = url
401                except ValueError, e:
402                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
403                            "map db: %s %s" % (lineno, line, e))
[d3c8759]404        except EnvironmentError, e:
[866c983]405            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
406                    "open %s: %s" % (file, e))
[1ec5d4a]407        else:
408            f.close()
[2761484]409
410    def read_store(self):
411        try:
412            self.synch_store = synch_store()
413            self.synch_store.load(self.store_filename)
414            self.log.debug("[read_store]: Read store from %s" % \
415                    self.store_filename)
[d3c8759]416        except EnvironmentError, e:
[2761484]417            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
418                    % (self.state_filename, e))
419            self.synch_store = synch_store()
420
421        # Set the initial permissions on data in the store.  XXX: This ad hoc
422        # authorization attribute initialization is getting out of hand.
[7206e5a]423        # XXX: legacy
424        if self.auth_type == 'legacy':
425            for k in self.synch_store.all_keys():
426                try:
427                    if k.startswith('fedid:'):
428                        fid = fedid(hexstr=k[6:46])
429                        if self.state.has_key(fid):
430                            for a in self.get_alloc_ids(self.state[fid]):
431                                self.auth.set_attribute(a, k)
432                except ValueError, e:
433                    self.log.warn("Cannot deduce permissions for %s" % k)
[2761484]434
435
436    def write_store(self):
437        """
438        Write a new copy of synch_store after writing current state
439        to a backup.  We use the internal synch_store pickle method to avoid
440        incinsistent data.
441
442        State format is a simple pickling of the store.
443        """
444        if os.access(self.store_filename, os.W_OK):
445            copy_file(self.store_filename, \
446                    "%s.bak" % self.store_filename)
447        try:
448            self.synch_store.save(self.store_filename)
[d3c8759]449        except EnvironmentError, e:
[2761484]450            self.log.error("Can't write file %s: %s" % \
451                    (self.store_filename, e))
452        except TypeError, e:
453            self.log.error("Pickling problem (TypeError): %s" % e)
454
[cf0ff4f]455
456    def remove_dirs(self, dir):
457        """
458        Remove the directory tree and all files rooted at dir.  Log any errors,
459        but continue.
460        """
461        self.log.debug("[removedirs]: removing %s" % dir)
462        try:
463            for path, dirs, files in os.walk(dir, topdown=False):
464                for f in files:
465                    os.remove(os.path.join(path, f))
466                for d in dirs:
467                    os.rmdir(os.path.join(path, d))
468            os.rmdir(dir)
469        except EnvironmentError, e:
470            self.log.error("Error deleting directory tree in %s" % e);
471
472    @staticmethod
473    def make_temp_certfile(expcert, tmpdir):
474        """
475        make a protected copy of the access certificate so the experiment
476        controller can act as the experiment principal.  mkstemp is the most
477        secure way to do that. The directory should be created by
478        mkdtemp.  Return the filename.
479        """
480        if expcert and tmpdir:
481            try:
482                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
483                f = os.fdopen(certf, 'w')
484                print >> f, expcert
485                f.close()
486            except EnvironmentError, e:
487                raise service_error(service_error.internal, 
488                        "Cannot create temp cert file?")
489            return certfn
490        else:
491            return None
492
[866c983]493       
[6679c122]494    def generate_ssh_keys(self, dest, type="rsa" ):
[866c983]495        """
496        Generate a set of keys for the gateways to use to talk.
497
498        Keys are of type type and are stored in the required dest file.
499        """
500        valid_types = ("rsa", "dsa")
501        t = type.lower();
502        if t not in valid_types: raise ValueError
503        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
504
505        try:
506            trace = open("/dev/null", "w")
[d3c8759]507        except EnvironmentError:
[866c983]508            raise service_error(service_error.internal,
509                    "Cannot open /dev/null??");
510
511        # May raise CalledProcessError
512        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
[4ea1e22]513        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
[866c983]514        if rv != 0:
515            raise service_error(service_error.internal, 
516                    "Cannot generate nonce ssh keys.  %s return code %d" \
517                            % (self.ssh_keygen, rv))
[6679c122]518
[3df9b33]519    def generate_seer_certs(self, destdir):
520        '''
521        Create a SEER ca cert and a node cert in destdir/ca.pem and
522        destdir/node.pem respectively.  These will be distributed throughout
523        the federated experiment.  This routine reports errors via
524        service_errors.
525        '''
526        openssl = '/usr/bin/openssl'
527        # All the filenames and parameters we need for openssl calls below
528        ca_key =os.path.join(destdir, 'ca.key') 
529        ca_pem = os.path.join(destdir, 'ca.pem')
530        node_key =os.path.join(destdir, 'node.key') 
531        node_pem = os.path.join(destdir, 'node.pem')
532        node_req = os.path.join(destdir, 'node.req')
533        node_signed = os.path.join(destdir, 'node.signed')
[9bde415]534        days = '%s' % (365 * 10)
[95be336]535        serial = '%s' % random.randint(0, 1<<16)
[3df9b33]536
537        try:
538            # Sequence of calls to create a CA key, create a ca cert, create a
539            # node key, node signing request, and finally a signed node
540            # certificate.
541            sequence = (
542                    (openssl, 'genrsa', '-out', ca_key, '1024'),
543                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
544                        ca_pem, '-days', days, '-subj', 
545                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
546                    (openssl, 'genrsa', '-out', node_key, '1024'),
547                    (openssl, 'req', '-new', '-key', node_key, '-out', 
548                        node_req, '-days', days, '-subj', 
549                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
550                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
[95be336]551                        '-set_serial', serial, '-req', '-in', node_req, 
[3df9b33]552                        '-out', node_signed, '-days', days),
553                )
554            # Do all that stuff; bail if there's an error, and push all the
555            # output to dev/null.
556            for cmd in sequence:
557                trace = open("/dev/null", "w")
558                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
559                if rv != 0:
560                    raise service_error(service_error.internal, 
561                            "Cannot generate SEER certs.  %s return code %d" \
562                                    % (' '.join(cmd), rv))
563            # Concatinate the node key and signed certificate into node.pem
564            f = open(node_pem, 'w')
565            for comp in (node_signed, node_key):
566                g = open(comp, 'r')
[9bde415]567                f.write(g.read())
[3df9b33]568                g.close()
569            f.close()
570
571            # Throw out intermediaries.
[95be336]572            for fn in (ca_key, node_key, node_req, node_signed):
[3df9b33]573                os.unlink(fn)
574
575        except EnvironmentError, e:
576            # Any difficulties with the file system wind up here
577            raise service_error(service_error.internal,
578                    "File error on  %s while creating SEER certs: %s" % \
579                            (e.filename, e.strerror))
580
581
582
[0d830de]583    def gentopo(self, str):
[866c983]584        """
[1d73342]585        Generate the topology data structure from the splitter's XML
[866c983]586        representation of it.
587
588        The topology XML looks like:
589            <experiment>
590                <nodes>
591                    <node><vname></vname><ips>ip1:ip2</ips></node>
592                </nodes>
593                <lans>
594                    <lan>
595                        <vname></vname><vnode></vnode><ip></ip>
596                        <bandwidth></bandwidth><member>node:port</member>
597                    </lan>
598                </lans>
599        """
600        class topo_parse:
601            """
602            Parse the topology XML and create the dats structure.
603            """
604            def __init__(self):
605                # Typing of the subelements for data conversion
606                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
607                self.int_subelements = ( 'bandwidth',)
608                self.float_subelements = ( 'delay',)
609                # The final data structure
610                self.nodes = [ ]
611                self.lans =  [ ]
612                self.topo = { \
613                        'node': self.nodes,\
614                        'lan' : self.lans,\
615                    }
616                self.element = { }  # Current element being created
617                self.chars = ""     # Last text seen
618
619            def end_element(self, name):
620                # After each sub element the contents is added to the current
621                # element or to the appropriate list.
622                if name == 'node':
623                    self.nodes.append(self.element)
624                    self.element = { }
625                elif name == 'lan':
626                    self.lans.append(self.element)
627                    self.element = { }
628                elif name in self.str_subelements:
629                    self.element[name] = self.chars
630                    self.chars = ""
631                elif name in self.int_subelements:
632                    self.element[name] = int(self.chars)
633                    self.chars = ""
634                elif name in self.float_subelements:
635                    self.element[name] = float(self.chars)
636                    self.chars = ""
637
638            def found_chars(self, data):
639                self.chars += data.rstrip()
640
641
642        tp = topo_parse();
643        parser = xml.parsers.expat.ParserCreate()
644        parser.EndElementHandler = tp.end_element
645        parser.CharacterDataHandler = tp.found_chars
646
647        parser.Parse(str)
648
649        return tp.topo
650       
[0d830de]651
652    def genviz(self, topo):
[866c983]653        """
654        Generate the visualization the virtual topology
655        """
656
657        neato = "/usr/local/bin/neato"
658        # These are used to parse neato output and to create the visualization
659        # file.
[0ac1934]660        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
[866c983]661        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
662                "%s</type></node>"
663
664        try:
665            # Node names
666            nodes = [ n['vname'] for n in topo['node'] ]
667            topo_lans = topo['lan']
[cc8d8e9]668        except KeyError, e:
669            raise service_error(service_error.internal, "Bad topology: %s" %e)
[866c983]670
671        lans = { }
672        links = { }
673
674        # Walk through the virtual topology, organizing the connections into
675        # 2-node connections (links) and more-than-2-node connections (lans).
676        # When a lan is created, it's added to the list of nodes (there's a
677        # node in the visualization for the lan).
678        for l in topo_lans:
679            if links.has_key(l['vname']):
680                if len(links[l['vname']]) < 2:
681                    links[l['vname']].append(l['vnode'])
682                else:
683                    nodes.append(l['vname'])
684                    lans[l['vname']] = links[l['vname']]
685                    del links[l['vname']]
686                    lans[l['vname']].append(l['vnode'])
687            elif lans.has_key(l['vname']):
688                lans[l['vname']].append(l['vnode'])
689            else:
690                links[l['vname']] = [ l['vnode'] ]
691
692
693        # Open up a temporary file for dot to turn into a visualization
694        try:
695            df, dotname = tempfile.mkstemp()
696            dotfile = os.fdopen(df, 'w')
[d3c8759]697        except EnvironmentError:
[866c983]698            raise service_error(service_error.internal,
699                    "Failed to open file in genviz")
700
[db6b092]701        try:
702            dnull = open('/dev/null', 'w')
[d3c8759]703        except EnvironmentError:
[db6b092]704            service_error(service_error.internal,
[886307f]705                    "Failed to open /dev/null in genviz")
706
[866c983]707        # Generate a dot/neato input file from the links, nodes and lans
708        try:
709            print >>dotfile, "graph G {"
710            for n in nodes:
711                print >>dotfile, '\t"%s"' % n
712            for l in links.keys():
713                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
714            for l in lans.keys():
715                for n in lans[l]:
716                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
717            print >>dotfile, "}"
718            dotfile.close()
719        except TypeError:
720            raise service_error(service_error.internal,
721                    "Single endpoint link in vtopo")
[d3c8759]722        except EnvironmentError:
[866c983]723            raise service_error(service_error.internal, "Cannot write dot file")
724
725        # Use dot to create a visualization
[5954004]726        try:
727            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
728                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
729                stderr=dnull, close_fds=True)
730        except EnvironmentError:
731            raise service_error(service_error.internal, 
732                    "Cannot generate visualization: is graphviz available?")
[db6b092]733        dnull.close()
[866c983]734
735        # Translate dot to vis format
736        vis_nodes = [ ]
737        vis = { 'node': vis_nodes }
738        for line in dot.stdout:
739            m = vis_re.match(line)
740            if m:
741                vn = m.group(1)
742                vis_node = {'name': vn, \
743                        'x': float(m.group(2)),\
744                        'y' : float(m.group(3)),\
745                    }
746                if vn in links.keys() or vn in lans.keys():
747                    vis_node['type'] = 'lan'
748                else:
749                    vis_node['type'] = 'node'
750                vis_nodes.append(vis_node)
751        rv = dot.wait()
752
753        os.remove(dotname)
[1fed67b]754        # XXX: graphviz seems to use low return codes for warnings, like
755        # "couldn't find font"
756        if rv < 2 : return vis
[866c983]757        else: return None
[d0ae12d]758
[db6b092]759
[725c55d]760    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
761            cert_pwd=None):
[e19b75c]762        """
763        Release access to testbed through fedd
764        """
[db6b092]765
[fd07c48]766        if not uri and tbmap:
767            uri = tbmap.get(tb, None)
[e19b75c]768        if not uri:
[69692a9]769            raise service_error(service_error.server_config, 
[e19b75c]770                    "Unknown testbed: %s" % tb)
[db6b092]771
[e19b75c]772        if self.local_access.has_key(uri):
773            resp = self.local_access[uri].ReleaseAccess(\
[29d5f7c]774                    { 'ReleaseAccessRequestBody' : 
775                        {'allocID': {'fedid': aid}},}, 
[725c55d]776                    fedid(file=cert_file))
[e19b75c]777            resp = { 'ReleaseAccessResponseBody': resp } 
778        else:
[29d5f7c]779            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
[725c55d]780                    cert_file, cert_pwd, self.trusted_certs)
[db6b092]781
[e19b75c]782        # better error coding
[db6b092]783
[5f6929a]784    def remote_ns2topdl(self, uri, desc):
[db6b092]785
[e19b75c]786        req = {
787                'description' : { 'ns2description': desc },
[db6b092]788            }
789
[5f6929a]790        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
[e19b75c]791                self.trusted_certs)
792
[5f6929a]793        if r.has_key('Ns2TopdlResponseBody'):
794            r = r['Ns2TopdlResponseBody']
[1dcaff4]795            ed = r.get('experimentdescription', None)
796            if ed.has_key('topdldescription'):
797                return topdl.Topology(**ed['topdldescription'])
[e19b75c]798            else:
799                raise service_error(service_error.protocol, 
800                        "Bad splitter response (no output)")
801        else:
802            raise service_error(service_error.protocol, "Bad splitter response")
[cc8d8e9]803
[e19b75c]804    class start_segment:
[fd556d1]805        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[f07fa49]806                cert_pwd=None, trusted_certs=None, caller=None,
807                log_collector=None):
[cc8d8e9]808            self.log = log
809            self.debug = debug
810            self.cert_file = cert_file
811            self.cert_pwd = cert_pwd
812            self.trusted_certs = None
813            self.caller = caller
[fd556d1]814            self.testbed = testbed
[f07fa49]815            self.log_collector = log_collector
[69692a9]816            self.response = None
[b4b19c7]817            self.node = { }
[e83f2f2]818            self.proof = None
[b4b19c7]819
820        def make_map(self, resp):
[29d5f7c]821            if 'segmentdescription' not in resp  or \
822                    'topdldescription' not in resp['segmentdescription']:
823                self.log.warn('No topology returned from startsegment')
824                return 
825
826            top = topdl.Topology(
827                    **resp['segmentdescription']['topdldescription'])
828
829            for e in top.elements:
830                if isinstance(e, topdl.Computer):
[1ae1aa2]831                    self.node[e.name] = e
[cc8d8e9]832
[43197eb]833        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
[cc8d8e9]834            req = {
835                    'allocID': { 'fedid' : aid }, 
836                    'segmentdescription': { 
837                        'topdldescription': topo.to_dict(),
838                    },
839                }
[e02cd14]840
841            if connInfo:
842                req['connection'] = connInfo
[43197eb]843
844            import_svcs = [ s for m in masters.values() \
845                    for s in m if self.testbed in s.importers]
846
847            if import_svcs or self.testbed in masters:
848                req['service'] = []
849
850            for s in import_svcs:
851                for r in s.reqs:
852                    sr = copy.deepcopy(r)
853                    sr['visibility'] = 'import';
854                    req['service'].append(sr)
855
856            for s in masters.get(self.testbed, []):
857                for r in s.reqs:
858                    sr = copy.deepcopy(r)
859                    sr['visibility'] = 'export';
860                    req['service'].append(sr)
861
[6c57fe9]862            if attrs:
863                req['fedAttr'] = attrs
[cc8d8e9]864
[fd556d1]865            try:
[13e3dd2]866                self.log.debug("Calling StartSegment at %s " % uri)
[fd556d1]867                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
868                        self.trusted_certs)
[f07fa49]869                if r.has_key('StartSegmentResponseBody'):
870                    lval = r['StartSegmentResponseBody'].get('allocationLog',
871                            None)
872                    if lval and self.log_collector:
873                        for line in  lval.splitlines(True):
874                            self.log_collector.write(line)
[b4b19c7]875                    self.make_map(r['StartSegmentResponseBody'])
[e83f2f2]876                    if 'proof' in r: self.proof = r['proof']
[69692a9]877                    self.response = r
[f07fa49]878                else:
879                    raise service_error(service_error.internal, 
880                            "Bad response!?: %s" %r)
[fd556d1]881                return True
882            except service_error, e:
883                self.log.error("Start segment failed on %s: %s" % \
884                        (self.testbed, e))
885                return False
[cc8d8e9]886
887
[5ae3857]888
[e19b75c]889    class terminate_segment:
[fd556d1]890        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
[5ae3857]891                cert_pwd=None, trusted_certs=None, caller=None):
892            self.log = log
893            self.debug = debug
894            self.cert_file = cert_file
895            self.cert_pwd = cert_pwd
896            self.trusted_certs = None
897            self.caller = caller
[fd556d1]898            self.testbed = testbed
[5ae3857]899
900        def __call__(self, uri, aid ):
901            req = {
[29d5f7c]902                    'allocID': {'fedid': aid }, 
[5ae3857]903                }
[fd556d1]904            try:
905                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
906                        self.trusted_certs)
907                return True
908            except service_error, e:
909                self.log.error("Terminate segment failed on %s: %s" % \
910                        (self.testbed, e))
911                return False
[db6b092]912
[6e33086]913    class info_segment(start_segment):
914        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
915                cert_pwd=None, trusted_certs=None, caller=None,
916                log_collector=None):
917            experiment_control_local.start_segment.__init__(self, debug, 
918                    log, testbed, cert_file, cert_pwd, trusted_certs, 
919                    caller, log_collector)
920
921        def __call__(self, uri, aid):
922            req = { 'allocID': { 'fedid' : aid } }
923
924            try:
925                self.log.debug("Calling InfoSegment at %s " % uri)
926                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
927                        self.trusted_certs)
928                if r.has_key('InfoSegmentResponseBody'):
929                    self.make_map(r['InfoSegmentResponseBody'])
930                    if 'proof' in r: self.proof = r['proof']
931                    self.response = r
932                else:
933                    raise service_error(service_error.internal, 
934                            "Bad response!?: %s" %r)
935                return True
936            except service_error, e:
937                self.log.error("Info segment failed on %s: %s" % \
938                        (self.testbed, e))
939                return False
940
[22a1a77]941    class operation_segment:
942        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
943                cert_pwd=None, trusted_certs=None, caller=None,
944                log_collector=None):
945            self.log = log
946            self.debug = debug
947            self.cert_file = cert_file
948            self.cert_pwd = cert_pwd
949            self.trusted_certs = None
950            self.caller = caller
951            self.testbed = testbed
952            self.status = None
953
954        def __call__(self, uri, aid, op, targets, params):
955            req = { 
956                    'allocID': { 'fedid' : aid },
957                    'operation': op,
958                    'target': targets,
959                    }
960            if params: req['parameter'] = params
961
962
963            try:
964                self.log.debug("Calling OperationSegment at %s " % uri)
965                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
966                        self.trusted_certs)
967                if 'OperationSegmentResponseBody' in r:
968                    r = r['OperationSegmentResponseBody']
969                    if 'status' in r:
970                        self.status = r['status']
971                else:
972                    raise service_error(service_error.internal, 
973                            "Bad response!?: %s" %r)
974                return True
975            except service_error, e:
976                self.log.error("Operation segment failed on %s: %s" % \
977                        (self.testbed, e))
978                return False
979
[6e33086]980    def annotate_topology(self, top, data):
[f671ef7]981        # These routines do various parts of the annotation
982        def add_new_names(nl, l):
983            """ add any names in nl to the list in l """
984            for n in nl:
985                if n not in l: l.append(n)
986       
987        def merge_services(ne, e):
988            for ns in ne.service:
989                # NB: the else is on the for
990                for s in e.service:
991                    if ns.name == s.name:
992                        s.importer = ns.importer
993                        s.param = ns.param
994                        s.description = ns.description
995                        s.status = ns.status
996                        break
997                else:
998                    e.service.append(ns)
999       
1000        def merge_oses(ne, e):
1001            """
1002            Merge the operating system entries of ne into e
1003            """
1004            for nos in ne.os:
1005                # NB: the else is on the for
1006                for os in e.os:
[7aaa8dc]1007                    if nos.name == os.name:
1008                        os.version = nos.version
1009                        os.version = nos.distribution
1010                        os.version = nos.distributionversion
[f671ef7]1011                        for a in nos.attribute:
[db3da0b]1012                            if os.get_attribute(a.attribute):
1013                                os.remove_attribute(a.attribute)
[7aaa8dc]1014                            os.set_attribute(a.attribute, a.value)
[f671ef7]1015                        break
1016                else:
[db3da0b]1017                    # If both nodes have one OS, this is a replacement
1018                    if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os
1019                    else: e.os.append(nos)
[6e33086]1020
1021        # Annotate the topology with embedding info
1022        for e in top.elements:
1023            if isinstance(e, topdl.Computer):
1024                for s in data:
[f671ef7]1025                    ne = s.node.get(e.name, None)
1026                    if ne is not None:
1027                        add_new_names(ne.localname, e.localname)
1028                        e.status = ne.status
1029                        merge_services(ne, e)
1030                        add_new_names(ne.operation, e.operation)
1031                        if ne.os: merge_oses(ne, e)
[6e33086]1032                        break
1033
1034
1035
[43197eb]1036    def allocate_resources(self, allocated, masters, eid, expid, 
[b4b19c7]1037            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
[c573278]1038            attrs=None, connInfo={}, tbmap=None, expcert=None):
[69692a9]1039
[cc8d8e9]1040        started = { }           # Testbeds where a sub-experiment started
1041                                # successfully
1042
1043        # XXX
1044        fail_soft = False
1045
[fd07c48]1046        if tbmap is None: tbmap = { }
1047
[cc8d8e9]1048        log = alloc_log or self.log
1049
[faea607]1050        tp = thread_pool(self.nthreads)
[cc8d8e9]1051        threads = [ ]
[b4b19c7]1052        starters = [ ]
[cc8d8e9]1053
[c573278]1054        if expcert:
1055            cert = expcert
1056            pw = None
1057        else:
1058            cert = self.cert_file
[822d31b]1059            pw = self.cert_pwd
[c573278]1060
[109a32a]1061        for tb in allocated.keys():
1062            # Create and start a thread to start the segment, and save it
1063            # to get the return value later
[ab847bc]1064            tb_attrs = copy.copy(attrs)
[faea607]1065            tp.wait_for_slot()
[9294673]1066            uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None)
[ab847bc]1067            base, suffix = split_testbed(tb)
1068            if suffix:
1069                tb_attrs.append({'attribute': 'experiment_name', 
[175b444]1070                    'value': "%s-%s" % (eid, suffix)})
[ab847bc]1071            else:
1072                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
[109a32a]1073            if not uri:
1074                raise service_error(service_error.internal, 
1075                        "Unknown testbed %s !?" % tb)
1076
[9294673]1077            aid = tbparams[tb].allocID
1078            if not aid:
[cc8d8e9]1079                raise service_error(service_error.internal, 
1080                        "No alloc id for testbed %s !?" % tb)
1081
[b4b19c7]1082            s = self.start_segment(log=log, debug=self.debug,
[c573278]1083                    testbed=tb, cert_file=cert,
1084                    cert_pwd=pw, trusted_certs=self.trusted_certs,
[b4b19c7]1085                    caller=self.call_StartSegment,
1086                    log_collector=log_collector)
1087            starters.append(s)
[faea607]1088            t  = pooled_thread(\
[b4b19c7]1089                    target=s, name=tb,
[ab847bc]1090                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
[faea607]1091                    pdata=tp, trace_file=self.trace_file)
[69692a9]1092            threads.append(t)
1093            t.start()
[cc8d8e9]1094
[109a32a]1095        # Wait until all finish (keep pinging the log, though)
1096        mins = 0
[dadc4da]1097        revoked = False
[faea607]1098        while not tp.wait_for_all_done(60.0):
[109a32a]1099            mins += 1
1100            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1101                    % mins)
[dadc4da]1102            if not revoked and \
[f52f5df]1103                    len([ t.getName() for t in threads if t.rv == False]) > 0:
[dadc4da]1104                # a testbed has failed.  Revoke this experiment's
1105                # synchronizarion values so that sub experiments will not
1106                # deadlock waiting for synchronization that will never happen
1107                self.log.info("A subexperiment has failed to swap in, " + \
1108                        "revoking synch keys")
1109                var_key = "fedid:%s" % expid
1110                for k in self.synch_store.all_keys():
1111                    if len(k) > 45 and k[0:46] == var_key:
1112                        self.synch_store.revoke_key(k)
1113                revoked = True
[69692a9]1114
[cc8d8e9]1115        failed = [ t.getName() for t in threads if not t.rv ]
1116        succeeded = [tb for tb in allocated.keys() if tb not in failed]
[3132419]1117
[cc8d8e9]1118        # If one failed clean up, unless fail_soft is set
[32e7d93]1119        if failed:
[cc8d8e9]1120            if not fail_soft:
[faea607]1121                tp.clear()
[cc8d8e9]1122                for tb in succeeded:
1123                    # Create and start a thread to stop the segment
[faea607]1124                    tp.wait_for_slot()
[9294673]1125                    uri = tbparams[tb].uri
[faea607]1126                    t  = pooled_thread(\
[32e7d93]1127                            target=self.terminate_segment(log=log,
[fd556d1]1128                                testbed=tb,
[696a689]1129                                cert_file=cert, 
1130                                cert_pwd=pw,
[32e7d93]1131                                trusted_certs=self.trusted_certs,
1132                                caller=self.call_TerminateSegment),
[9294673]1133                            args=(uri, tbparams[tb].allocID),
[32e7d93]1134                            name=tb,
[faea607]1135                            pdata=tp, trace_file=self.trace_file)
[cc8d8e9]1136                    t.start()
[f52f5df]1137                # Wait until all finish (if any are being stopped)
1138                if succeeded:
[faea607]1139                    tp.wait_for_all_done()
[cc8d8e9]1140
1141                # release the allocations
1142                for tb in tbparams.keys():
[725c55d]1143                    try:
[9294673]1144                        self.release_access(tb, tbparams[tb].allocID, 
1145                                tbmap=tbmap, uri=tbparams[tb].uri,
[696a689]1146                                cert_file=cert, cert_pwd=pw)
[725c55d]1147                    except service_error, e:
1148                        self.log.warn("Error releasing access: %s" % e.desc)
[cc8d8e9]1149                # Remove the placeholder
1150                self.state_lock.acquire()
[29d5f7c]1151                self.state[eid].status = 'failed'
[6e33086]1152                self.state[eid].updated()
[cc8d8e9]1153                if self.state_filename: self.write_state()
1154                self.state_lock.release()
[05e8da8]1155                # Remove the repo dir
1156                self.remove_dirs("%s/%s" %(self.repodir, expid))
1157                # Walk up tmpdir, deleting as we go
1158                if self.cleanup:
1159                    self.remove_dirs(tmpdir)
1160                else:
1161                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1162
[cc8d8e9]1163
1164                log.error("Swap in failed on %s" % ",".join(failed))
1165                return
1166        else:
[29d5f7c]1167            # Walk through the successes and gather the proofs
[e83f2f2]1168            proofs = { }
[b4b19c7]1169            for s in starters:
[e83f2f2]1170                if s.proof:
1171                    proofs[s.testbed] = s.proof
[6e33086]1172            self.annotate_topology(top, starters)
[cc8d8e9]1173            log.info("[start_segment]: Experiment %s active" % eid)
1174
1175
1176        # Walk up tmpdir, deleting as we go
[69692a9]1177        if self.cleanup:
[05e8da8]1178            self.remove_dirs(tmpdir)
[69692a9]1179        else:
1180            log.debug("[start_experiment]: not removing %s" % tmpdir)
[cc8d8e9]1181
[b4b19c7]1182        # Insert the experiment into our state and update the disk copy.
[cc8d8e9]1183        self.state_lock.acquire()
[29d5f7c]1184        self.state[expid].status = 'active'
[cc8d8e9]1185        self.state[eid] = self.state[expid]
[29d5f7c]1186        self.state[eid].top = top
[6e33086]1187        self.state[eid].updated()
[e83f2f2]1188        # Append startup proofs
[29d5f7c]1189        for f in self.state[eid].get_all_allocations():
1190            if f.tb in proofs:
1191                f.proof.append(proofs[f.tb])
[e83f2f2]1192
[cc8d8e9]1193        if self.state_filename: self.write_state()
1194        self.state_lock.release()
1195        return
1196
1197
[895a133]1198    def add_kit(self, e, kit):
1199        """
1200        Add a Software object created from the list of (install, location)
1201        tuples passed as kit  to the software attribute of an object e.  We
1202        do this enough to break out the code, but it's kind of a hack to
1203        avoid changing the old tuple rep.
1204        """
1205
1206        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1207
1208        if isinstance(e.software, list): e.software.extend(s)
1209        else: e.software = s
1210
[913dc7a]1211    def append_experiment_authorization(self, expid, attrs, 
1212            need_state_lock=True):
1213        """
1214        Append the authorization information to system state
1215        """
1216
1217        for p, a in attrs:
1218            self.auth.set_attribute(p, a)
1219        self.auth.save()
1220
1221        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1222        # XXX: really a no op?
1223        #self.state[expid]['auth'].update(attrs)
[913dc7a]1224        if self.state_filename: self.write_state()
1225        if need_state_lock: self.state_lock.release()
1226
[a96d946]1227    def clear_experiment_authorization(self, expid, need_state_lock=True):
[913dc7a]1228        """
1229        Attrs is a set of attribute principal pairs that need to be removed
1230        from the authenticator.  Remove them and save the authenticator.
1231        """
1232
1233        if need_state_lock: self.state_lock.acquire()
[29d5f7c]1234        # XXX: should be a no-op
1235        #if expid in self.state and 'auth' in self.state[expid]:
1236            #for p, a in self.state[expid]['auth']:
1237                #self.auth.unset_attribute(p, a)
1238            #self.state[expid]['auth'] = set()
[913dc7a]1239        if self.state_filename: self.write_state()
1240        if need_state_lock: self.state_lock.release()
[b67fd22]1241        self.auth.save()
[913dc7a]1242
[895a133]1243
[b4b19c7]1244    def create_experiment_state(self, fid, req, expid, expcert,
[a3ad8bd]1245            state='starting'):
[895a133]1246        """
1247        Create the initial entry in the experiment's state.  The expid and
1248        expcert are the experiment's fedid and certifacte that represents that
1249        ID, which are installed in the experiment state.  If the request
1250        includes a suggested local name that is used if possible.  If the local
1251        name is already taken by an experiment owned by this user that has
[a3ad8bd]1252        failed, it is overwritten.  Otherwise new letters are added until a
[895a133]1253        valid localname is found.  The generated local name is returned.
1254        """
1255
1256        if req.has_key('experimentID') and \
1257                req['experimentID'].has_key('localname'):
1258            overwrite = False
1259            eid = req['experimentID']['localname']
1260            # If there's an old failed experiment here with the same local name
1261            # and accessible by this user, we'll overwrite it, otherwise we'll
1262            # fall through and do the collision avoidance.
1263            old_expid = self.get_experiment_fedid(eid)
[74572ba]1264            if old_expid:
1265                users_experiment = True
1266                try:
1267                    self.check_experiment_access(fid, old_expid)
1268                except service_error, e:
1269                    if e.code == service_error.access: users_experiment = False
1270                    else: raise e
1271                if users_experiment:
1272                    self.state_lock.acquire()
[29d5f7c]1273                    status = self.state[eid].status
[74572ba]1274                    if status and status == 'failed':
1275                        # remove the old access attributes
1276                        self.clear_experiment_authorization(eid,
1277                                need_state_lock=False)
1278                        overwrite = True
1279                        del self.state[eid]
1280                        del self.state[old_expid]
1281                    self.state_lock.release()
[6031c9d]1282                else:
1283                    self.log.info('Experiment %s exists, ' % eid + \
1284                            'but this user cannot access it')
[895a133]1285            self.state_lock.acquire()
1286            while (self.state.has_key(eid) and not overwrite):
1287                eid += random.choice(string.ascii_letters)
1288            # Initial state
[29d5f7c]1289            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1290                    identity=expcert)
[895a133]1291            self.state[expid] = self.state[eid]
[913dc7a]1292            if self.state_filename: self.write_state()
1293            self.state_lock.release()
[895a133]1294        else:
1295            eid = self.exp_stem
1296            for i in range(0,5):
1297                eid += random.choice(string.ascii_letters)
1298            self.state_lock.acquire()
1299            while (self.state.has_key(eid)):
1300                eid = self.exp_stem
1301                for i in range(0,5):
1302                    eid += random.choice(string.ascii_letters)
1303            # Initial state
[29d5f7c]1304            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1305                    identity=expcert)
[895a133]1306            self.state[expid] = self.state[eid]
[913dc7a]1307            if self.state_filename: self.write_state()
1308            self.state_lock.release()
1309
1310        # Let users touch the state.  Authorize this fid and the expid itself
1311        # to touch the experiment, as well as allowing th eoverrides.
1312        self.append_experiment_authorization(eid, 
1313                set([(fid, expid), (expid,expid)] + \
1314                        [ (o, expid) for o in self.overrides]))
[895a133]1315
1316        return eid
1317
1318
1319    def allocate_ips_to_topo(self, top):
1320        """
[69692a9]1321        Add an ip4_address attribute to all the hosts in the topology, based on
[895a133]1322        the shared substrates on which they sit.  An /etc/hosts file is also
[69692a9]1323        created and returned as a list of hostfiles entries.  We also return
1324        the allocator, because we may need to allocate IPs to portals
1325        (specifically DRAGON portals).
[895a133]1326        """
1327        subs = sorted(top.substrates, 
1328                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1329                reverse=True)
1330        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1331        ifs = { }
1332        hosts = [ ]
1333
1334        for idx, s in enumerate(subs):
[289ff7e]1335            net_size = len(s.interfaces)+2
1336
1337            a = ips.allocate(net_size)
[895a133]1338            if a :
1339                base, num = a
[289ff7e]1340                if num < net_size: 
[895a133]1341                    raise service_error(service_error.internal,
1342                            "Allocator returned wrong number of IPs??")
1343            else:
1344                raise service_error(service_error.req, 
1345                        "Cannot allocate IP addresses")
[062b991]1346            mask = ips.min_alloc
1347            while mask < net_size:
1348                mask *= 2
[289ff7e]1349
[062b991]1350            netmask = ((2**32-1) ^ (mask-1))
[895a133]1351
1352            base += 1
1353            for i in s.interfaces:
1354                i.attribute.append(
1355                        topdl.Attribute('ip4_address', 
1356                            "%s" % ip_addr(base)))
[289ff7e]1357                i.attribute.append(
1358                        topdl.Attribute('ip4_netmask', 
1359                            "%s" % ip_addr(int(netmask))))
1360
[1e7f268]1361                hname = i.element.name
[895a133]1362                if ifs.has_key(hname):
1363                    hosts.append("%s\t%s-%s %s-%d" % \
1364                            (ip_addr(base), hname, s.name, hname,
1365                                ifs[hname]))
1366                else:
1367                    ifs[hname] = 0
1368                    hosts.append("%s\t%s-%s %s-%d %s" % \
1369                            (ip_addr(base), hname, s.name, hname,
1370                                ifs[hname], hname))
1371
1372                ifs[hname] += 1
1373                base += 1
[69692a9]1374        return hosts, ips
[895a133]1375
[1d73342]1376    def get_access_to_testbeds(self, testbeds, fid, allocated, 
[725c55d]1377            tbparam, masters, tbmap, expid=None, expcert=None):
[6e63513]1378        for tb in testbeds:
[1d73342]1379            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
[c573278]1380                    expcert)
[6e63513]1381            allocated[tb] = 1
1382
[1d73342]1383    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1384            expcert=None):
[6e63513]1385        """
1386        Get access to testbed through fedd and set the parameters for that tb
1387        """
1388        def get_export_project(svcs):
1389            """
1390            Look through for the list of federated_service for this testbed
1391            objects for a project_export service, and extract the project
1392            parameter.
1393            """
1394
1395            pe = [s for s in svcs if s.name=='project_export']
1396            if len(pe) == 1:
1397                return pe[0].params.get('project', None)
1398            elif len(pe) == 0:
1399                return None
1400            else:
1401                raise service_error(service_error.req,
1402                        "More than one project export is not supported")
1403
[d0912be]1404        def add_services(svcs, type, slist, keys):
[63c6664]1405            """
[d0912be]1406            Add the given services to slist.  type is import or export.  Also
1407            add a mapping entry from the assigned id to the original service
1408            record.
[63c6664]1409            """
1410            for i, s in enumerate(svcs):
1411                idx = '%s%d' % (type, i)
[d0912be]1412                keys[idx] = s
[63c6664]1413                sr = {'id': idx, 'name': s.name, 'visibility': type }
1414                if s.params:
1415                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1416                            for k, v in s.params.items()]
1417                slist.append(sr)
1418
[6e63513]1419        uri = tbmap.get(testbed_base(tb), None)
1420        if not uri:
1421            raise service_error(service_error.server_config, 
1422                    "Unknown testbed: %s" % tb)
1423
1424        export_svcs = masters.get(tb,[])
1425        import_svcs = [ s for m in masters.values() \
1426                for s in m \
1427                    if tb in s.importers ]
1428
1429        export_project = get_export_project(export_svcs)
1430        # Compose the credential list so that IDs come before attributes
1431        creds = set()
1432        keys = set()
[c573278]1433        certs = self.auth.get_creds_for_principal(fid)
1434        if expid:
1435            certs.update(self.auth.get_creds_for_principal(expid))
1436        for c in certs:
[6e63513]1437            keys.add(c.issuer_cert())
1438            creds.add(c.attribute_cert())
1439        creds = list(keys) + list(creds)
1440
[c573278]1441        if expcert: cert, pw = expcert, None
1442        else: cert, pw = self.cert_file, self.cert_pw
1443
[6e63513]1444        # Request credentials
1445        req = {
1446                'abac_credential': creds,
1447            }
1448        # Make the service request from the services we're importing and
1449        # exporting.  Keep track of the export request ids so we can
1450        # collect the resulting info from the access response.
1451        e_keys = { }
1452        if import_svcs or export_svcs:
[63c6664]1453            slist = []
[d0912be]1454            add_services(import_svcs, 'import', slist, e_keys)
1455            add_services(export_svcs, 'export', slist, e_keys)
[63c6664]1456            req['service'] = slist
[6e63513]1457
1458        if self.local_access.has_key(uri):
1459            # Local access call
1460            req = { 'RequestAccessRequestBody' : req }
1461            r = self.local_access[uri].RequestAccess(req, 
1462                    fedid(file=self.cert_file))
1463            r = { 'RequestAccessResponseBody' : r }
1464        else:
[c573278]1465            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
[6e63513]1466
[725c55d]1467        if r.has_key('RequestAccessResponseBody'):
1468            # Through to here we have a valid response, not a fault.
1469            # Access denied is a fault, so something better or worse than
1470            # access denied has happened.
1471            r = r['RequestAccessResponseBody']
1472            self.log.debug("[get_access] Access granted")
1473        else:
1474            raise service_error(service_error.protocol,
1475                        "Bad proxy response")
[e83f2f2]1476        if 'proof' not in r:
1477            raise service_error(service_error.protocol,
1478                        "Bad access response (no access proof)")
[ab3d6c5]1479
[9294673]1480        tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None),
[ab3d6c5]1481                tb=tb, uri=uri, proof=[r['proof']], 
1482                services=masters.get(tb, None))
[6e63513]1483
1484        # Collect the responses corresponding to the services this testbed
1485        # exports.  These will be the service requests that we will include in
1486        # the start segment requests (with appropriate visibility values) to
1487        # import and export the segments.
1488        for s in r.get('service', []):
1489            id = s.get('id', None)
[ab3d6c5]1490            # Note that this attaches the response to the object in the masters
1491            # data structure.  (The e_keys index disappears when this fcn
1492            # returns)
[6e63513]1493            if id and id in e_keys:
1494                e_keys[id].reqs.append(s)
1495
1496        # Add attributes to parameter space.  We don't allow attributes to
1497        # overlay any parameters already installed.
1498        for a in r.get('fedAttr', []):
1499            try:
[9294673]1500                if a['attribute']:
1501                    tbparam[tb].set_attribute(a['attribute'], a['value'])
[6e63513]1502            except KeyError:
1503                self.log.error("Bad attribute in response: %s" % a)
1504
1505
[7fe81be]1506    def split_topology(self, top, topo, testbeds):
[895a133]1507        """
[e02cd14]1508        Create the sub-topologies that are needed for experiment instantiation.
[895a133]1509        """
1510        for tb in testbeds:
1511            topo[tb] = top.clone()
[7fe81be]1512            # copy in for loop allows deletions from the original
1513            for e in [ e for e in topo[tb].elements]:
[895a133]1514                etb = e.get_attribute('testbed')
[7fe81be]1515                # NB: elements without a testbed attribute won't appear in any
1516                # sub topologies. 
1517                if not etb or etb != tb:
[895a133]1518                    for i in e.interface:
1519                        for s in i.subs:
1520                            try:
1521                                s.interfaces.remove(i)
1522                            except ValueError:
1523                                raise service_error(service_error.internal,
1524                                        "Can't remove interface??")
[7fe81be]1525                    topo[tb].elements.remove(e)
[895a133]1526            topo[tb].make_indices()
1527
[2627eb3]1528    def confirm_software(self, top):
1529        """
1530        Make sure that the software to be loaded in the topo is all available
1531        before we begin making access requests, etc.  This is a subset of
1532        wrangle_software.
1533        """
1534        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1535        pkgs.update([x.location for e in top.elements for x in e.software])
1536
1537        for pkg in pkgs:
1538            loc = pkg
1539
1540            scheme, host, path = urlparse(loc)[0:3]
1541            dest = os.path.basename(path)
1542            if not scheme:
1543                if not loc.startswith('/'):
1544                    loc = "/%s" % loc
1545                loc = "file://%s" %loc
1546            # NB: if scheme was found, loc == pkg
1547            try:
1548                u = urlopen(loc)
1549                u.close()
1550            except Exception, e:
1551                raise service_error(service_error.req, 
1552                        "Cannot open %s: %s" % (loc, e))
1553        return True
1554
[895a133]1555    def wrangle_software(self, expid, top, topo, tbparams):
1556        """
1557        Copy software out to the repository directory, allocate permissions and
1558        rewrite the segment topologies to look for the software in local
1559        places.
1560        """
1561
1562        # Copy the rpms and tarfiles to a distribution directory from
1563        # which the federants can retrieve them
1564        linkpath = "%s/software" %  expid
1565        softdir ="%s/%s" % ( self.repodir, linkpath)
1566        softmap = { }
[2627eb3]1567
1568        # self.fedkit and self.gateway kit are lists of tuples of
1569        # (install_location, download_location) this extracts the download
1570        # locations.
1571        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1572        pkgs.update([x.location for e in top.elements for x in e.software])
[895a133]1573        try:
1574            os.makedirs(softdir)
[d3c8759]1575        except EnvironmentError, e:
[895a133]1576            raise service_error(
1577                    "Cannot create software directory: %s" % e)
1578        # The actual copying.  Everything's converted into a url for copying.
[913dc7a]1579        auth_attrs = set()
[895a133]1580        for pkg in pkgs:
1581            loc = pkg
1582
1583            scheme, host, path = urlparse(loc)[0:3]
1584            dest = os.path.basename(path)
1585            if not scheme:
1586                if not loc.startswith('/'):
1587                    loc = "/%s" % loc
1588                loc = "file://%s" %loc
[2627eb3]1589            # NB: if scheme was found, loc == pkg
[895a133]1590            try:
1591                u = urlopen(loc)
1592            except Exception, e:
1593                raise service_error(service_error.req, 
1594                        "Cannot open %s: %s" % (loc, e))
1595            try:
1596                f = open("%s/%s" % (softdir, dest) , "w")
1597                self.log.debug("Writing %s/%s" % (softdir,dest) )
1598                data = u.read(4096)
1599                while data:
1600                    f.write(data)
1601                    data = u.read(4096)
1602                f.close()
1603                u.close()
1604            except Exception, e:
1605                raise service_error(service_error.internal,
1606                        "Could not copy %s: %s" % (loc, e))
1607            path = re.sub("/tmp", "", linkpath)
1608            # XXX
1609            softmap[pkg] = \
[7183b48]1610                    "%s/%s/%s" %\
1611                    ( self.repo_url, path, dest)
[895a133]1612
[913dc7a]1613            # Allow the individual segments to access the software by assigning
1614            # an attribute to each testbed allocation that encodes the data to
1615            # be released.  This expression collects the data for each run of
1616            # the loop.
1617            auth_attrs.update([
[9294673]1618                (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \
[913dc7a]1619                        for tb in tbparams.keys()])
1620
1621        self.append_experiment_authorization(expid, auth_attrs)
[895a133]1622
1623        # Convert the software locations in the segments into the local
1624        # copies on this host
1625        for soft in [ s for tb in topo.values() \
1626                for e in tb.elements \
1627                    if getattr(e, 'software', False) \
1628                        for s in e.software ]:
1629            if softmap.has_key(soft.location):
1630                soft.location = softmap[soft.location]
1631
1632
[a3ad8bd]1633    def new_experiment(self, req, fid):
1634        """
1635        The external interface to empty initial experiment creation called from
1636        the dispatcher.
1637
1638        Creates a working directory, splits the incoming description using the
1639        splitter script and parses out the avrious subsections using the
1640        lcasses above.  Once each sub-experiment is created, use pooled threads
1641        to instantiate them and start it all up.
1642        """
[7206e5a]1643        req = req.get('NewRequestBody', None)
1644        if not req:
1645            raise service_error(service_error.req,
1646                    "Bad request format (no NewRequestBody)")
1647
1648        if self.auth.import_credentials(data_list=req.get('credential', [])):
1649            self.auth.save()
[c573278]1650
[e83f2f2]1651        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1652                with_proof=True)
1653
1654        if not access_ok:
1655            raise service_error(service_error.access, "New access denied",
1656                    proof=[proof])
[a3ad8bd]1657
1658        try:
1659            tmpdir = tempfile.mkdtemp(prefix="split-")
[d3c8759]1660        except EnvironmentError:
[a3ad8bd]1661            raise service_error(service_error.internal, "Cannot create tmp dir")
1662
1663        try:
1664            access_user = self.accessdb[fid]
1665        except KeyError:
1666            raise service_error(service_error.internal,
1667                    "Access map and authorizer out of sync in " + \
[7183b48]1668                            "new_experiment for fedid %s"  % fid)
[a3ad8bd]1669
1670        # Generate an ID for the experiment (slice) and a certificate that the
1671        # allocator can use to prove they own it.  We'll ship it back through
[7206e5a]1672        # the encrypted connection.  If the requester supplied one, use it.
1673        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1674            expcert = req['experimentAccess']['X509']
[962ea25]1675            expid = fedid(certstr=expcert)
[7206e5a]1676            self.state_lock.acquire()
1677            if expid in self.state:
1678                self.state_lock.release()
1679                raise service_error(service_error.req, 
1680                        'fedid %s identifies an existing experiment' % expid)
1681            self.state_lock.release()
1682        else:
1683            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
[a3ad8bd]1684
1685        #now we're done with the tmpdir, and it should be empty
1686        if self.cleanup:
1687            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1688            os.rmdir(tmpdir)
1689        else:
1690            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1691
1692        eid = self.create_experiment_state(fid, req, expid, expcert, 
1693                state='empty')
1694
1695        rv = {
1696                'experimentID': [
1697                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1698                ],
1699                'experimentStatus': 'empty',
[e83f2f2]1700                'experimentAccess': { 'X509' : expcert },
1701                'proof': proof.to_dict(),
[a3ad8bd]1702            }
1703
1704        return rv
1705
[cf0ff4f]1706    # create_experiment sub-functions
1707
[5ecb9a3]1708    @staticmethod
[cf0ff4f]1709    def get_experiment_key(req, field='experimentID'):
[5ecb9a3]1710        """
1711        Parse the experiment identifiers out of the request (the request body
1712        tag has been removed).  Specifically this pulls either the fedid or the
1713        localname out of the experimentID field.  A fedid is preferred.  If
1714        neither is present or the request does not contain the fields,
1715        service_errors are raised.
1716        """
1717        # Get the experiment access
[cf0ff4f]1718        exp = req.get(field, None)
[5ecb9a3]1719        if exp:
1720            if exp.has_key('fedid'):
1721                key = exp['fedid']
1722            elif exp.has_key('localname'):
1723                key = exp['localname']
1724            else:
1725                raise service_error(service_error.req, "Unknown lookup type")
1726        else:
1727            raise service_error(service_error.req, "No request?")
1728
1729        return key
1730
1731    def get_experiment_ids_and_start(self, key, tmpdir):
1732        """
1733        Get the experiment name, id and access certificate from the state, and
1734        set the experiment state to 'starting'.  returns a triple (fedid,
1735        localname, access_cert_file). The access_cert_file is a copy of the
1736        contents of the access certificate, created in the tempdir with
1737        restricted permissions.  If things are confused, raise an exception.
1738        """
1739
1740        expid = eid = None
1741        self.state_lock.acquire()
[29d5f7c]1742        if key in self.state:
1743            exp = self.state[key]
1744            exp.status = "starting"
[6e33086]1745            exp.updated()
[29d5f7c]1746            expid = exp.fedid
1747            eid = exp.localname
1748            expcert = exp.identity
[5ecb9a3]1749        self.state_lock.release()
1750
1751        # make a protected copy of the access certificate so the experiment
1752        # controller can act as the experiment principal.
1753        if expcert:
1754            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1755            if not expcert_file:
1756                raise service_error(service_error.internal, 
1757                        "Cannot create temp cert file?")
1758        else:
1759            expcert_file = None
1760
1761        return (eid, expid, expcert_file)
1762
1763    def get_topology(self, req, tmpdir):
1764        """
1765        Get the ns2 content and put it into a file for parsing.  Call the local
1766        or remote parser and return the topdl.Topology.  Errors result in
1767        exceptions.  req is the request and tmpdir is a work directory.
1768        """
1769
1770        # The tcl parser needs to read a file so put the content into that file
1771        descr=req.get('experimentdescription', None)
1772        if descr:
[a7c0bcb]1773            if 'ns2description' in descr:
1774                file_content=descr['ns2description']
1775            elif 'topdldescription' in descr:
1776                return topdl.Topology(**descr['topdldescription'])
1777            else:
1778                raise service_error(service_error.req, 
1779                        'Unknown experiment description type')
[5ecb9a3]1780        else:
1781            raise service_error(service_error.req, "No experiment description")
1782
1783
1784        if self.splitter_url:
1785            self.log.debug("Calling remote topdl translator at %s" % \
1786                    self.splitter_url)
1787            top = self.remote_ns2topdl(self.splitter_url, file_content)
1788        else:
1789            tclfile = os.path.join(tmpdir, "experiment.tcl")
1790            if file_content:
1791                try:
1792                    f = open(tclfile, 'w')
1793                    f.write(file_content)
1794                    f.close()
1795                except EnvironmentError:
1796                    raise service_error(service_error.internal,
1797                            "Cannot write temp experiment description")
1798            else:
1799                raise service_error(service_error.req, 
1800                        "Only ns2descriptions supported")
1801            pid = "dummy"
1802            gid = "dummy"
1803            eid = "dummy"
1804
1805            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1806                str(self.muxmax), '-m', 'dummy']
1807
1808            tclcmd.extend([pid, gid, eid, tclfile])
1809
1810            self.log.debug("running local splitter %s", " ".join(tclcmd))
1811            # This is just fantastic.  As a side effect the parser copies
1812            # tb_compat.tcl into the current directory, so that directory
1813            # must be writable by the fedd user.  Doing this in the
1814            # temporary subdir ensures this is the case.
1815            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1816                    cwd=tmpdir)
1817            split_data = tclparser.stdout
1818
1819            top = topdl.topology_from_xml(file=split_data, top="experiment")
1820            os.remove(tclfile)
1821
1822        return top
1823
[1660f7c]1824    def get_testbed_services(self, req, testbeds):
[5ecb9a3]1825        """
[57facae]1826        Parse the services section of the request into two dicts mapping
1827        testbed to lists of federated_service objects.  The first dict maps all
1828        exporters of services to those service objects, the second maps
1829        testbeds to service objects only for services requiring portals.
[5ecb9a3]1830        """
[57facae]1831        # We construct both dicts here because deriving the second is more
1832        # comples than it looks - both the keys and lists can differ, and it's
1833        # much easier to generate both in one pass.
[5ecb9a3]1834        masters = { }
[57facae]1835        pmasters = { }
[5ecb9a3]1836        for s in req.get('service', []):
1837            # If this is a service request with the importall field
1838            # set, fill it out.
1839
1840            if s.get('importall', False):
1841                s['import'] = [ tb for tb in testbeds \
1842                        if tb not in s.get('export',[])]
1843                del s['importall']
1844
1845            # Add the service to masters
1846            for tb in s.get('export', []):
1847                if s.get('name', None):
1848
1849                    params = { }
1850                    for a in s.get('fedAttr', []):
1851                        params[a.get('attribute', '')] = a.get('value','')
1852
1853                    fser = federated_service(name=s['name'],
1854                            exporter=tb, importers=s.get('import',[]),
1855                            params=params)
1856                    if fser.name == 'hide_hosts' \
1857                            and 'hosts' not in fser.params:
1858                        fser.params['hosts'] = \
1859                                ",".join(tb_hosts.get(fser.exporter, []))
1860                    if tb in masters: masters[tb].append(fser)
1861                    else: masters[tb] = [fser]
[57facae]1862
1863                    if fser.portal:
1864                        if tb in pmasters: pmasters[tb].append(fser)
1865                        else: pmasters[tb] = [fser]
[5ecb9a3]1866                else:
1867                    self.log.error('Testbed service does not have name " + \
1868                            "and importers')
[57facae]1869        return masters, pmasters
[5ecb9a3]1870
[cf0ff4f]1871    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1872        """
[3df9b33]1873        Create the ssh keys necessary for interconnecting the portal nodes and
[cf0ff4f]1874        the global hosts file for letting each segment know about the IP
1875        addresses in play.  Save these into the repo.  Add attributes to the
1876        autorizer allowing access controllers to download them and return a set
[3df9b33]1877        of attributes that inform the segments where to find this stuff.  May
[cf0ff4f]1878        raise service_errors in if there are problems.
1879        """
1880        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1881        gw_secretkey_base = "fed.%s" % self.ssh_type
[3df9b33]1882        keydir = os.path.join(tmpdir, 'keys')
1883        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1884        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
[cf0ff4f]1885
1886        try:
1887            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1888        except ValueError:
1889            raise service_error(service_error.server_config, 
1890                    "Bad key type (%s)" % self.ssh_type)
1891
[3df9b33]1892        self.generate_seer_certs(keydir)
[cf0ff4f]1893
1894        # Copy configuration files into the remote file store
1895        # The config urlpath
1896        configpath = "/%s/config" % expid
1897        # The config file system location
1898        configdir ="%s%s" % ( self.repodir, configpath)
1899        try:
1900            os.makedirs(configdir)
1901        except EnvironmentError, e:
1902            raise service_error(service_error.internal,
1903                    "Cannot create config directory: %s" % e)
1904        try:
1905            f = open("%s/hosts" % configdir, "w")
1906            print >> f, string.join(hosts, '\n')
1907            f.close()
1908        except EnvironmentError, e:
1909            raise service_error(service_error.internal, 
1910                    "Cannot write hosts file: %s" % e)
1911        try:
[3df9b33]1912            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1913            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1914            copy_file(os.path.join(keydir, 'ca.pem'), 
1915                    os.path.join(configdir, 'ca.pem'))
1916            copy_file(os.path.join(keydir, 'node.pem'), 
1917                    os.path.join(configdir, 'node.pem'))
[cf0ff4f]1918        except EnvironmentError, e:
1919            raise service_error(service_error.internal, 
1920                    "Cannot copy keyfiles: %s" % e)
1921
[913dc7a]1922        # Allow the individual testbeds to access the configuration files,
1923        # again by setting an attribute for the relevant pathnames on each
1924        # allocation principal.  Yeah, that's a long list comprehension.
1925        self.append_experiment_authorization(expid, set([
[9294673]1926            (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \
[913dc7a]1927                    for tb in tbparams.keys() \
[3df9b33]1928                        for f in ("hosts", 'ca.pem', 'node.pem', 
1929                            gw_secretkey_base, gw_pubkey_base)]))
[cf0ff4f]1930
1931        attrs = [ 
1932                {
1933                    'attribute': 'ssh_pubkey', 
1934                    'value': '%s/%s/config/%s' % \
1935                            (self.repo_url, expid, gw_pubkey_base)
1936                },
1937                {
1938                    'attribute': 'ssh_secretkey', 
1939                    'value': '%s/%s/config/%s' % \
1940                            (self.repo_url, expid, gw_secretkey_base)
1941                },
1942                {
1943                    'attribute': 'hosts', 
1944                    'value': '%s/%s/config/hosts' % \
1945                            (self.repo_url, expid)
1946                },
[3df9b33]1947                {
1948                    'attribute': 'seer_ca_pem', 
1949                    'value': '%s/%s/config/%s' % \
1950                            (self.repo_url, expid, 'ca.pem')
1951                },
1952                {
1953                    'attribute': 'seer_node_pem', 
1954                    'value': '%s/%s/config/%s' % \
1955                            (self.repo_url, expid, 'node.pem')
1956                },
[cf0ff4f]1957            ]
1958        return attrs
1959
1960
1961    def get_vtopo(self, req, fid):
1962        """
1963        Return the stored virtual topology for this experiment
1964        """
1965        rv = None
1966        state = None
1967
1968        req = req.get('VtopoRequestBody', None)
1969        if not req:
1970            raise service_error(service_error.req,
1971                    "Bad request format (no VtopoRequestBody)")
1972        exp = req.get('experiment', None)
1973        if exp:
1974            if exp.has_key('fedid'):
1975                key = exp['fedid']
1976                keytype = "fedid"
1977            elif exp.has_key('localname'):
1978                key = exp['localname']
1979                keytype = "localname"
1980            else:
1981                raise service_error(service_error.req, "Unknown lookup type")
1982        else:
1983            raise service_error(service_error.req, "No request?")
1984
[e83f2f2]1985        proof = self.check_experiment_access(fid, key)
[cf0ff4f]1986
1987        self.state_lock.acquire()
[29d5f7c]1988        # XXX: this needs to be recalculated
[80b1e82]1989        if key in self.state:
1990            if self.state[key].top is not None:
1991                vtopo = topdl.topology_to_vtopo(self.state[key].top)
[e83f2f2]1992                rv = { 'experiment' : {keytype: key },
[80b1e82]1993                        'vtopo': vtopo,
[e83f2f2]1994                        'proof': proof.to_dict(), 
[cf0ff4f]1995                    }
1996            else:
[80b1e82]1997                state = self.state[key].status
[cf0ff4f]1998        self.state_lock.release()
1999
2000        if rv: return rv
2001        else: 
2002            if state:
2003                raise service_error(service_error.partial, 
2004                        "Not ready: %s" % state)
2005            else:
2006                raise service_error(service_error.req, "No such experiment")
2007
2008    def get_vis(self, req, fid):
2009        """
2010        Return the stored visualization for this experiment
2011        """
2012        rv = None
2013        state = None
2014
2015        req = req.get('VisRequestBody', None)
2016        if not req:
2017            raise service_error(service_error.req,
2018                    "Bad request format (no VisRequestBody)")
2019        exp = req.get('experiment', None)
2020        if exp:
2021            if exp.has_key('fedid'):
2022                key = exp['fedid']
2023                keytype = "fedid"
2024            elif exp.has_key('localname'):
2025                key = exp['localname']
2026                keytype = "localname"
2027            else:
2028                raise service_error(service_error.req, "Unknown lookup type")
2029        else:
2030            raise service_error(service_error.req, "No request?")
2031
[e83f2f2]2032        proof = self.check_experiment_access(fid, key)
[cf0ff4f]2033
2034        self.state_lock.acquire()
[80b1e82]2035        # Generate the visualization
2036        if key in self.state:
2037            if self.state[key].top is not None:
2038                try:
2039                    vis = self.genviz(
2040                            topdl.topology_to_vtopo(self.state[key].topo))
2041                except service_error, e:
2042                    self.state_lock.release()
2043                    raise e
2044                rv =  { 'experiment' : {keytype: key },
2045                        'vis': vis,
[e83f2f2]2046                        'proof': proof.to_dict(), 
[80b1e82]2047                        }
[cf0ff4f]2048            else:
[80b1e82]2049                state = self.state[key].status
[cf0ff4f]2050        self.state_lock.release()
2051
2052        if rv: return rv
2053        else:
2054            if state:
2055                raise service_error(service_error.partial, 
2056                        "Not ready: %s" % state)
2057            else:
2058                raise service_error(service_error.req, "No such experiment")
2059
2060   
[ec3aa4d]2061    def save_federant_information(self, allocated, tbparams, eid, top):
[cf0ff4f]2062        """
2063        Store the various data that have changed in the experiment state
2064        between when it was started and the beginning of resource allocation.
2065        This is basically the information about each local allocation.  This
[e83f2f2]2066        fills in the values of the placeholder allocation in the state.  It
2067        also collects the access proofs and returns them as dicts for a
2068        response message.
[cf0ff4f]2069        """
[29d5f7c]2070        self.state_lock.acquire()
2071        exp = self.state[eid]
[ec3aa4d]2072        exp.top = top.clone()
[cf0ff4f]2073        # save federant information
2074        for k in allocated.keys():
[9294673]2075            exp.add_allocation(tbparams[k])
[ec3aa4d]2076            top.elements.append(topdl.Testbed(uri=tbparams[k].uri,
[ab3d6c5]2077                type="testbed", localname=[k], 
2078                service=[ s.to_topdl() for s in tbparams[k].services]))
[cf0ff4f]2079
[e83f2f2]2080        # Access proofs for the response message
2081        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
[9294673]2082                    for p in tbparams[k].proof]
[6e33086]2083        exp.updated()
[cf0ff4f]2084        if self.state_filename: 
2085            self.write_state()
2086        self.state_lock.release()
[e83f2f2]2087        return proofs
[cf0ff4f]2088
2089    def clear_placeholder(self, eid, expid, tmpdir):
2090        """
2091        Clear the placeholder and remove any allocated temporary dir.
2092        """
2093
2094        self.state_lock.acquire()
2095        del self.state[eid]
2096        del self.state[expid]
2097        if self.state_filename: self.write_state()
2098        self.state_lock.release()
2099        if tmpdir and self.cleanup:
2100            self.remove_dirs(tmpdir)
2101
2102    # end of create_experiment sub-functions
[5ecb9a3]2103
[e19b75c]2104    def create_experiment(self, req, fid):
[db6b092]2105        """
2106        The external interface to experiment creation called from the
2107        dispatcher.
2108
2109        Creates a working directory, splits the incoming description using the
[43197eb]2110        splitter script and parses out the various subsections using the
[1a4ee0f]2111        classes above.  Once each sub-experiment is created, use pooled threads
2112        to instantiate them and start it all up.
[db6b092]2113        """
[7183b48]2114
2115        req = req.get('CreateRequestBody', None)
[5ecb9a3]2116        if req:
[cf0ff4f]2117            key = self.get_experiment_key(req)
[5ecb9a3]2118        else:
[7183b48]2119            raise service_error(service_error.req,
2120                    "Bad request format (no CreateRequestBody)")
2121
[6e63513]2122        # Import information from the requester
2123        if self.auth.import_credentials(data_list=req.get('credential', [])):
2124            self.auth.save()
2125
[5ecb9a3]2126        # Make sure that the caller can talk to us
[e83f2f2]2127        proof = self.check_experiment_access(fid, key)
[db6b092]2128
[fd07c48]2129        # Install the testbed map entries supplied with the request into a copy
2130        # of the testbed map.
2131        tbmap = dict(self.tbmap)
2132        for m in req.get('testbedmap', []):
2133            if 'testbed' in m and 'uri' in m:
2134                tbmap[m['testbed']] = m['uri']
2135
[5ecb9a3]2136        # a place to work
[db6b092]2137        try:
2138            tmpdir = tempfile.mkdtemp(prefix="split-")
[895a133]2139            os.mkdir(tmpdir+"/keys")
[d3c8759]2140        except EnvironmentError:
[db6b092]2141            raise service_error(service_error.internal, "Cannot create tmp dir")
2142
2143        tbparams = { }
2144
[5ecb9a3]2145        eid, expid, expcert_file = \
2146                self.get_experiment_ids_and_start(key, tmpdir)
[c573278]2147
[5ecb9a3]2148        # This catches exceptions to clear the placeholder if necessary
[db6b092]2149        try: 
[5ecb9a3]2150            if not (eid and expid):
2151                raise service_error(service_error.internal, 
2152                        "Cannot find local experiment info!?")
[5f6929a]2153
[5ecb9a3]2154            top = self.get_topology(req, tmpdir)
[2627eb3]2155            self.confirm_software(top)
[5ecb9a3]2156            # Assign the IPs
[69692a9]2157            hosts, ip_allocator = self.allocate_ips_to_topo(top)
[1a4ee0f]2158            # Find the testbeds to look up
[5334044]2159            tb_hosts = { }
[5ecb9a3]2160            testbeds = [ ]
2161            for e in top.elements:
2162                if isinstance(e, topdl.Computer):
2163                    tb = e.get_attribute('testbed') or 'default'
2164                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2165                    else: 
2166                        tb_hosts[tb] = [ e.name ]
2167                        testbeds.append(tb)
2168
[57facae]2169            masters, pmasters = self.get_testbed_services(req, testbeds)
[895a133]2170            allocated = { }         # Testbeds we can access
2171            topo ={ }               # Sub topologies
[e02cd14]2172            connInfo = { }          # Connection information
[5334044]2173
[2627eb3]2174            self.split_topology(top, topo, testbeds)
2175
[5ecb9a3]2176            self.get_access_to_testbeds(testbeds, fid, allocated, 
2177                    tbparams, masters, tbmap, expid, expcert_file)
[5f96438]2178
[cf0ff4f]2179            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
[cc8d8e9]2180
[fd07c48]2181            part = experiment_partition(self.auth, self.store_url, tbmap,
[175b444]2182                    self.muxmax, self.direct_transit)
[5334044]2183            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
[2761484]2184                    connInfo, expid)
[913dc7a]2185
2186            auth_attrs = set()
[ab847bc]2187            # Now get access to the dynamic testbeds (those added above)
2188            for tb in [ t for t in topo if t not in allocated]:
[cf0ff4f]2189                self.get_access(tb, tbparams, fid, masters, tbmap, 
2190                        expid, expcert_file)
[ab847bc]2191                allocated[tb] = 1
2192                store_keys = topo[tb].get_attribute('store_keys')
2193                # Give the testbed access to keys it exports or imports
2194                if store_keys:
[b16cfc0]2195                    auth_attrs.update(set([
[9294673]2196                        (tbparams[tb].allocID, sk) \
[913dc7a]2197                                for sk in store_keys.split(" ")]))
2198
2199            if auth_attrs:
2200                self.append_experiment_authorization(expid, auth_attrs)
[69692a9]2201
[cf0ff4f]2202            # transit and disconnected testbeds may not have a connInfo entry.
2203            # Fill in the blanks.
2204            for t in allocated.keys():
2205                if not connInfo.has_key(t):
2206                    connInfo[t] = { }
2207
[895a133]2208            self.wrangle_software(expid, top, topo, tbparams)
[cc8d8e9]2209
[e83f2f2]2210            proofs = self.save_federant_information(allocated, tbparams, 
[ec3aa4d]2211                    eid, top)
[866c983]2212        except service_error, e:
2213            # If something goes wrong in the parse (usually an access error)
2214            # clear the placeholder state.  From here on out the code delays
[db6b092]2215            # exceptions.  Failing at this point returns a fault to the remote
2216            # caller.
[cf0ff4f]2217            self.clear_placeholder(eid, expid, tmpdir)
[866c983]2218            raise e
2219
[db6b092]2220        # Start the background swapper and return the starting state.  From
2221        # here on out, the state will stick around a while.
[866c983]2222
[db6b092]2223        # Create a logger that logs to the experiment's state object as well as
2224        # to the main log file.
2225        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
[29d5f7c]2226        alloc_collector = self.list_log(self.state[eid].log)
[f07fa49]2227        h = logging.StreamHandler(alloc_collector)
[db6b092]2228        # XXX: there should be a global one of these rather than repeating the
2229        # code.
2230        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2231                    '%d %b %y %H:%M:%S'))
2232        alloc_log.addHandler(h)
[617592b]2233
[db6b092]2234        # Start a thread to do the resource allocation
[e19b75c]2235        t  = Thread(target=self.allocate_resources,
[43197eb]2236                args=(allocated, masters, eid, expid, tbparams, 
[b4b19c7]2237                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
[725c55d]2238                    connInfo, tbmap, expcert_file),
[db6b092]2239                name=eid)
2240        t.start()
2241
2242        rv = {
2243                'experimentID': [
2244                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2245                ],
2246                'experimentStatus': 'starting',
[e83f2f2]2247                'proof': [ proof.to_dict() ] + proofs,
[db6b092]2248            }
2249
2250        return rv
[9479343]2251   
2252    def get_experiment_fedid(self, key):
2253        """
[db6b092]2254        find the fedid associated with the localname key in the state database.
[9479343]2255        """
2256
[db6b092]2257        rv = None
2258        self.state_lock.acquire()
[29d5f7c]2259        if key in self.state:
2260            rv = self.state[key].fedid
[db6b092]2261        self.state_lock.release()
2262        return rv
[a97394b]2263
[4064742]2264    def check_experiment_access(self, fid, key):
[866c983]2265        """
2266        Confirm that the fid has access to the experiment.  Though a request
2267        may be made in terms of a local name, the access attribute is always
2268        the experiment's fedid.
2269        """
2270        if not isinstance(key, fedid):
[db6b092]2271            key = self.get_experiment_fedid(key)
[866c983]2272
[e83f2f2]2273        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2274
2275        if access_ok:
2276            return proof
[866c983]2277        else:
[e83f2f2]2278            raise service_error(service_error.access, "Access Denied",
2279                proof)
[4064742]2280
2281
[db6b092]2282    def get_handler(self, path, fid):
[cf0ff4f]2283        """
2284        Perhaps surprisingly named, this function handles HTTP GET requests to
2285        this server (SOAP requests are POSTs).
2286        """
[7183b48]2287        self.log.info("Get handler %s %s" % (path, fid))
[e83f2f2]2288        # XXX: log proofs?
[6c57fe9]2289        if self.auth.check_attribute(fid, path):
2290            return ("%s/%s" % (self.repodir, path), "application/binary")
2291        else:
2292            return (None, None)
[987aaa1]2293
[6e33086]2294    def update_info(self, key, force=False):
2295        top = None
2296        self.state_lock.acquire()
2297        if key in self.state:
2298            if force or self.state[key].older_than(self.info_cache_limit):
2299                top = self.state[key].top
2300                if top is not None: top = top.clone()
2301                d1, info_params, cert, d2 = \
2302                        self.get_segment_info(self.state[key], need_lock=False)
2303        self.state_lock.release()
2304
2305        if top is None: return
2306
2307        try:
2308            tmpdir = tempfile.mkdtemp(prefix="info-")
2309        except EnvironmentError:
2310            raise service_error(service_error.internal, 
2311                    "Cannot create tmp dir")
2312        cert_file = self.make_temp_certfile(cert, tmpdir)
2313
2314        data = []
2315        try:
2316            for k, (uri, aid) in info_params.items():
2317                info=self.info_segment(log=self.log, testbed=uri,
2318                            cert_file=cert_file, cert_pwd=None,
2319                            trusted_certs=self.trusted_certs,
2320                            caller=self.call_InfoSegment)
2321                info(uri, aid)
2322                data.append(info)
2323        # Clean up the tmpdir no matter what
2324        finally:
2325            if tmpdir: self.remove_dirs(tmpdir)
2326
2327        self.annotate_topology(top, data)
2328        self.state_lock.acquire()
2329        if key in self.state:
2330            self.state[key].top = top
2331            self.state[key].updated()
2332            if self.state_filename: self.write_state()
2333        self.state_lock.release()
2334
[29d5f7c]2335   
[c52c48d]2336    def get_info(self, req, fid):
[866c983]2337        """
2338        Return all the stored info about this experiment
2339        """
2340        rv = None
2341
2342        req = req.get('InfoRequestBody', None)
2343        if not req:
2344            raise service_error(service_error.req,
[65f3f29]2345                    "Bad request format (no InfoRequestBody)")
[866c983]2346        exp = req.get('experiment', None)
[80b1e82]2347        legacy = req.get('legacy', False)
[6e33086]2348        fresh = req.get('fresh', False)
[866c983]2349        if exp:
2350            if exp.has_key('fedid'):
2351                key = exp['fedid']
2352                keytype = "fedid"
2353            elif exp.has_key('localname'):
2354                key = exp['localname']
2355                keytype = "localname"
2356            else:
2357                raise service_error(service_error.req, "Unknown lookup type")
2358        else:
2359            raise service_error(service_error.req, "No request?")
2360
[e83f2f2]2361        proof = self.check_experiment_access(fid, key)
[866c983]2362
[6e33086]2363        self.update_info(key, fresh)
2364
[866c983]2365        self.state_lock.acquire()
2366        if self.state.has_key(key):
[29d5f7c]2367            rv = self.state[key].get_info()
[6e33086]2368            # Copy the topo if we need legacy annotations
[80b1e82]2369            if legacy:
2370                top = self.state[key].top
2371                if top is not None: top = top.clone()
[866c983]2372        self.state_lock.release()
2373
[80b1e82]2374        # If the legacy visualization and topology representations are
[6e33086]2375        # requested, calculate them and add them to the return.
[80b1e82]2376        if legacy and rv is not None:
2377            if top is not None:
2378                vtopo = topdl.topology_to_vtopo(top)
2379                if vtopo is not None:
2380                    rv['vtopo'] = vtopo
2381                    try:
2382                        vis = self.genviz(vtopo)
2383                    except service_error, e:
2384                        self.log.debug('Problem generating visualization: %s' \
2385                                % e)
2386                        vis = None
2387                    if vis is not None:
2388                        rv['vis'] = vis
[db6b092]2389        if rv:
[29d5f7c]2390            rv['proof'] = proof.to_dict()
2391            return rv
[bd3e314]2392        else:
[db6b092]2393            raise service_error(service_error.req, "No such experiment")
[7a8d667]2394
[22a1a77]2395    def operate_on_segments(self, op_params, cert, op, testbeds, params, 
2396            results):
2397        """
2398        Call OperateSegment on multiple testbeds and gather the results.
2399        op_params contains the parameters needed to contact that testbed, cert
2400        is a certificate containing the fedid to use, op is the operation,
2401        testbeds is a dict mapping testbed name to targets in that testbed,
2402        params are the parameters to include a,d results is a growing list of
2403        the results of the calls.
2404        """
2405        try:
2406            tmpdir = tempfile.mkdtemp(prefix="info-")
2407        except EnvironmentError:
2408            raise service_error(service_error.internal, 
2409                    "Cannot create tmp dir")
2410        cert_file = self.make_temp_certfile(cert, tmpdir)
2411
2412        try:
2413            for tb, targets in testbeds.items():
2414                if tb in op_params:
2415                    uri, aid = op_params[tb]
2416                    operate=self.operation_segment(log=self.log, testbed=uri,
2417                                cert_file=cert_file, cert_pwd=None,
2418                                trusted_certs=self.trusted_certs,
2419                                caller=self.call_OperationSegment)
2420                    if operate(uri, aid, op, targets, params):
2421                        if operate.status is not None:
2422                            results.extend(operate.status)
2423                            continue
2424                # Something went wrong in a weird way.  Add statuses
2425                # that reflect that to results
2426                for t in targets:
2427                    results.append(operation_status(t, 
2428                        operation_status.federant,
[b709861]2429                        'Unexpected error on %s' % tb))
[22a1a77]2430        # Clean up the tmpdir no matter what
2431        finally:
2432            if tmpdir: self.remove_dirs(tmpdir)
2433
2434    def do_operation(self, req, fid):
2435        """
2436        Find the testbeds holding each target and ask them to carry out the
2437        operation.  Return the statuses.
2438        """
2439        # Map an element to the testbed containing it
2440        def element_to_tb(e):
2441            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
2442            elif isinstance(e, topdl.Testbed): return e.name
2443            else: return None
2444        # If d is an operation_status object, make it a dict
2445        def make_dict(d):
2446            if isinstance(d, dict): return d
2447            elif isinstance(d, operation_status): return d.to_dict()
2448            else: return { }
2449
[b709861]2450        def element_name(e):
2451            if isinstance(e, topdl.Computer): return e.name
2452            elif isinstance(e, topdl.Testbed): 
2453                if e.localname: return e.localname[0]
2454                else: return None
2455            else: return None
2456
[22a1a77]2457        req = req.get('OperationRequestBody', None)
2458        if not req:
2459            raise service_error(service_error.req,
2460                    "Bad request format (no OperationRequestBody)")
2461        exp = req.get('experiment', None)
2462        op = req.get('operation', None)
[b709861]2463        targets = set(req.get('target', []))
[22a1a77]2464        params = req.get('parameter', None)
2465
2466        if exp:
2467            if 'fedid' in exp:
2468                key = exp['fedid']
2469                keytype = "fedid"
2470            elif 'localname' in exp:
2471                key = exp['localname']
2472                keytype = "localname"
2473            else:
2474                raise service_error(service_error.req, "Unknown lookup type")
2475        else:
2476            raise service_error(service_error.req, "No request?")
2477
[b709861]2478        if op is None or not targets:
[22a1a77]2479            raise service_error(service_error.req, "No request?")
2480
2481        proof = self.check_experiment_access(fid, key)
2482        self.state_lock.acquire()
2483        if key in self.state:
2484            d1, op_params, cert, d2 = \
[b709861]2485                    self.get_segment_info(self.state[key], need_lock=False,
2486                            key='tb')
[22a1a77]2487            top = self.state[key].top
2488            if top is not None:
2489                top = top.clone()
2490        self.state_lock.release()
2491
2492        if top is None:
2493            raise service_error(service_error.partial, "No topology yet", 
2494                    proof=proof)
2495
2496        testbeds = { }
2497        results = []
2498        for e in top.elements:
[b709861]2499            ename = element_name(e)
2500            if ename in targets:
[22a1a77]2501                tb = element_to_tb(e)
[b709861]2502                targets.remove(ename)
[22a1a77]2503                if tb is not None:
[b709861]2504                    if tb in testbeds: testbeds[tb].append(ename)
2505                    else: testbeds[tb] = [ ename ]
[22a1a77]2506                else:
2507                    results.append(operation_status(e.name, 
2508                        code=operation_status.no_target, 
2509                        description='Cannot map target to testbed'))
2510
[b709861]2511        for t in targets:
2512            results.append(operation_status(t, operation_status.no_target))
2513
[22a1a77]2514        self.operate_on_segments(op_params, cert, op, testbeds, params,
2515                results)
2516
2517        return { 
2518                'experiment': exp, 
[b709861]2519                'status': [make_dict(r) for r in results],
[22a1a77]2520                'proof': proof.to_dict()
2521                }
2522
2523
[65f3f29]2524    def get_multi_info(self, req, fid):
2525        """
2526        Return all the stored info that this fedid can access
2527        """
[e83f2f2]2528        rv = { 'info': [ ], 'proof': [ ] }
[65f3f29]2529
[db6b092]2530        self.state_lock.acquire()
2531        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
[829246e]2532            try:
[e83f2f2]2533                proof = self.check_experiment_access(fid, key)
[829246e]2534            except service_error, e:
2535                if e.code == service_error.access:
2536                    continue
2537                else:
2538                    self.state_lock.release()
2539                    raise e
[65f3f29]2540
[db6b092]2541            if self.state.has_key(key):
[29d5f7c]2542                e = self.state[key].get_info()
2543                e['proof'] = proof.to_dict()
[db6b092]2544                rv['info'].append(e)
[e83f2f2]2545                rv['proof'].append(proof.to_dict())
[65f3f29]2546        self.state_lock.release()
[db6b092]2547        return rv
[65f3f29]2548
[cf0ff4f]2549    def check_termination_status(self, fed_exp, force):
[e07c8f3]2550        """
[cf0ff4f]2551        Confirm that the experiment is sin a valid state to stop (or force it)
2552        return the state - invalid states for deletion and force settings cause
2553        exceptions.
[e07c8f3]2554        """
[cf0ff4f]2555        self.state_lock.acquire()
[29d5f7c]2556        status = fed_exp.status
[e07c8f3]2557
[cf0ff4f]2558        if status:
2559            if status in ('starting', 'terminating'):
2560                if not force:
2561                    self.state_lock.release()
2562                    raise service_error(service_error.partial, 
2563                            'Experiment still being created or destroyed')
2564                else:
2565                    self.log.warning('Experiment in %s state ' % status + \
2566                            'being terminated by force.')
2567            self.state_lock.release()
2568            return status
[725c55d]2569        else:
[cf0ff4f]2570            # No status??? trouble
2571            self.state_lock.release()
2572            raise service_error(service_error.internal,
2573                    "Experiment has no status!?")
2574
[b709861]2575    def get_segment_info(self, fed_exp, need_lock=True, key='aid'):
[cf0ff4f]2576        ids = []
2577        term_params = { }
[6e33086]2578        if need_lock: self.state_lock.acquire()
[29d5f7c]2579        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2580        expcert = fed_exp.identity
2581        repo = "%s" % fed_exp.fedid
[cf0ff4f]2582
2583        # Collect the allocation/segment ids into a dict keyed by the fedid
[29d5f7c]2584        # of the allocation that contains a tuple of uri, aid
2585        for i, fed in enumerate(fed_exp.get_all_allocations()):
2586            uri = fed.uri
2587            aid = fed.allocID
[b709861]2588            if key == 'aid': term_params[aid] = (uri, aid)
2589            elif key == 'tb': term_params[fed.tb] = (uri, aid)
2590
[6e33086]2591        if need_lock: self.state_lock.release()
2592        return ids, term_params, expcert, repo
2593
2594
2595    def get_termination_info(self, fed_exp):
2596        self.state_lock.acquire()
2597        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
[cf0ff4f]2598        # Change the experiment state
[29d5f7c]2599        fed_exp.status = 'terminating'
[6e33086]2600        fed_exp.updated()
[cf0ff4f]2601        if self.state_filename: self.write_state()
2602        self.state_lock.release()
2603
2604        return ids, term_params, expcert, repo
2605
2606
2607    def deallocate_resources(self, term_params, expcert, status, force, 
2608            dealloc_log):
2609        tmpdir = None
2610        # This try block makes sure the tempdir is cleared
2611        try:
2612            # If no expcert, try the deallocation as the experiment
2613            # controller instance.
2614            if expcert and self.auth_type != 'legacy': 
2615                try:
2616                    tmpdir = tempfile.mkdtemp(prefix="term-")
2617                except EnvironmentError:
2618                    raise service_error(service_error.internal, 
2619                            "Cannot create tmp dir")
2620                cert_file = self.make_temp_certfile(expcert, tmpdir)
2621                pw = None
2622            else: 
2623                cert_file = self.cert_file
2624                pw = self.cert_pwd
2625
2626            # Stop everyone.  NB, wait_for_all waits until a thread starts
2627            # and then completes, so we can't wait if nothing starts.  So,
2628            # no tbparams, no start.
2629            if len(term_params) > 0:
2630                tp = thread_pool(self.nthreads)
2631                for k, (uri, aid) in term_params.items():
2632                    # Create and start a thread to stop the segment
2633                    tp.wait_for_slot()
2634                    t  = pooled_thread(\
2635                            target=self.terminate_segment(log=dealloc_log,
2636                                testbed=uri,
2637                                cert_file=cert_file, 
2638                                cert_pwd=pw,
2639                                trusted_certs=self.trusted_certs,
2640                                caller=self.call_TerminateSegment),
2641                            args=(uri, aid), name=k,
2642                            pdata=tp, trace_file=self.trace_file)
2643                    t.start()
2644                # Wait for completions
2645                tp.wait_for_all_done()
2646
2647            # release the allocations (failed experiments have done this
2648            # already, and starting experiments may be in odd states, so we
2649            # ignore errors releasing those allocations
2650            try: 
2651                for k, (uri, aid)  in term_params.items():
2652                    self.release_access(None, aid, uri=uri,
2653                            cert_file=cert_file, cert_pwd=pw)
2654            except service_error, e:
2655                if status != 'failed' and not force:
2656                    raise e
2657
2658        # Clean up the tmpdir no matter what
2659        finally:
2660            if tmpdir: self.remove_dirs(tmpdir)
2661
[7a8d667]2662    def terminate_experiment(self, req, fid):
[866c983]2663        """
2664        Swap this experiment out on the federants and delete the shared
2665        information
2666        """
2667        tbparams = { }
2668        req = req.get('TerminateRequestBody', None)
2669        if not req:
2670            raise service_error(service_error.req,
2671                    "Bad request format (no TerminateRequestBody)")
2672
[cf0ff4f]2673        key = self.get_experiment_key(req, 'experiment')
[e83f2f2]2674        proof = self.check_experiment_access(fid, key)
[cf0ff4f]2675        exp = req.get('experiment', False)
2676        force = req.get('force', False)
[866c983]2677
[db6b092]2678        dealloc_list = [ ]
[46e4682]2679
2680
[5ae3857]2681        # Create a logger that logs to the dealloc_list as well as to the main
2682        # log file.
2683        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2684        h = logging.StreamHandler(self.list_log(dealloc_list))
2685        # XXX: there should be a global one of these rather than repeating the
2686        # code.
2687        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2688                    '%d %b %y %H:%M:%S'))
2689        dealloc_log.addHandler(h)
2690
2691        self.state_lock.acquire()
2692        fed_exp = self.state.get(key, None)
[cf0ff4f]2693        self.state_lock.release()
[e07c8f3]2694        repo = None
[5ae3857]2695
2696        if fed_exp:
[cf0ff4f]2697            status = self.check_termination_status(fed_exp, force)
[6e33086]2698            # get_termination_info updates the experiment state
[cf0ff4f]2699            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2700            self.deallocate_resources(term_params, expcert, status, force, 
2701                    dealloc_log)
[5ae3857]2702
2703            # Remove the terminated experiment
2704            self.state_lock.acquire()
2705            for id in ids:
[a96d946]2706                self.clear_experiment_authorization(id, need_state_lock=False)
[cf0ff4f]2707                if id in self.state: del self.state[id]
[5ae3857]2708
2709            if self.state_filename: self.write_state()
2710            self.state_lock.release()
2711
[2761484]2712            # Delete any synch points associated with this experiment.  All
2713            # synch points begin with the fedid of the experiment.
2714            fedid_keys = set(["fedid:%s" % f for f in ids \
2715                    if isinstance(f, fedid)])
2716            for k in self.synch_store.all_keys():
2717                try:
2718                    if len(k) > 45 and k[0:46] in fedid_keys:
2719                        self.synch_store.del_value(k)
[dadc4da]2720                except synch_store.BadDeletionError:
[2761484]2721                    pass
2722            self.write_store()
[e07c8f3]2723
2724            # Remove software and other cached stuff from the filesystem.
2725            if repo:
2726                self.remove_dirs("%s/%s" % (self.repodir, repo))
[e83f2f2]2727       
[5ae3857]2728            return { 
2729                    'experiment': exp , 
[cf0ff4f]2730                    'deallocationLog': string.join(dealloc_list, ''),
[e83f2f2]2731                    'proof': [proof.to_dict()],
[5ae3857]2732                    }
2733        else:
2734            raise service_error(service_error.req, "No saved state")
[2761484]2735
2736
2737    def GetValue(self, req, fid):
2738        """
2739        Get a value from the synchronized store
2740        """
2741        req = req.get('GetValueRequestBody', None)
2742        if not req:
2743            raise service_error(service_error.req,
2744                    "Bad request format (no GetValueRequestBody)")
2745       
[cf0ff4f]2746        name = req.get('name', None)
2747        wait = req.get('wait', False)
[2761484]2748        rv = { 'name': name }
2749
[e83f2f2]2750        if not name:
2751            raise service_error(service_error.req, "No name?")
2752
2753        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2754
2755        if access_ok:
[d8442da]2756            self.log.debug("[GetValue] asking for %s " % name)
[dadc4da]2757            try:
2758                v = self.synch_store.get_value(name, wait)
2759            except synch_store.RevokedKeyError:
2760                # No more synch on this key
2761                raise service_error(service_error.federant, 
2762                        "Synch key %s revoked" % name)
[2761484]2763            if v is not None:
2764                rv['value'] = v
[e83f2f2]2765            rv['proof'] = proof.to_dict()
[109a32a]2766            self.log.debug("[GetValue] got %s from %s" % (v, name))
[2761484]2767            return rv
2768        else:
[e83f2f2]2769            raise service_error(service_error.access, "Access Denied",
2770                    proof=proof)
[2761484]2771       
2772
2773    def SetValue(self, req, fid):
2774        """
2775        Set a value in the synchronized store
2776        """
2777        req = req.get('SetValueRequestBody', None)
2778        if not req:
2779            raise service_error(service_error.req,
2780                    "Bad request format (no SetValueRequestBody)")
2781       
[cf0ff4f]2782        name = req.get('name', None)
2783        v = req.get('value', '')
[2761484]2784
[e83f2f2]2785        if not name:
2786            raise service_error(service_error.req, "No name?")
2787
2788        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2789
2790        if access_ok:
[2761484]2791            try:
2792                self.synch_store.set_value(name, v)
2793                self.write_store()
[109a32a]2794                self.log.debug("[SetValue] set %s to %s" % (name, v))
[2761484]2795            except synch_store.CollisionError:
2796                # Translate into a service_error
2797                raise service_error(service_error.req,
2798                        "Value already set: %s" %name)
[dadc4da]2799            except synch_store.RevokedKeyError:
2800                # No more synch on this key
2801                raise service_error(service_error.federant, 
2802                        "Synch key %s revoked" % name)
[e83f2f2]2803                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
[2761484]2804        else:
[e83f2f2]2805            raise service_error(service_error.access, "Access Denied",
2806                    proof=proof)
Note: See TracBrowser for help on using the repository browser.