#!/usr/local/bin/python import os,sys import re import random import string import subprocess import tempfile import copy import pickle import logging import signal import time import shutil import zipfile import os.path import traceback # For parsing visualization output and splitter output import xml.parsers.expat from threading import Lock, Thread, Condition from subprocess import call, Popen, PIPE from string import join from urlparse import urlparse from urllib2 import urlopen, URLError from util import * from deter import fedid, generate_fedid from remote_service import xmlrpc_handler, soap_handler, service_caller from service_error import service_error from synch_store import synch_store from experiment_partition import experiment_partition from experiment_control_legacy import experiment_control_legacy from authorizer import abac_authorizer from thread_pool import thread_pool, pooled_thread from experiment_info import experiment_info, allocation_info, federated_service from operation_status import operation_status from deter import topdl from deter import ip_allocator from deter import ip_addr from deter import topology_to_route_file import list_log class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.experiment_control") fl.addHandler(nullHandler()) class experiment_control_local(experiment_control_legacy): """ Control of experiments that this system can directly access. Includes experiment creation, termination and information dissemination. Thred safe. """ class ssh_cmd_timeout(RuntimeError): pass call_RequestAccess = service_caller('RequestAccess') call_ReleaseAccess = service_caller('ReleaseAccess') call_StartSegment = service_caller('StartSegment') call_TerminateSegment = service_caller('TerminateSegment') call_InfoSegment = service_caller('InfoSegment') call_OperationSegment = service_caller('OperationSegment') call_Ns2Topdl = service_caller('Ns2Topdl') def __init__(self, config=None, auth=None): """ Intialize the various attributes, most from the config object """ def parse_tarfile_list(tf): """ Parse a tarfile list from the configuration. This is a set of paths and tarfiles separated by spaces. """ rv = [ ] if tf is not None: tl = tf.split() while len(tl) > 1: p, t = tl[0:2] del tl[0:2] rv.append((p, t)) return rv self.list_log = list_log.list_log self.cert_file = config.get("experiment_control", "cert_file") if self.cert_file: self.cert_pwd = config.get("experiment_control", "cert_pwd") else: self.cert_file = config.get("globals", "cert_file") self.cert_pwd = config.get("globals", "cert_pwd") self.trusted_certs = config.get("experiment_control", "trusted_certs") \ or config.get("globals", "trusted_certs") self.repodir = config.get("experiment_control", "repodir") self.repo_url = config.get("experiment_control", "repo_url", "https://users.isi.deterlab.net:23235"); self.exp_stem = "fed-stem" self.log = logging.getLogger("fedd.experiment_control") set_log_level(config, "experiment_control", self.log) self.muxmax = 2 self.nthreads = 10 self.randomize_experiments = False self.splitter = None self.ssh_keygen = "/usr/bin/ssh-keygen" self.ssh_identity_file = None self.debug = config.getboolean("experiment_control", "create_debug") self.cleanup = not config.getboolean("experiment_control", "leave_tmpfiles") self.state_filename = config.get("experiment_control", "experiment_state") self.store_filename = config.get("experiment_control", "synch_store") self.store_url = config.get("experiment_control", "store_url") self.splitter_url = config.get("experiment_control", "ns2topdl_uri") self.grouper_url = config.get("experiment_control", "grouper_url") self.fedkit = parse_tarfile_list(\ config.get("experiment_control", "fedkit")) self.gatewaykit = parse_tarfile_list(\ config.get("experiment_control", "gatewaykit")) dt = config.get("experiment_control", "direct_transit") self.auth_type = config.get('experiment_control', 'auth_type') \ or 'legacy' self.auth_dir = config.get('experiment_control', 'auth_dir') self.routing = config.get('experiment_control', 'routing') self.default_tb = config.get('experiment_control', 'default_testbed') # XXX: document this! self.info_cache_limit = \ config.getint('experiment_control', 'info_cache', 600) if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")] else: self.direct_transit = [ ] # NB for internal master/slave ops, not experiment setup self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa") self.overrides = set([]) ovr = config.get('experiment_control', 'overrides') if ovr: for o in ovr.split(","): o = o.strip() if o.startswith('fedid:'): o = o[len('fedid:'):] self.overrides.add(fedid(hexstr=o)) self.state = { } self.state_lock = Lock() self.tclsh = "/usr/local/bin/otclsh" self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \ config.get("experiment_control", "tcl_splitter", "/usr/testbed/lib/ns2ir/parse.tcl") mapdb_file = config.get("experiment_control", "mapdb") self.trace_file = sys.stderr self.def_expstart = \ "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\ "/tmp/federate"; self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\ "FEDDIR/hosts"; self.def_gwstart = \ "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\ "/tmp/bridge.log"; self.def_mgwstart = \ "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\ "/tmp/bridge.log"; self.def_gwimage = "FBSD61-TUNNEL2"; self.def_gwtype = "pc"; self.local_access = { } if self.auth_type == 'legacy': if auth: self.auth = auth else: self.log.error( "[access]: No authorizer initialized, " +\ "creating local one.") auth = authorizer() self.get_access = self.legacy_get_access elif self.auth_type == 'abac': self.auth = abac_authorizer(load=self.auth_dir, update=os.path.join(self.auth_dir,'update')) else: raise service_error(service_error.internal, "Unknown auth_type: %s" % self.auth_type) if mapdb_file: self.read_mapdb(mapdb_file) else: self.log.warn("[experiment_control] No testbed map, using defaults") self.tbmap = { 'deter':'https://users.isi.deterlab.net:23235', 'emulab':'https://users.isi.deterlab.net:23236', 'ucb':'https://users.isi.deterlab.net:23237', } # Grab saved state. OK to do this w/o locking because it's read only # and only one thread should be in existence that can see self.state at # this point. if self.state_filename: self.read_state() if self.store_filename: self.read_store() else: self.log.warning("No saved synch store") self.synch_store = synch_store # Dispatch tables self.soap_services = {\ 'New': soap_handler('New', self.new_experiment), 'Create': soap_handler('Create', self.create_experiment), 'Vtopo': soap_handler('Vtopo', self.get_vtopo), 'Vis': soap_handler('Vis', self.get_vis), 'Info': soap_handler('Info', self.get_info), 'MultiInfo': soap_handler('MultiInfo', self.get_multi_info), 'Operation': soap_handler('Operation', self.do_operation), 'Terminate': soap_handler('Terminate', self.terminate_experiment), 'GetValue': soap_handler('GetValue', self.GetValue), 'SetValue': soap_handler('SetValue', self.SetValue), } self.xmlrpc_services = {\ 'New': xmlrpc_handler('New', self.new_experiment), 'Create': xmlrpc_handler('Create', self.create_experiment), 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo), 'Vis': xmlrpc_handler('Vis', self.get_vis), 'Info': xmlrpc_handler('Info', self.get_info), 'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info), 'Terminate': xmlrpc_handler('Terminate', self.terminate_experiment), 'Operation': xmlrpc_handler('Operation', self.do_operation), 'GetValue': xmlrpc_handler('GetValue', self.GetValue), 'SetValue': xmlrpc_handler('SetValue', self.SetValue), } # Call while holding self.state_lock def write_state(self): """ Write a new copy of experiment state after copying the existing state to a backup. State format is a simple pickling of the state dictionary. """ if os.access(self.state_filename, os.W_OK): copy_file(self.state_filename, \ "%s.bak" % self.state_filename) try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except EnvironmentError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) @staticmethod def get_alloc_ids(exp): """ Used by read_store and read state. This used to be worse. """ return [ a.allocID for a in exp.get_all_allocations() ] # Call while holding self.state_lock def read_state(self): """ Read a new copy of experiment state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except EnvironmentError, e: self.log.warning("[read_state]: No saved state: Can't open %s: %s"\ % (self.state_filename, e)) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) for s in self.state.values(): try: eid = s.fedid if eid : if self.auth_type == 'legacy': # XXX: legacy # Give the owner rights to the experiment #self.auth.set_attribute(s['owner'], eid) # And holders of the eid as well self.auth.set_attribute(eid, eid) # allow overrides to control experiments as well for o in self.overrides: self.auth.set_attribute(o, eid) # Set permissions to allow reading of the software # repo, if any, as well. for a in self.get_alloc_ids(s): self.auth.set_attribute(a, 'repo/%s' % eid) else: raise KeyError("No experiment id") except KeyError, e: self.log.warning("[read_state]: State ownership or identity " +\ "misformatted in %s: %s" % (self.state_filename, e)) def read_mapdb(self, file): """ Read a simple colon separated list of mappings for the label-to-testbed-URL mappings. Clears or creates self.tbmap. also adds testbeds to active if they include , active after their name. """ self.tbmap = { } lineno =0 try: f = open(file, "r") for line in f: lineno += 1 line = line.strip() if line.startswith('#') or len(line) == 0: continue try: label, url = line.split(':', 1) self.tbmap[label] = url except ValueError, e: self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\ "map db: %s %s" % (lineno, line, e)) except EnvironmentError, e: self.log.warning("[read_mapdb]: No saved map database: Can't " +\ "open %s: %s" % (file, e)) else: f.close() def read_store(self): try: self.synch_store = synch_store() self.synch_store.load(self.store_filename) self.log.debug("[read_store]: Read store from %s" % \ self.store_filename) except EnvironmentError, e: self.log.warning("[read_store]: No saved store: Can't open %s: %s"\ % (self.state_filename, e)) self.synch_store = synch_store() # Set the initial permissions on data in the store. XXX: This ad hoc # authorization attribute initialization is getting out of hand. # XXX: legacy if self.auth_type == 'legacy': for k in self.synch_store.all_keys(): try: if k.startswith('fedid:'): fid = fedid(hexstr=k[6:46]) if self.state.has_key(fid): for a in self.get_alloc_ids(self.state[fid]): self.auth.set_attribute(a, k) except ValueError, e: self.log.warn("Cannot deduce permissions for %s" % k) def write_store(self): """ Write a new copy of synch_store after writing current state to a backup. We use the internal synch_store pickle method to avoid incinsistent data. State format is a simple pickling of the store. """ if os.access(self.store_filename, os.W_OK): copy_file(self.store_filename, \ "%s.bak" % self.store_filename) try: self.synch_store.save(self.store_filename) except EnvironmentError, e: self.log.error("Can't write file %s: %s" % \ (self.store_filename, e)) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) # XXX this may belong somewhere else def get_grouper_updates(self, fid): if self.grouper_url is None: return d = tempfile.mkdtemp() try: fstr = "%s" % fid # XXX locking zipname = os.path.join(d, 'grouper.zip') dest = os.path.join(self.auth_dir, 'update') resp = urlopen('%s?uid=%s' % (self.grouper_url, fstr)) f = open(zipname, 'w') f.write(resp.read()) f.close() zf = zipfile.ZipFile(zipname, 'r') zf.extractall(dest) zf.close() except URLError, e: self.log.error("Cannot connect to grouper: %s" % e) pass finally: shutil.rmtree(d) def remove_dirs(self, dir): """ Remove the directory tree and all files rooted at dir. Log any errors, but continue. """ self.log.debug("[removedirs]: removing %s" % dir) try: for path, dirs, files in os.walk(dir, topdown=False): for f in files: os.remove(os.path.join(path, f)) for d in dirs: os.rmdir(os.path.join(path, d)) os.rmdir(dir) except EnvironmentError, e: self.log.error("Error deleting directory tree in %s" % e); @staticmethod def make_temp_certfile(expcert, tmpdir): """ make a protected copy of the access certificate so the experiment controller can act as the experiment principal. mkstemp is the most secure way to do that. The directory should be created by mkdtemp. Return the filename. """ if expcert and tmpdir: try: certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir) f = os.fdopen(certf, 'w') print >> f, expcert f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Cannot create temp cert file?") return certfn else: return None def generate_ssh_keys(self, dest, type="rsa" ): """ Generate a set of keys for the gateways to use to talk. Keys are of type type and are stored in the required dest file. """ valid_types = ("rsa", "dsa") t = type.lower(); if t not in valid_types: raise ValueError cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] try: trace = open("/dev/null", "w") except EnvironmentError: raise service_error(service_error.internal, "Cannot open /dev/null??"); # May raise CalledProcessError self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) rv = call(cmd, stdout=trace, stderr=trace, close_fds=True) if rv != 0: raise service_error(service_error.internal, "Cannot generate nonce ssh keys. %s return code %d" \ % (self.ssh_keygen, rv)) def generate_seer_certs(self, destdir): ''' Create a SEER ca cert and a node cert in destdir/ca.pem and destdir/node.pem respectively. These will be distributed throughout the federated experiment. This routine reports errors via service_errors. ''' openssl = '/usr/bin/openssl' # All the filenames and parameters we need for openssl calls below ca_key =os.path.join(destdir, 'ca.key') ca_pem = os.path.join(destdir, 'ca.pem') node_key =os.path.join(destdir, 'node.key') node_pem = os.path.join(destdir, 'node.pem') node_req = os.path.join(destdir, 'node.req') node_signed = os.path.join(destdir, 'node.signed') days = '%s' % (365 * 10) serial = '%s' % random.randint(0, 1<<16) try: # Sequence of calls to create a CA key, create a ca cert, create a # node key, node signing request, and finally a signed node # certificate. sequence = ( (openssl, 'genrsa', '-out', ca_key, '1024'), (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', ca_pem, '-days', days, '-subj', '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ), (openssl, 'genrsa', '-out', node_key, '1024'), (openssl, 'req', '-new', '-key', node_key, '-out', node_req, '-days', days, '-subj', '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ), (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, '-set_serial', serial, '-req', '-in', node_req, '-out', node_signed, '-days', days), ) # Do all that stuff; bail if there's an error, and push all the # output to dev/null. for cmd in sequence: trace = open("/dev/null", "w") rv = call(cmd, stdout=trace, stderr=trace, close_fds=True) if rv != 0: raise service_error(service_error.internal, "Cannot generate SEER certs. %s return code %d" \ % (' '.join(cmd), rv)) # Concatinate the node key and signed certificate into node.pem f = open(node_pem, 'w') for comp in (node_signed, node_key): g = open(comp, 'r') f.write(g.read()) g.close() f.close() # Throw out intermediaries. for fn in (ca_key, node_key, node_req, node_signed): os.unlink(fn) except EnvironmentError, e: # Any difficulties with the file system wind up here raise service_error(service_error.internal, "File error on %s while creating SEER certs: %s" % \ (e.filename, e.strerror)) def gentopo(self, str): """ Generate the topology data structure from the splitter's XML representation of it. The topology XML looks like: ip1:ip2 node:port """ class topo_parse: """ Parse the topology XML and create the dats structure. """ def __init__(self): # Typing of the subelements for data conversion self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member') self.int_subelements = ( 'bandwidth',) self.float_subelements = ( 'delay',) # The final data structure self.nodes = [ ] self.lans = [ ] self.topo = { \ 'node': self.nodes,\ 'lan' : self.lans,\ } self.element = { } # Current element being created self.chars = "" # Last text seen def end_element(self, name): # After each sub element the contents is added to the current # element or to the appropriate list. if name == 'node': self.nodes.append(self.element) self.element = { } elif name == 'lan': self.lans.append(self.element) self.element = { } elif name in self.str_subelements: self.element[name] = self.chars self.chars = "" elif name in self.int_subelements: self.element[name] = int(self.chars) self.chars = "" elif name in self.float_subelements: self.element[name] = float(self.chars) self.chars = "" def found_chars(self, data): self.chars += data.rstrip() tp = topo_parse(); parser = xml.parsers.expat.ParserCreate() parser.EndElementHandler = tp.end_element parser.CharacterDataHandler = tp.found_chars parser.Parse(str) return tp.topo def genviz(self, topo): """ Generate the visualization the virtual topology """ neato = "/usr/local/bin/neato" # These are used to parse neato output and to create the visualization # file. vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"') vis_fmt = "%s%s%s" + \ "%s" try: # Node names nodes = [ n['vname'] for n in topo['node'] ] topo_lans = topo['lan'] except KeyError, e: raise service_error(service_error.internal, "Bad topology: %s" %e) lans = { } links = { } # Walk through the virtual topology, organizing the connections into # 2-node connections (links) and more-than-2-node connections (lans). # When a lan is created, it's added to the list of nodes (there's a # node in the visualization for the lan). for l in topo_lans: if links.has_key(l['vname']): if len(links[l['vname']]) < 2: links[l['vname']].append(l['vnode']) else: nodes.append(l['vname']) lans[l['vname']] = links[l['vname']] del links[l['vname']] lans[l['vname']].append(l['vnode']) elif lans.has_key(l['vname']): lans[l['vname']].append(l['vnode']) else: links[l['vname']] = [ l['vnode'] ] # Open up a temporary file for dot to turn into a visualization try: df, dotname = tempfile.mkstemp() dotfile = os.fdopen(df, 'w') except EnvironmentError: raise service_error(service_error.internal, "Failed to open file in genviz") try: dnull = open('/dev/null', 'w') except EnvironmentError: service_error(service_error.internal, "Failed to open /dev/null in genviz") # Generate a dot/neato input file from the links, nodes and lans try: print >>dotfile, "graph G {" for n in nodes: print >>dotfile, '\t"%s"' % n for l in links.keys(): print >>dotfile, '\t"%s" -- "%s"' % tuple(links[l]) for l in lans.keys(): for n in lans[l]: print >>dotfile, '\t "%s" -- "%s"' % (n,l) print >>dotfile, "}" dotfile.close() except TypeError: raise service_error(service_error.internal, "Single endpoint link in vtopo") except EnvironmentError: raise service_error(service_error.internal, "Cannot write dot file") # Use dot to create a visualization try: dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE, stderr=dnull, close_fds=True) except EnvironmentError: raise service_error(service_error.internal, "Cannot generate visualization: is graphviz available?") dnull.close() # Translate dot to vis format vis_nodes = [ ] vis = { 'node': vis_nodes } for line in dot.stdout: m = vis_re.match(line) if m: vn = m.group(1) vis_node = {'name': vn, \ 'x': float(m.group(2)),\ 'y' : float(m.group(3)),\ } if vn in links.keys() or vn in lans.keys(): vis_node['type'] = 'lan' else: vis_node['type'] = 'node' vis_nodes.append(vis_node) rv = dot.wait() os.remove(dotname) # XXX: graphviz seems to use low return codes for warnings, like # "couldn't find font" if rv < 2 : return vis else: return None def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None, cert_pwd=None): """ Release access to testbed through fedd """ if not uri and tbmap: uri = tbmap.get(tb, None) if not uri: raise service_error(service_error.server_config, "Unknown testbed: %s" % tb) if self.local_access.has_key(uri): resp = self.local_access[uri].ReleaseAccess(\ { 'ReleaseAccessRequestBody' : {'allocID': {'fedid': aid}},}, fedid(file=cert_file)) resp = { 'ReleaseAccessResponseBody': resp } else: resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} }, cert_file, cert_pwd, self.trusted_certs) # better error coding def remote_ns2topdl(self, uri, desc): req = { 'description' : { 'ns2description': desc }, } r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if r.has_key('Ns2TopdlResponseBody'): r = r['Ns2TopdlResponseBody'] ed = r.get('experimentdescription', None) if ed.has_key('topdldescription'): return topdl.Topology(**ed['topdldescription']) else: raise service_error(service_error.protocol, "Bad splitter response (no output)") else: raise service_error(service_error.protocol, "Bad splitter response") class start_segment: def __init__(self, debug=False, log=None, testbed="", cert_file=None, cert_pwd=None, trusted_certs=None, caller=None, log_collector=None): self.log = log self.debug = debug self.cert_file = cert_file self.cert_pwd = cert_pwd self.trusted_certs = None self.caller = caller self.testbed = testbed self.log_collector = log_collector self.response = None self.node = { } self.subs = { } self.tb = { } self.proof = None def make_map(self, resp): if 'segmentdescription' not in resp or \ 'topdldescription' not in resp['segmentdescription']: self.log.warn('No topology returned from startsegment') return top = topdl.Topology( **resp['segmentdescription']['topdldescription']) for e in top.elements: if isinstance(e, topdl.Computer): self.node[e.name] = e elif isinstance(e, topdl.Testbed): self.tb[e.uri] = e for s in top.substrates: self.subs[s.name] = s def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None): req = { 'allocID': { 'fedid' : aid }, 'segmentdescription': { 'topdldescription': topo.to_dict(), }, } if connInfo: req['connection'] = connInfo import_svcs = [ s for m in masters.values() \ for s in m if self.testbed in s.importers] if import_svcs or self.testbed in masters: req['service'] = [] for s in import_svcs: for r in s.reqs: sr = copy.deepcopy(r) sr['visibility'] = 'import'; req['service'].append(sr) for s in masters.get(self.testbed, []): for r in s.reqs: sr = copy.deepcopy(r) sr['visibility'] = 'export'; req['service'].append(sr) if attrs: req['fedAttr'] = attrs try: self.log.debug("Calling StartSegment at %s " % uri) r = self.caller(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if r.has_key('StartSegmentResponseBody'): lval = r['StartSegmentResponseBody'].get('allocationLog', None) if lval and self.log_collector: for line in lval.splitlines(True): self.log_collector.write(line) self.make_map(r['StartSegmentResponseBody']) if 'proof' in r: self.proof = r['proof'] self.response = r else: raise service_error(service_error.internal, "Bad response!?: %s" %r) return True except service_error, e: self.log.error("Start segment failed on %s: %s" % \ (self.testbed, e)) return False class terminate_segment: def __init__(self, debug=False, log=None, testbed="", cert_file=None, cert_pwd=None, trusted_certs=None, caller=None): self.log = log self.debug = debug self.cert_file = cert_file self.cert_pwd = cert_pwd self.trusted_certs = None self.caller = caller self.testbed = testbed def __call__(self, uri, aid ): req = { 'allocID': {'fedid': aid }, } self.log.info("Calling terminate segment") try: r = self.caller(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) self.log.info("Terminate segment succeeded") return True except service_error, e: self.log.error("Terminate segment failed on %s: %s" % \ (self.testbed, e)) return False class info_segment(start_segment): def __init__(self, debug=False, log=None, testbed="", cert_file=None, cert_pwd=None, trusted_certs=None, caller=None, log_collector=None): experiment_control_local.start_segment.__init__(self, debug, log, testbed, cert_file, cert_pwd, trusted_certs, caller, log_collector) def __call__(self, uri, aid): req = { 'allocID': { 'fedid' : aid } } try: self.log.debug("Calling InfoSegment at %s " % uri) r = self.caller(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if r.has_key('InfoSegmentResponseBody'): self.make_map(r['InfoSegmentResponseBody']) if 'proof' in r: self.proof = r['proof'] self.response = r else: raise service_error(service_error.internal, "Bad response!?: %s" %r) return True except service_error, e: self.log.error("Info segment failed on %s: %s" % \ (self.testbed, e)) return False class operation_segment: def __init__(self, debug=False, log=None, testbed="", cert_file=None, cert_pwd=None, trusted_certs=None, caller=None, log_collector=None): self.log = log self.debug = debug self.cert_file = cert_file self.cert_pwd = cert_pwd self.trusted_certs = None self.caller = caller self.testbed = testbed self.status = None def __call__(self, uri, aid, op, targets, params): req = { 'allocID': { 'fedid' : aid }, 'operation': op, 'target': targets, } if params: req['parameter'] = params try: self.log.debug("Calling OperationSegment at %s " % uri) r = self.caller(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if 'OperationSegmentResponseBody' in r: r = r['OperationSegmentResponseBody'] if 'status' in r: self.status = r['status'] else: raise service_error(service_error.internal, "Bad response!?: %s" %r) return True except service_error, e: self.log.error("Operation segment failed on %s: %s" % \ (self.testbed, e)) return False def annotate_topology(self, top, data): # These routines do various parts of the annotation def add_new_names(nl, l): """ add any names in nl to the list in l """ for n in nl: if n not in l: l.append(n) def merge_services(ne, e): for ns in ne.service: # NB: the else is on the for for s in e.service: if ns.name == s.name: s.importer = ns.importer s.param = ns.param s.description = ns.description s.status = ns.status break else: e.service.append(ns) def merge_oses(ne, e): """ Merge the operating system entries of ne into e """ for nos in ne.os: # NB: the else is on the for for os in e.os: if nos.name == os.name: os.version = nos.version os.version = nos.distribution os.version = nos.distributionversion for a in nos.attribute: if os.get_attribute(a.attribute): os.remove_attribute(a.attribute) os.set_attribute(a.attribute, a.value) break else: # If both nodes have one OS, this is a replacement if len(ne.os) == 1 and len(e.os) == 1: e.os = ne.os else: e.os.append(nos) # Annotate the topology with embedding info for e in top.elements: if isinstance(e, topdl.Computer): for s in data: ne = s.node.get(e.name, None) if ne is not None: add_new_names(ne.localname, e.localname) e.status = ne.status merge_services(ne, e) add_new_names(ne.operation, e.operation) if ne.os: merge_oses(ne, e) break elif isinstance(e,topdl.Testbed): for s in data: ne = s.tb.get(e.uri, None) if ne is not None: add_new_names(ne.localname, e.localname) add_new_names(ne.operation, e.operation) merge_services(ne, e) for a in ne.attribute: e.set_attribute(a.attribute, a.value) # Annotate substrates for s in top.substrates: for d in data: ss = d.subs.get(s.name, None) if ss is not None: if ss.capacity is not None: s.capacity = ss.capacity if s.latency is not None: s.latency = ss.latency def allocate_resources(self, allocated, masters, eid, expid, tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, attrs=None, connInfo={}, tbmap=None, expcert=None): started = { } # Testbeds where a sub-experiment started # successfully # XXX fail_soft = False if tbmap is None: tbmap = { } log = alloc_log or self.log tp = thread_pool(self.nthreads) threads = [ ] starters = [ ] if expcert: cert = expcert pw = None else: cert = self.cert_file pw = self.cert_pwd for tb in allocated.keys(): # Create and start a thread to start the segment, and save it # to get the return value later tb_attrs = copy.copy(attrs) tp.wait_for_slot() uri = tbparams[tb].uri or tbmap.get(testbed_base(tb), None) base, suffix = split_testbed(tb) if suffix: tb_attrs.append({'attribute': 'experiment_name', 'value': "%s-%s" % (eid, suffix)}) else: tb_attrs.append({'attribute': 'experiment_name', 'value': eid}) if not uri: raise service_error(service_error.internal, "Unknown testbed %s !?" % tb) aid = tbparams[tb].allocID if not aid: raise service_error(service_error.internal, "No alloc id for testbed %s !?" % tb) s = self.start_segment(log=log, debug=self.debug, testbed=tb, cert_file=cert, cert_pwd=pw, trusted_certs=self.trusted_certs, caller=self.call_StartSegment, log_collector=log_collector) starters.append(s) t = pooled_thread(\ target=s, name=tb, args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]), pdata=tp, trace_file=self.trace_file) threads.append(t) t.start() # Wait until all finish (keep pinging the log, though) mins = 0 revoked = False while not tp.wait_for_all_done(60.0): mins += 1 alloc_log.info("Waiting for sub threads (it has been %d mins)" \ % mins) if not revoked and \ len([ t.getName() for t in threads if t.rv == False]) > 0: # a testbed has failed. Revoke this experiment's # synchronizarion values so that sub experiments will not # deadlock waiting for synchronization that will never happen self.log.info("A subexperiment has failed to swap in, " + \ "revoking synch keys") var_key = "fedid:%s" % expid for k in self.synch_store.all_keys(): if len(k) > 45 and k[0:46] == var_key: self.synch_store.revoke_key(k) revoked = True failed = [ t.getName() for t in threads if not t.rv ] succeeded = [tb for tb in allocated.keys() if tb not in failed] # If one failed clean up, unless fail_soft is set if failed: if not fail_soft: tp.clear() for tb in succeeded: # Create and start a thread to stop the segment tp.wait_for_slot() uri = tbparams[tb].uri t = pooled_thread(\ target=self.terminate_segment(log=log, testbed=tb, cert_file=cert, cert_pwd=pw, trusted_certs=self.trusted_certs, caller=self.call_TerminateSegment), args=(uri, tbparams[tb].allocID), name=tb, pdata=tp, trace_file=self.trace_file) t.start() # Wait until all finish (if any are being stopped) if succeeded: tp.wait_for_all_done() # release the allocations for tb in tbparams.keys(): try: self.release_access(tb, tbparams[tb].allocID, tbmap=tbmap, uri=tbparams[tb].uri, cert_file=cert, cert_pwd=pw) except service_error, e: self.log.warn("Error releasing access: %s" % e.desc) # Remove the placeholder self.state_lock.acquire() self.state[eid].status = 'failed' self.state[eid].updated() if self.state_filename: self.write_state() self.state_lock.release() # Remove the repo dir self.remove_dirs("%s/%s" %(self.repodir, expid)) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: log.debug("[start_experiment]: not removing %s" % tmpdir) log.error("Swap in failed on %s" % ",".join(failed)) return else: # Walk through the successes and gather the proofs proofs = { } for s in starters: if s.proof: proofs[s.testbed] = s.proof self.annotate_topology(top, starters) log.info("[start_segment]: Experiment %s active" % eid) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: log.debug("[start_experiment]: not removing %s" % tmpdir) # Insert the experiment into our state and update the disk copy. self.state_lock.acquire() self.state[expid].status = 'active' self.state[eid] = self.state[expid] self.state[eid].top = top self.state[eid].updated() # Append startup proofs for f in self.state[eid].get_all_allocations(): if f.tb in proofs: f.proof.append(proofs[f.tb]) if self.state_filename: self.write_state() self.state_lock.release() return def add_kit(self, e, kit): """ Add a Software object created from the list of (install, location) tuples passed as kit to the software attribute of an object e. We do this enough to break out the code, but it's kind of a hack to avoid changing the old tuple rep. """ s = [ topdl.Software(install=i, location=l) for i, l in kit] if isinstance(e.software, list): e.software.extend(s) else: e.software = s def append_experiment_authorization(self, expid, attrs, need_state_lock=True): """ Append the authorization information to system state """ for p, a in attrs: if p and a: # Improperly configured overrides can add bad principals. self.auth.set_attribute(p, a) else: self.log.debug("Bad append experiment_authorization %s %s" \ % (p, a)) self.auth.save() if need_state_lock: self.state_lock.acquire() # XXX: really a no op? #self.state[expid]['auth'].update(attrs) if self.state_filename: self.write_state() if need_state_lock: self.state_lock.release() def clear_experiment_authorization(self, expid, need_state_lock=True): """ Attrs is a set of attribute principal pairs that need to be removed from the authenticator. Remove them and save the authenticator. """ if need_state_lock: self.state_lock.acquire() # XXX: should be a no-op #if expid in self.state and 'auth' in self.state[expid]: #for p, a in self.state[expid]['auth']: #self.auth.unset_attribute(p, a) #self.state[expid]['auth'] = set() if self.state_filename: self.write_state() if need_state_lock: self.state_lock.release() self.auth.save() def create_experiment_state(self, fid, req, expid, expcert, state='starting'): """ Create the initial entry in the experiment's state. The expid and expcert are the experiment's fedid and certifacte that represents that ID, which are installed in the experiment state. If the request includes a suggested local name that is used if possible. If the local name is already taken by an experiment owned by this user that has failed, it is overwritten. Otherwise new letters are added until a valid localname is found. The generated local name is returned. """ if req.has_key('experimentID') and \ req['experimentID'].has_key('localname'): overwrite = False eid = req['experimentID']['localname'] # If there's an old failed experiment here with the same local name # and accessible by this user, we'll overwrite it, otherwise we'll # fall through and do the collision avoidance. old_expid = self.get_experiment_fedid(eid) if old_expid: users_experiment = True try: self.check_experiment_access(fid, old_expid) except service_error, e: if e.code == service_error.access: users_experiment = False else: raise e if users_experiment: self.state_lock.acquire() status = self.state[eid].status if status and status == 'failed': # remove the old access attributes self.clear_experiment_authorization(eid, need_state_lock=False) overwrite = True del self.state[eid] del self.state[old_expid] self.state_lock.release() else: self.log.info('Experiment %s exists, ' % eid + \ 'but this user cannot access it') self.state_lock.acquire() while (self.state.has_key(eid) and not overwrite): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = experiment_info(fedid=expid, localname=eid, identity=expcert) self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() else: eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) self.state_lock.acquire() while (self.state.has_key(eid)): eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = experiment_info(fedid=expid, localname=eid, identity=expcert) self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() # Let users touch the state. Authorize this fid and the expid itself # to touch the experiment, as well as allowing th eoverrides. self.append_experiment_authorization(eid, set([(fid, expid), (expid,expid)] + \ [ (o, expid) for o in self.overrides])) return eid def allocate_ips_to_topo(self, top): """ Add an ip4_address attribute to all the hosts in the topology that have not already been assigned one by the user, based on the shared substrates on which they sit. An /etc/hosts file is also created and returned as a list of hostfiles entries. We also return the allocator, because we may need to allocate IPs to portals (specifically DRAGON portals). """ subs = sorted(top.substrates, cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)), reverse=True) ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) ifs = { } hosts = [ ] assigned = { } assigned_subs = set() # Look for IP networks that were already assigned to the experiment by # the user. We assume that users are smart in making that assignment, # but we check them later. for e in top.elements: if not isinstance(e, topdl.Computer): continue for inf in e.interface: a = inf.get_attribute('ip4_address') if not a: continue for s in inf.substrate: assigned_subs.add(s) a = ip_addr(a) n = ip_addr(inf.get_attribute('ip4_netmask')) sz = 0x100000000 - long(n) net = (long(a) & long(n)) & 0xffffffff if net in assigned: if sz > assigned[net]: assigned[net] = sz assigned[net] = sz # XXX: hack attack - always avoid 10.0.23.0 which is special on DETER assigned[long(ip_addr('10.0.23.0'))] = 256 # end of hack # Walk all substrates, smallest to largest, assigning IP addresses to # the unassigned and checking the assigned. Build an /etc/hosts file # as well. for idx, s in enumerate(subs): if s.name not in assigned_subs: net_size = len(s.interfaces)+2 # Get an ip allocation for this substrate. Make sure the # allocation does not collide with any user allocations. ok = False while not ok: a = ips.allocate(net_size) if not a: raise service_error(service_error.req, "Cannot allocate IP addresses") base, num = a if num < net_size: raise service_error(service_error.internal, "Allocator returned wrong number of IPs??") # Check for overlap. An assigned network could contain an # endpoint of the new allocation or the new allocation # could contain an endpoint of an assigned network. # NB the else is attached to the for - if the loop is not # exited by a break, ok = True. for n, sz in assigned.items(): if base >= n and base < n + sz: ok = False break if base + num > n and base + num < n + sz: ok = False break if n >= base and n < base + num: ok = False break if n+sz > base and n+sz < base + num: ok = False break else: ok = True mask = ips.min_alloc while mask < net_size: mask *= 2 netmask = ((2**32-1) ^ (mask-1)) else: base = 0 # Add the attributes and build hosts base += 1 for i in s.interfaces: # Add address and masks to interfaces on unassigned substrates if s.name not in assigned_subs: i.attribute.append( topdl.Attribute('ip4_address', "%s" % ip_addr(base))) i.attribute.append( topdl.Attribute('ip4_netmask', "%s" % ip_addr(int(netmask)))) # Make /etc/hosts entries addr = i.get_attribute('ip4_address') if addr is None: raise service_error(service_error.req, "Partially assigned substrate %s" %s.name) hname = i.element.name if hname in ifs: hosts.append("%s\t%s-%s %s-%d" % \ (addr, hname, s.name, hname, ifs[hname])) else: # First IP allocated it the default ip according to hosts, # so the extra alias is here. ifs[hname] = 0 hosts.append("%s\t%s-%s %s-%d %s" % \ (addr, hname, s.name, hname, ifs[hname], hname)) ifs[hname] += 1 base += 1 return hosts, ips def get_access_to_testbeds(self, testbeds, fid, allocated, tbparam, masters, tbmap, expid=None, expcert=None): for tb in testbeds: self.log.debug('Trying testbed %s' % tb) self.get_access(tb, tbparam, fid, masters, tbmap, expid, expcert) allocated[tb] = 1 self.log.debug('Got testbed %s' % tb) def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, expcert=None): """ Get access to testbed through fedd and set the parameters for that tb """ def get_export_project(svcs): """ Look through for the list of federated_service for this testbed objects for a project_export service, and extract the project parameter. """ pe = [s for s in svcs if s.name=='project_export'] if len(pe) == 1: return pe[0].params.get('project', None) elif len(pe) == 0: return None else: raise service_error(service_error.req, "More than one project export is not supported") def add_services(svcs, type, slist, keys): """ Add the given services to slist. type is import or export. Also add a mapping entry from the assigned id to the original service record. """ for i, s in enumerate(svcs): idx = '%s%d' % (type, i) keys[idx] = s sr = {'id': idx, 'name': s.name, 'visibility': type } if s.params: sr['fedAttr'] = [ { 'attribute': k, 'value': v } \ for k, v in s.params.items()] slist.append(sr) uri = tbmap.get(testbed_base(tb), None) if not uri: raise service_error(service_error.server_config, "Unknown testbed: %s" % tb) export_svcs = masters.get(tb,[]) import_svcs = [ s for m in masters.values() \ for s in m \ if tb in s.importers ] export_project = get_export_project(export_svcs) # Compose the credential list so that IDs come before attributes creds = set() keys = set() certs = self.auth.get_creds_for_principal(fid) # Append credenials about this experiment controller - e.g. that it is # trusted. certs.update(self.auth.get_creds_for_principal( fedid(file=self.cert_file))) if expid: certs.update(self.auth.get_creds_for_principal(expid)) for c in certs: keys.add(c.issuer_cert()) creds.add(c.attribute_cert()) creds = list(keys) + list(creds) if expcert: cert, pw = expcert, None else: cert, pw = self.cert_file, self.cert_pw # Request credentials req = { 'abac_credential': creds, } # Make the service request from the services we're importing and # exporting. Keep track of the export request ids so we can # collect the resulting info from the access response. e_keys = { } if import_svcs or export_svcs: slist = [] add_services(import_svcs, 'import', slist, e_keys) add_services(export_svcs, 'export', slist, e_keys) req['service'] = slist if self.local_access.has_key(uri): # Local access call req = { 'RequestAccessRequestBody' : req } r = self.local_access[uri].RequestAccess(req, fedid(file=self.cert_file)) r = { 'RequestAccessResponseBody' : r } else: r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs) if r.has_key('RequestAccessResponseBody'): # Through to here we have a valid response, not a fault. # Access denied is a fault, so something better or worse than # access denied has happened. r = r['RequestAccessResponseBody'] self.log.debug("[get_access] Access granted") else: raise service_error(service_error.protocol, "Bad proxy response") if 'proof' not in r: raise service_error(service_error.protocol, "Bad access response (no access proof)") tbparam[tb] = allocation_info(allocID=r['allocID'].get('fedid', None), tb=tb, uri=uri, proof=[r['proof']], services=masters.get(tb, None)) # Collect the responses corresponding to the services this testbed # exports. These will be the service requests that we will include in # the start segment requests (with appropriate visibility values) to # import and export the segments. for s in r.get('service', []): id = s.get('id', None) # Note that this attaches the response to the object in the masters # data structure. (The e_keys index disappears when this fcn # returns) if id and id in e_keys: e_keys[id].reqs.append(s) # Add attributes to parameter space. We don't allow attributes to # overlay any parameters already installed. for a in r.get('fedAttr', []): try: if a['attribute']: tbparam[tb].set_attribute(a['attribute'], a['value']) except KeyError: self.log.error("Bad attribute in response: %s" % a) def split_topology(self, top, topo, testbeds): """ Create the sub-topologies that are needed for experiment instantiation. """ for tb in testbeds: topo[tb] = top.clone() # copy in for loop allows deletions from the original for e in [ e for e in topo[tb].elements]: etb = e.get_attribute('testbed') # NB: elements without a testbed attribute won't appear in any # sub topologies. if not etb or etb != tb: for i in e.interface: for s in i.subs: try: s.interfaces.remove(i) except ValueError: raise service_error(service_error.internal, "Can't remove interface??") topo[tb].elements.remove(e) topo[tb].make_indices() def confirm_software(self, top): """ Make sure that the software to be loaded in the topo is all available before we begin making access requests, etc. This is a subset of wrangle_software. """ pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ]) pkgs.update([x.location for e in top.elements for x in e.software]) for pkg in pkgs: loc = pkg scheme, host, path = urlparse(loc)[0:3] dest = os.path.basename(path) if not scheme: if not loc.startswith('/'): loc = "/%s" % loc loc = "file://%s" %loc # NB: if scheme was found, loc == pkg try: u = urlopen(loc) u.close() except Exception, e: raise service_error(service_error.req, "Cannot open %s: %s" % (loc, e)) return True def wrangle_software(self, expid, top, topo, tbparams): """ Copy software out to the repository directory, allocate permissions and rewrite the segment topologies to look for the software in local places. """ # Copy the rpms and tarfiles to a distribution directory from # which the federants can retrieve them linkpath = "%s/software" % expid softdir ="%s/%s" % ( self.repodir, linkpath) softmap = { } # self.fedkit and self.gateway kit are lists of tuples of # (install_location, download_location) this extracts the download # locations. pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ]) pkgs.update([x.location for e in top.elements for x in e.software]) try: os.makedirs(softdir) except EnvironmentError, e: raise service_error( "Cannot create software directory: %s" % e) # The actual copying. Everything's converted into a url for copying. auth_attrs = set() for pkg in pkgs: loc = pkg scheme, host, path = urlparse(loc)[0:3] dest = os.path.basename(path) if not scheme: if not loc.startswith('/'): loc = "/%s" % loc loc = "file://%s" %loc # NB: if scheme was found, loc == pkg try: u = urlopen(loc) except Exception, e: raise service_error(service_error.req, "Cannot open %s: %s" % (loc, e)) try: f = open("%s/%s" % (softdir, dest) , "w") self.log.debug("Writing %s/%s" % (softdir,dest) ) data = u.read(4096) while data: f.write(data) data = u.read(4096) f.close() u.close() except Exception, e: raise service_error(service_error.internal, "Could not copy %s: %s" % (loc, e)) path = re.sub("/tmp", "", linkpath) # XXX softmap[pkg] = \ "%s/%s/%s" %\ ( self.repo_url, path, dest) # Allow the individual segments to access the software by assigning # an attribute to each testbed allocation that encodes the data to # be released. This expression collects the data for each run of # the loop. auth_attrs.update([ (tbparams[tb].allocID, "/%s/%s" % ( path, dest)) \ for tb in tbparams.keys()]) self.append_experiment_authorization(expid, auth_attrs) # Convert the software locations in the segments into the local # copies on this host for soft in [ s for tb in topo.values() \ for e in tb.elements \ if getattr(e, 'software', False) \ for s in e.software ]: if softmap.has_key(soft.location): soft.location = softmap[soft.location] def new_experiment(self, req, fid): """ The external interface to empty initial experiment creation called from the dispatcher. Creates a working directory, splits the incoming description using the splitter script and parses out the avrious subsections using the lcasses above. Once each sub-experiment is created, use pooled threads to instantiate them and start it all up. """ self.log.info("New experiment call started for %s" % fid) req = req.get('NewRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no NewRequestBody)") # import may partially succeed so always save credentials and warn if not self.auth.import_credentials(data_list=req.get('credential', [])): self.log.debug("Failed to import delegation credentials(!)") self.get_grouper_updates(fid) self.auth.update() self.auth.save() try: access_ok, proof = self.auth.check_attribute(fid, 'new', with_proof=True) except service_error, e: self.log.info("New experiment call for %s: access denied" % fid) raise e if not access_ok: self.log.info("New experiment call for %s: Access denied" % fid) raise service_error(service_error.access, "New access denied", proof=[proof]) try: tmpdir = tempfile.mkdtemp(prefix="split-") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") # Generate an ID for the experiment (slice) and a certificate that the # allocator can use to prove they own it. We'll ship it back through # the encrypted connection. If the requester supplied one, use it. if 'experimentAccess' in req and 'X509' in req['experimentAccess']: expcert = req['experimentAccess']['X509'] expid = fedid(certstr=expcert) self.state_lock.acquire() if expid in self.state: self.state_lock.release() raise service_error(service_error.req, 'fedid %s identifies an existing experiment' % expid) self.state_lock.release() else: (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) #now we're done with the tmpdir, and it should be empty if self.cleanup: self.log.debug("[new_experiment]: removing %s" % tmpdir) os.rmdir(tmpdir) else: self.log.debug("[new_experiment]: not removing %s" % tmpdir) eid = self.create_experiment_state(fid, req, expid, expcert, state='empty') rv = { 'experimentID': [ {'localname' : eid }, { 'fedid': copy.copy(expid) } ], 'experimentStatus': 'empty', 'experimentAccess': { 'X509' : expcert }, 'proof': proof.to_dict(), } self.log.info("New experiment call succeeded for %s" % fid) return rv # create_experiment sub-functions @staticmethod def get_experiment_key(req, field='experimentID'): """ Parse the experiment identifiers out of the request (the request body tag has been removed). Specifically this pulls either the fedid or the localname out of the experimentID field. A fedid is preferred. If neither is present or the request does not contain the fields, service_errors are raised. """ # Get the experiment access exp = req.get(field, None) if exp: if exp.has_key('fedid'): key = exp['fedid'] elif exp.has_key('localname'): key = exp['localname'] else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") return key def get_experiment_ids_and_start(self, key, tmpdir): """ Get the experiment name, id and access certificate from the state, and set the experiment state to 'starting'. returns a triple (fedid, localname, access_cert_file). The access_cert_file is a copy of the contents of the access certificate, created in the tempdir with restricted permissions. If things are confused, raise an exception. """ expid = eid = None self.state_lock.acquire() if key in self.state: exp = self.state[key] exp.status = "starting" exp.updated() expid = exp.fedid eid = exp.localname expcert = exp.identity self.state_lock.release() # make a protected copy of the access certificate so the experiment # controller can act as the experiment principal. if expcert: expcert_file = self.make_temp_certfile(expcert, tmpdir) if not expcert_file: raise service_error(service_error.internal, "Cannot create temp cert file?") else: expcert_file = None return (eid, expid, expcert_file) def get_topology(self, req, tmpdir): """ Get the ns2 content and put it into a file for parsing. Call the local or remote parser and return the topdl.Topology. Errors result in exceptions. req is the request and tmpdir is a work directory. """ # The tcl parser needs to read a file so put the content into that file descr=req.get('experimentdescription', None) if descr: if 'ns2description' in descr: file_content=descr['ns2description'] elif 'topdldescription' in descr: return topdl.Topology(**descr['topdldescription']) else: raise service_error(service_error.req, 'Unknown experiment description type') else: raise service_error(service_error.req, "No experiment description") if self.splitter_url: self.log.debug("Calling remote topdl translator at %s" % \ self.splitter_url) top = self.remote_ns2topdl(self.splitter_url, file_content) else: tclfile = os.path.join(tmpdir, "experiment.tcl") if file_content: try: f = open(tclfile, 'w') f.write(file_content) f.close() except EnvironmentError: raise service_error(service_error.internal, "Cannot write temp experiment description") else: raise service_error(service_error.req, "Only ns2descriptions supported") pid = "dummy" gid = "dummy" eid = "dummy" tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', str(self.muxmax), '-m', 'dummy'] tclcmd.extend([pid, gid, eid, tclfile]) self.log.debug("running local splitter %s", " ".join(tclcmd)) # This is just fantastic. As a side effect the parser copies # tb_compat.tcl into the current directory, so that directory # must be writable by the fedd user. Doing this in the # temporary subdir ensures this is the case. tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, cwd=tmpdir) split_data = tclparser.stdout top = topdl.topology_from_xml(file=split_data, top="experiment") os.remove(tclfile) return top def get_testbed_services(self, req, testbeds): """ Parse the services section of the request into two dicts mapping testbed to lists of federated_service objects. The first dict maps all exporters of services to those service objects, the second maps testbeds to service objects only for services requiring portals. """ # Sanity check the services. Exports or imports from unknown testbeds # cause an exception. for s in req.get('service', []): for t in s.get('import', []): if t not in testbeds: raise service_error(service_error.req, 'Service import to unknown testbed: %s' %t) for t in s.get('export', []): if t not in testbeds: raise service_error(service_error.req, 'Service export from unknown testbed: %s' %t) # We construct both dicts here because deriving the second is more # complex than it looks - both the keys and lists can differ, and it's # much easier to generate both in one pass. masters = { } pmasters = { } for s in req.get('service', []): # If this is a service request with the importall field # set, fill it out. if s.get('importall', False): s['import'] = [ tb for tb in testbeds \ if tb not in s.get('export',[])] del s['importall'] # Add the service to masters for tb in s.get('export', []): if s.get('name', None): params = { } for a in s.get('fedAttr', []): params[a.get('attribute', '')] = a.get('value','') fser = federated_service(name=s['name'], exporter=tb, importers=s.get('import',[]), params=params) if fser.name == 'hide_hosts' \ and 'hosts' not in fser.params: fser.params['hosts'] = \ ",".join(tb_hosts.get(fser.exporter, [])) if tb in masters: masters[tb].append(fser) else: masters[tb] = [fser] if fser.portal: if tb in pmasters: pmasters[tb].append(fser) else: pmasters[tb] = [fser] else: self.log.error('Testbed service does not have name " + \ "and importers') return masters, pmasters def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams): """ Create the ssh keys necessary for interconnecting the portal nodes and the global hosts file for letting each segment know about the IP addresses in play. If we have computed static routes, copy them into the repo. Save these into the repo. Add attributes to the autorizer allowing access controllers to download them and return a set of attributes that inform the segments where to find this stuff. May raise service_errors in if there are problems. """ gw_pubkey_base = "fed.%s.pub" % self.ssh_type gw_secretkey_base = "fed.%s" % self.ssh_type keydir = os.path.join(tmpdir, 'keys') gw_pubkey = os.path.join(keydir, gw_pubkey_base) gw_secretkey = os.path.join(keydir, gw_secretkey_base) route = os.path.join(tmpdir, 'route.tgz') try: self.generate_ssh_keys(gw_secretkey, self.ssh_type) except ValueError: raise service_error(service_error.server_config, "Bad key type (%s)" % self.ssh_type) self.generate_seer_certs(keydir) # Copy configuration files into the remote file store # The config urlpath configpath = "/%s/config" % expid # The config file system location configdir ="%s%s" % ( self.repodir, configpath) route_conf = os.path.join(configdir, 'route.tgz') try: os.makedirs(configdir) except EnvironmentError, e: raise service_error(service_error.internal, "Cannot create config directory: %s" % e) try: f = open("%s/hosts" % configdir, "w") print >> f, string.join(hosts, '\n') f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Cannot write hosts file: %s" % e) try: if os.path.exists(route): copy_file(route, route_conf) copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base)) copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base)) copy_file(os.path.join(keydir, 'ca.pem'), os.path.join(configdir, 'ca.pem')) copy_file(os.path.join(keydir, 'node.pem'), os.path.join(configdir, 'node.pem')) except EnvironmentError, e: raise service_error(service_error.internal, "Cannot copy keyfiles: %s" % e) # Allow the individual testbeds to access the configuration files, # again by setting an attribute for the relevant pathnames on each # allocation principal. Yeah, that's a long list comprehension. self.append_experiment_authorization(expid, set([ (tbparams[tb].allocID, "%s/%s" % (configpath, f)) \ for tb in tbparams.keys() \ for f in ("hosts", 'ca.pem', 'node.pem', 'route.tgz', gw_secretkey_base, gw_pubkey_base)])) attrs = [ { 'attribute': 'ssh_pubkey', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, gw_pubkey_base) }, { 'attribute': 'ssh_secretkey', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, gw_secretkey_base) }, { 'attribute': 'hosts', 'value': '%s/%s/config/hosts' % \ (self.repo_url, expid) }, { 'attribute': 'seer_ca_pem', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, 'ca.pem') }, { 'attribute': 'seer_node_pem', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, 'node.pem') }, ] # Add info about static routes if we have some if os.path.exists(route_conf): attrs.append( { 'attribute': 'route.tgz', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, 'route.tgz') } ) return attrs def get_vtopo(self, req, fid): """ Return the stored virtual topology for this experiment """ rv = None state = None self.log.info("vtopo call started for %s" % fid) req = req.get('VtopoRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no VtopoRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") try: proof = self.check_experiment_access(fid, key) except service_error, e: self.log.info("vtopo call failed for %s: access denied" % fid) raise e self.state_lock.acquire() # XXX: this needs to be recalculated if key in self.state: if self.state[key].top is not None: vtopo = topdl.topology_to_vtopo(self.state[key].top) rv = { 'experiment' : {keytype: key }, 'vtopo': vtopo, 'proof': proof.to_dict(), } else: state = self.state[key].status self.state_lock.release() if rv: self.log.info("vtopo call completed for %s %s " % \ (key, fid)) return rv else: if state: self.log.info("vtopo call completed for %s %s (Not ready)" % \ (key, fid)) raise service_error(service_error.partial, "Not ready: %s" % state) else: self.log.info("vtopo call completed for %s %s (No experiment)"\ % (key, fid)) raise service_error(service_error.req, "No such experiment") def get_vis(self, req, fid): """ Return the stored visualization for this experiment """ rv = None state = None self.log.info("vis call started for %s" % fid) req = req.get('VisRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no VisRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") try: proof = self.check_experiment_access(fid, key) except service_error, e: self.log.info("vis call failed for %s: access denied" % fid) raise e self.state_lock.acquire() # Generate the visualization if key in self.state: if self.state[key].top is not None: try: vis = self.genviz( topdl.topology_to_vtopo(self.state[key].top)) except service_error, e: self.state_lock.release() raise e rv = { 'experiment' : {keytype: key }, 'vis': vis, 'proof': proof.to_dict(), } else: state = self.state[key].status self.state_lock.release() if rv: self.log.info("vis call completed for %s %s " % \ (key, fid)) return rv else: if state: self.log.info("vis call completed for %s %s (not ready)" % \ (key, fid)) raise service_error(service_error.partial, "Not ready: %s" % state) else: self.log.info("vis call completed for %s %s (no experiment)" % \ (key, fid)) raise service_error(service_error.req, "No such experiment") @staticmethod def needs_route_computation(req): ''' Walk the request services looking for a static routing request ''' for s in req.get('service', []): if s.get('name', '') == 'static_routing': return True return False def compute_static_routes(self, tmpdir, top): ''' Compute a set of static routes for the topology. The result is a file called route.tgz in tmpdir that contains a dinrectory, route, with a file per node in the topology that lists the prefix and router for all the routes in that node's routing table. Exceptions from the route calculation program and the tar creation call are not caught. ''' if self.routing is None: raise service_error(service_error.server, 'Cannot provide staticroutes, no routing program specified') rg = tempfile.NamedTemporaryFile() outdir = os.path.join(tmpdir, 'route') tarfile = os.path.join(tmpdir, 'route.tgz') topology_to_route_file(top, file=rg) os.mkdir(outdir) try: for cmd in ( [self.routing, '--input', rg.name,'--output', outdir], ['tar', '-C', tmpdir, '-czf', tarfile, 'route']): subprocess.check_call(cmd) except subprocess.CalledProcessError, e: raise service_error(service_error.internal, 'Cannot call %s: %s' % (' '.join(cmd), e.returncode)) rg.close() shutil.rmtree(outdir) def save_federant_information(self, allocated, tbparams, eid, top): """ Store the various data that have changed in the experiment state between when it was started and the beginning of resource allocation. This is basically the information about each local allocation. This fills in the values of the placeholder allocation in the state. It also collects the access proofs and returns them as dicts for a response message. """ self.state_lock.acquire() exp = self.state[eid] exp.top = top.clone() # save federant information for k in allocated.keys(): exp.add_allocation(tbparams[k]) top.elements.append(topdl.Testbed(uri=tbparams[k].uri, type="testbed", localname=[k], service=[ s.to_topdl() for s in tbparams[k].services])) # Access proofs for the response message proofs = [copy.deepcopy(p) for k in tbparams.keys()\ for p in tbparams[k].proof] exp.updated() if self.state_filename: self.write_state() self.state_lock.release() return proofs def clear_placeholder(self, eid, expid, tmpdir): """ Clear the placeholder and remove any allocated temporary dir. """ self.state_lock.acquire() del self.state[eid] del self.state[expid] if self.state_filename: self.write_state() self.state_lock.release() if tmpdir and self.cleanup: self.remove_dirs(tmpdir) # end of create_experiment sub-functions def create_experiment(self, req, fid): """ The external interface to experiment creation called from the dispatcher. Creates a working directory, splits the incoming description using the splitter script and parses out the various subsections using the classes above. Once each sub-experiment is created, use pooled threads to instantiate them and start it all up. """ self.log.info("Create experiment call started for %s" % fid) req = req.get('CreateRequestBody', None) if req: key = self.get_experiment_key(req) else: raise service_error(service_error.req, "Bad request format (no CreateRequestBody)") # Import information from the requester # import may partially succeed so always save credentials and warn if not self.auth.import_credentials(data_list=req.get('credential',[])): self.log.debug("Failed to import delegation credentials(!)") self.get_grouper_updates(fid) self.auth.update() self.auth.save() try: # Make sure that the caller can talk to us proof = self.check_experiment_access(fid, key) except service_error, e: self.log.info("Create experiment call failed for %s: access denied"\ % fid) raise e # Install the testbed map entries supplied with the request into a copy # of the testbed map. tbmap = dict(self.tbmap) for m in req.get('testbedmap', []): if 'testbed' in m and 'uri' in m: tbmap[m['testbed']] = m['uri'] # a place to work try: tmpdir = tempfile.mkdtemp(prefix="split-") os.mkdir(tmpdir+"/keys") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") tbparams = { } eid, expid, expcert_file = \ self.get_experiment_ids_and_start(key, tmpdir) # This catches exceptions to clear the placeholder if necessary try: if not (eid and expid): raise service_error(service_error.internal, "Cannot find local experiment info!?") top = self.get_topology(req, tmpdir) self.confirm_software(top) # Assign the IPs hosts, ip_allocator = self.allocate_ips_to_topo(top) if self.needs_route_computation(req): self.compute_static_routes(tmpdir, top) # Find the testbeds to look up tb_hosts = { } testbeds = [ ] for e in top.elements: if isinstance(e, topdl.Computer): tb = e.get_attribute('testbed') # Put nodes not in a testbed into the default testbed if # there is one. if tb is None: if self.default_tb is None: raise service_error(service_error.req, '%s not in a testbed (and no default)' % e.name) tb = self.default_tb e.set_attribute('testbed', tb) if tb in tb_hosts: tb_hosts[tb].append(e.name) else: tb_hosts[tb] = [ e.name ] testbeds.append(tb) masters, pmasters = self.get_testbed_services(req, testbeds) allocated = { } # Testbeds we can access topo ={ } # Sub topologies connInfo = { } # Connection information self.get_access_to_testbeds(testbeds, fid, allocated, tbparams, masters, tbmap, expid, expcert_file) # tbactive is the set of testbeds that have NATs in front of their # portals. They need to initiate connections. tbactive = set([k for k, v in tbparams.items() \ if v.get_attribute('nat_portals')]) self.split_topology(top, topo, testbeds) attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams) part = experiment_partition(self.auth, self.store_url, tbmap, self.muxmax, self.direct_transit, tbactive) part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator, connInfo, expid) auth_attrs = set() # Now get access to the dynamic testbeds (those added above) for tb in [ t for t in topo if t not in allocated]: self.get_access(tb, tbparams, fid, masters, tbmap, expid, expcert_file) allocated[tb] = 1 store_keys = topo[tb].get_attribute('store_keys') # Give the testbed access to keys it exports or imports if store_keys: auth_attrs.update(set([ (tbparams[tb].allocID, sk) \ for sk in store_keys.split(" ")])) if auth_attrs: self.append_experiment_authorization(expid, auth_attrs) # transit and disconnected testbeds may not have a connInfo entry. # Fill in the blanks. for t in allocated.keys(): if not connInfo.has_key(t): connInfo[t] = { } self.wrangle_software(expid, top, topo, tbparams) proofs = self.save_federant_information(allocated, tbparams, eid, top) except service_error, e: # If something goes wrong in the parse (usually an access error) # clear the placeholder state. From here on out the code delays # exceptions. Failing at this point returns a fault to the remote # caller. self.log.info("Create experiment call failed for %s %s: %s" % (eid, fid, e)) self.clear_placeholder(eid, expid, tmpdir) raise e # Start the background swapper and return the starting state. From # here on out, the state will stick around a while. # Create a logger that logs to the experiment's state object as well as # to the main log file. alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) alloc_collector = self.list_log(self.state[eid].log) h = logging.StreamHandler(alloc_collector) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) # Start a thread to do the resource allocation t = Thread(target=self.allocate_resources, args=(allocated, masters, eid, expid, tbparams, top, topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo, tbmap, expcert_file), name=eid) t.start() rv = { 'experimentID': [ {'localname' : eid }, { 'fedid': copy.copy(expid) } ], 'experimentStatus': 'starting', 'proof': [ proof.to_dict() ] + proofs, } self.log.info("Create experiment call succeeded for %s %s" % \ (eid, fid)) return rv def get_experiment_fedid(self, key): """ find the fedid associated with the localname key in the state database. """ rv = None self.state_lock.acquire() if key in self.state: rv = self.state[key].fedid self.state_lock.release() return rv def check_experiment_access(self, fid, key): """ Confirm that the fid has access to the experiment. Though a request may be made in terms of a local name, the access attribute is always the experiment's fedid. """ if not isinstance(key, fedid): key = self.get_experiment_fedid(key) access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True) if access_ok: return proof else: raise service_error(service_error.access, "Access Denied", proof) def get_handler(self, path, fid): """ Perhaps surprisingly named, this function handles HTTP GET requests to this server (SOAP requests are POSTs). """ self.log.info("Get handler %s %s" % (path, fid)) if len("%s" % fid) == 0: return (None, None) # XXX: log proofs? if self.auth.check_attribute(fid, path): return ("%s/%s" % (self.repodir, path), "application/binary") else: return (None, None) def update_info(self, key, force=False): top = None self.state_lock.acquire() if key in self.state: if force or self.state[key].older_than(self.info_cache_limit): top = self.state[key].top if top is not None: top = top.clone() d1, info_params, cert, d2 = \ self.get_segment_info(self.state[key], need_lock=False) self.state_lock.release() if top is None: return try: tmpdir = tempfile.mkdtemp(prefix="info-") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") cert_file = self.make_temp_certfile(cert, tmpdir) data = [] try: for k, (uri, aid) in info_params.items(): info=self.info_segment(log=self.log, testbed=uri, cert_file=cert_file, cert_pwd=None, trusted_certs=self.trusted_certs, caller=self.call_InfoSegment) info(uri, aid) data.append(info) # Clean up the tmpdir no matter what finally: if tmpdir: self.remove_dirs(tmpdir) self.annotate_topology(top, data) self.state_lock.acquire() if key in self.state: self.state[key].top = top self.state[key].updated() if self.state_filename: self.write_state() self.state_lock.release() def get_info(self, req, fid): """ Return all the stored info about this experiment """ rv = None self.log.info("Info call started for %s" % fid) req = req.get('InfoRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no InfoRequestBody)") exp = req.get('experiment', None) legacy = req.get('legacy', False) fresh = req.get('fresh', False) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") try: proof = self.check_experiment_access(fid, key) except service_error, e: self.log.info("Info call failed for %s: access denied" % fid) self.update_info(key, fresh) self.state_lock.acquire() if self.state.has_key(key): rv = self.state[key].get_info() # Copy the topo if we need legacy annotations if legacy: top = self.state[key].top if top is not None: top = top.clone() self.state_lock.release() self.log.info("Gathered Info for %s %s" % (key, fid)) # If the legacy visualization and topology representations are # requested, calculate them and add them to the return. if legacy and rv is not None: self.log.info("Generating legacy Info for %s %s" % (key, fid)) if top is not None: vtopo = topdl.topology_to_vtopo(top) if vtopo is not None: rv['vtopo'] = vtopo try: vis = self.genviz(vtopo) except service_error, e: self.log.debug('Problem generating visualization: %s' \ % e) vis = None if vis is not None: rv['vis'] = vis if rv: self.log.info("Info succeded for %s %s" % (key, fid)) rv['proof'] = proof.to_dict() return rv else: self.log.info("Info failed for %s %s: no experiment" % (key, fid)) raise service_error(service_error.req, "No such experiment") def operate_on_segments(self, op_params, cert, op, testbeds, params, results): """ Call OperateSegment on multiple testbeds and gather the results. op_params contains the parameters needed to contact that testbed, cert is a certificate containing the fedid to use, op is the operation, testbeds is a dict mapping testbed name to targets in that testbed, params are the parameters to include a,d results is a growing list of the results of the calls. """ try: tmpdir = tempfile.mkdtemp(prefix="info-") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") cert_file = self.make_temp_certfile(cert, tmpdir) try: for tb, targets in testbeds.items(): if tb in op_params: uri, aid = op_params[tb] operate=self.operation_segment(log=self.log, testbed=uri, cert_file=cert_file, cert_pwd=None, trusted_certs=self.trusted_certs, caller=self.call_OperationSegment) if operate(uri, aid, op, targets, params): if operate.status is not None: results.extend(operate.status) continue # Something went wrong in a weird way. Add statuses # that reflect that to results for t in targets: results.append(operation_status(t, operation_status.federant, 'Unexpected error on %s' % tb)) # Clean up the tmpdir no matter what finally: if tmpdir: self.remove_dirs(tmpdir) def do_operation(self, req, fid): """ Find the testbeds holding each target and ask them to carry out the operation. Return the statuses. """ # Map an element to the testbed containing it def element_to_tb(e): if isinstance(e, topdl.Computer): return e.get_attribute("testbed") elif isinstance(e, topdl.Testbed): return e.name else: return None # If d is an operation_status object, make it a dict def make_dict(d): if isinstance(d, dict): return d elif isinstance(d, operation_status): return d.to_dict() else: return { } def element_name(e): if isinstance(e, topdl.Computer): return e.name elif isinstance(e, topdl.Testbed): if e.localname: return e.localname[0] else: return None else: return None self.log.info("Operation call started for %s" % fid) req = req.get('OperationRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no OperationRequestBody)") exp = req.get('experiment', None) op = req.get('operation', None) targets = set(req.get('target', [])) params = req.get('parameter', None) if exp: if 'fedid' in exp: key = exp['fedid'] keytype = "fedid" elif 'localname' in exp: key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") if op is None or not targets: raise service_error(service_error.req, "No request?") try: proof = self.check_experiment_access(fid, key) except service_error, e: self.log.info("Operation call failed for %s: access denied" % fid) raise e self.state_lock.acquire() if key in self.state: d1, op_params, cert, d2 = \ self.get_segment_info(self.state[key], need_lock=False, key='tb') top = self.state[key].top if top is not None: top = top.clone() self.state_lock.release() if top is None: self.log.info("Operation call failed for %s: not active" % fid) raise service_error(service_error.partial, "No topology yet", proof=proof) testbeds = { } results = [] for e in top.elements: ename = element_name(e) if ename in targets: tb = element_to_tb(e) targets.remove(ename) if tb is not None: if tb in testbeds: testbeds[tb].append(ename) else: testbeds[tb] = [ ename ] else: results.append(operation_status(e.name, code=operation_status.no_target, description='Cannot map target to testbed')) for t in targets: results.append(operation_status(t, operation_status.no_target)) self.operate_on_segments(op_params, cert, op, testbeds, params, results) self.log.info("Operation call succeeded for %s" % fid) return { 'experiment': exp, 'status': [make_dict(r) for r in results], 'proof': proof.to_dict() } def get_multi_info(self, req, fid): """ Return all the stored info that this fedid can access """ rv = { 'info': [ ], 'proof': [ ] } self.log.info("Multi Info call started for %s" % fid) self.get_grouper_updates(fid) self.auth.update() self.auth.save() self.state_lock.acquire() for key in [ k for k in self.state.keys() if isinstance(k, fedid)]: try: proof = self.check_experiment_access(fid, key) except service_error, e: if e.code == service_error.access: continue else: self.log.info("Multi Info call failed for %s: %s" % \ (e,fid)) self.state_lock.release() raise e if self.state.has_key(key): e = self.state[key].get_info() e['proof'] = proof.to_dict() rv['info'].append(e) rv['proof'].append(proof.to_dict()) self.state_lock.release() self.log.info("Multi Info call succeeded for %s" % fid) return rv def check_termination_status(self, fed_exp, force): """ Confirm that the experiment is sin a valid state to stop (or force it) return the state - invalid states for deletion and force settings cause exceptions. """ self.state_lock.acquire() status = fed_exp.status if status: if status in ('starting', 'terminating'): if not force: self.state_lock.release() raise service_error(service_error.partial, 'Experiment still being created or destroyed') else: self.log.warning('Experiment in %s state ' % status + \ 'being terminated by force.') self.state_lock.release() return status else: # No status??? trouble self.state_lock.release() raise service_error(service_error.internal, "Experiment has no status!?") def get_segment_info(self, fed_exp, need_lock=True, key='aid'): ids = [] term_params = { } if need_lock: self.state_lock.acquire() ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ] expcert = fed_exp.identity repo = "%s" % fed_exp.fedid # Collect the allocation/segment ids into a dict keyed by the fedid # of the allocation that contains a tuple of uri, aid for i, fed in enumerate(fed_exp.get_all_allocations()): uri = fed.uri aid = fed.allocID if key == 'aid': term_params[aid] = (uri, aid) elif key == 'tb': term_params[fed.tb] = (uri, aid) if need_lock: self.state_lock.release() return ids, term_params, expcert, repo def get_termination_info(self, fed_exp): self.state_lock.acquire() ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False) # Change the experiment state fed_exp.status = 'terminating' fed_exp.updated() if self.state_filename: self.write_state() self.state_lock.release() return ids, term_params, expcert, repo def deallocate_resources(self, term_params, expcert, status, force, dealloc_log): tmpdir = None # This try block makes sure the tempdir is cleared try: # If no expcert, try the deallocation as the experiment # controller instance. if expcert and self.auth_type != 'legacy': try: tmpdir = tempfile.mkdtemp(prefix="term-") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") cert_file = self.make_temp_certfile(expcert, tmpdir) pw = None else: cert_file = self.cert_file pw = self.cert_pwd # Stop everyone. NB, wait_for_all waits until a thread starts # and then completes, so we can't wait if nothing starts. So, # no tbparams, no start. if len(term_params) > 0: tp = thread_pool(self.nthreads) for k, (uri, aid) in term_params.items(): # Create and start a thread to stop the segment tp.wait_for_slot() t = pooled_thread(\ target=self.terminate_segment(log=dealloc_log, testbed=uri, cert_file=cert_file, cert_pwd=pw, trusted_certs=self.trusted_certs, caller=self.call_TerminateSegment), args=(uri, aid), name=k, pdata=tp, trace_file=self.trace_file) t.start() # Wait for completions tp.wait_for_all_done() # release the allocations (failed experiments have done this # already, and starting experiments may be in odd states, so we # ignore errors releasing those allocations try: for k, (uri, aid) in term_params.items(): self.release_access(None, aid, uri=uri, cert_file=cert_file, cert_pwd=pw) except service_error, e: if status != 'failed' and not force: raise e # Clean up the tmpdir no matter what finally: if tmpdir: self.remove_dirs(tmpdir) def terminate_experiment(self, req, fid): """ Swap this experiment out on the federants and delete the shared information """ self.log.info("Terminate experiment call started for %s" % fid) tbparams = { } req = req.get('TerminateRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no TerminateRequestBody)") key = self.get_experiment_key(req, 'experiment') try: proof = self.check_experiment_access(fid, key) except service_error, e: self.log.info( "Terminate experiment call failed for %s: access denied" \ % fid) raise e exp = req.get('experiment', False) force = req.get('force', False) dealloc_list = [ ] # Create a logger that logs to the dealloc_list as well as to the main # log file. dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key) dealloc_log.info("Terminating %s " %key) h = logging.StreamHandler(self.list_log(dealloc_list)) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) dealloc_log.addHandler(h) self.state_lock.acquire() fed_exp = self.state.get(key, None) self.state_lock.release() repo = None if fed_exp: status = self.check_termination_status(fed_exp, force) # get_termination_info updates the experiment state ids, term_params, expcert, repo = self.get_termination_info(fed_exp) self.deallocate_resources(term_params, expcert, status, force, dealloc_log) # Remove the terminated experiment self.state_lock.acquire() for id in ids: self.clear_experiment_authorization(id, need_state_lock=False) if id in self.state: del self.state[id] if self.state_filename: self.write_state() self.state_lock.release() # Delete any synch points associated with this experiment. All # synch points begin with the fedid of the experiment. fedid_keys = set(["fedid:%s" % f for f in ids \ if isinstance(f, fedid)]) for k in self.synch_store.all_keys(): try: if len(k) > 45 and k[0:46] in fedid_keys: self.synch_store.del_value(k) except synch_store.BadDeletionError: pass self.write_store() # Remove software and other cached stuff from the filesystem. if repo: self.remove_dirs("%s/%s" % (self.repodir, repo)) self.log.info("Terminate experiment succeeded for %s %s" % \ (key, fid)) return { 'experiment': exp , 'deallocationLog': string.join(dealloc_list, ''), 'proof': [proof.to_dict()], } else: self.log.info("Terminate experiment failed for %s %s: no state" % \ (key, fid)) raise service_error(service_error.req, "No saved state") def GetValue(self, req, fid): """ Get a value from the synchronized store """ req = req.get('GetValueRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no GetValueRequestBody)") name = req.get('name', None) wait = req.get('wait', False) rv = { 'name': name } if not name: raise service_error(service_error.req, "No name?") access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True) if access_ok: self.log.debug("[GetValue] asking for %s " % name) try: v = self.synch_store.get_value(name, wait) except synch_store.RevokedKeyError: # No more synch on this key raise service_error(service_error.federant, "Synch key %s revoked" % name) if v is not None: rv['value'] = v rv['proof'] = proof.to_dict() self.log.debug("[GetValue] got %s from %s" % (v, name)) return rv else: raise service_error(service_error.access, "Access Denied", proof=proof) def SetValue(self, req, fid): """ Set a value in the synchronized store """ req = req.get('SetValueRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no SetValueRequestBody)") name = req.get('name', None) v = req.get('value', '') if not name: raise service_error(service_error.req, "No name?") access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True) if access_ok: try: self.synch_store.set_value(name, v) self.write_store() self.log.debug("[SetValue] set %s to %s" % (name, v)) except synch_store.CollisionError: # Translate into a service_error raise service_error(service_error.req, "Value already set: %s" %name) except synch_store.RevokedKeyError: # No more synch on this key raise service_error(service_error.federant, "Synch key %s revoked" % name) return { 'name': name, 'value': v, 'proof': proof.to_dict() } else: raise service_error(service_error.access, "Access Denied", proof=proof)