#!/usr/local/bin/python import os,sys import re import random import string import subprocess import tempfile import copy import pickle import logging import signal import time import traceback # For parsing visualization output and splitter output import xml.parsers.expat from threading import Lock, Thread, Condition from subprocess import call, Popen, PIPE from urlparse import urlparse from urllib2 import urlopen from util import * from fedid import fedid, generate_fedid from remote_service import xmlrpc_handler, soap_handler, service_caller from service_error import service_error from synch_store import synch_store from experiment_partition import experiment_partition import topdl import list_log from ip_allocator import ip_allocator from ip_addr import ip_addr class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.experiment_control") fl.addHandler(nullHandler()) # Right now, no support for composition. class federated_service: def __init__(self, name, exporter=None, importers=None, params=None, reqs=None, portal=None): self.name=name self.exporter=exporter if importers is None: self.importers = [] else: self.importers=importers if params is None: self.params = { } else: self.params = params if reqs is None: self.reqs = [] else: self.reqs = reqs if portal is not None: self.portal = portal else: self.portal = (name in federated_service.needs_portal) def __str__(self): return "name %s export %s import %s params %s reqs %s" % \ (self.name, self.exporter, self.importers, self.params, [ (r['name'], r['visibility']) for r in self.reqs] ) needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master') class experiment_control_local: """ Control of experiments that this system can directly access. Includes experiment creation, termination and information dissemination. Thred safe. """ class ssh_cmd_timeout(RuntimeError): pass class thread_pool: """ A class to keep track of a set of threads all invoked for the same task. Manages the mutual exclusion of the states. """ def __init__(self, nthreads): """ Start a pool. """ self.changed = Condition() self.started = 0 self.terminated = 0 self.nthreads = nthreads def acquire(self): """ Get the pool's lock. """ self.changed.acquire() def release(self): """ Release the pool's lock. """ self.changed.release() def wait(self, timeout = None): """ Wait for a pool thread to start or stop. """ self.changed.wait(timeout) def start(self): """ Called by a pool thread to report starting. """ self.changed.acquire() self.started += 1 self.changed.notifyAll() self.changed.release() def terminate(self): """ Called by a pool thread to report finishing. """ self.changed.acquire() self.terminated += 1 self.changed.notifyAll() self.changed.release() def clear(self): """ Clear all pool data. """ self.changed.acquire() self.started = 0 self.terminated =0 self.changed.notifyAll() self.changed.release() def wait_for_slot(self): """ Wait until we have a free slot to start another pooled thread """ self.acquire() while self.started - self.terminated >= self.nthreads: self.wait() self.release() def wait_for_all_done(self, timeout=None): """ Wait until all active threads finish (and at least one has started). If a timeout is given, return after waiting that long for termination. If all threads are done (and one has started in the since the last clear()) return True, otherwise False. """ if timeout: deadline = time.time() + timeout self.acquire() while self.started == 0 or self.started > self.terminated: self.wait(timeout) if timeout: if time.time() > deadline: break timeout = deadline - time.time() self.release() return not (self.started == 0 or self.started > self.terminated) class pooled_thread(Thread): """ One of a set of threads dedicated to a specific task. Uses the thread_pool class above for coordination. """ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, pdata=None, trace_file=None): Thread.__init__(self, group, target, name, args, kwargs) self.rv = None # Return value of the ops in this thread self.exception = None # Exception that terminated this thread self.target=target # Target function to run on start() self.args = args # Args to pass to target self.kwargs = kwargs # Additional kw args self.pdata = pdata # thread_pool for this class # Logger for this thread self.log = logging.getLogger("fedd.experiment_control") def run(self): """ Emulate Thread.run, except add pool data manipulation and error logging. """ if self.pdata: self.pdata.start() if self.target: try: self.rv = self.target(*self.args, **self.kwargs) except service_error, s: self.exception = s self.log.error("Thread exception: %s %s" % \ (s.code_string(), s.desc)) except: self.exception = sys.exc_info()[1] self.log.error(("Unexpected thread exception: %s" +\ "Trace %s") % (self.exception,\ traceback.format_exc())) if self.pdata: self.pdata.terminate() call_RequestAccess = service_caller('RequestAccess') call_ReleaseAccess = service_caller('ReleaseAccess') call_StartSegment = service_caller('StartSegment') call_TerminateSegment = service_caller('TerminateSegment') call_Ns2Topdl = service_caller('Ns2Topdl') def __init__(self, config=None, auth=None): """ Intialize the various attributes, most from the config object """ def parse_tarfile_list(tf): """ Parse a tarfile list from the configuration. This is a set of paths and tarfiles separated by spaces. """ rv = [ ] if tf is not None: tl = tf.split() while len(tl) > 1: p, t = tl[0:2] del tl[0:2] rv.append((p, t)) return rv self.thread_with_rv = experiment_control_local.pooled_thread self.thread_pool = experiment_control_local.thread_pool self.list_log = list_log.list_log self.cert_file = config.get("experiment_control", "cert_file") if self.cert_file: self.cert_pwd = config.get("experiment_control", "cert_pwd") else: self.cert_file = config.get("globals", "cert_file") self.cert_pwd = config.get("globals", "cert_pwd") self.trusted_certs = config.get("experiment_control", "trusted_certs") \ or config.get("globals", "trusted_certs") self.repodir = config.get("experiment_control", "repodir") self.repo_url = config.get("experiment_control", "repo_url", "https://users.isi.deterlab.net:23235"); self.exp_stem = "fed-stem" self.log = logging.getLogger("fedd.experiment_control") set_log_level(config, "experiment_control", self.log) self.muxmax = 2 self.nthreads = 10 self.randomize_experiments = False self.splitter = None self.ssh_keygen = "/usr/bin/ssh-keygen" self.ssh_identity_file = None self.debug = config.getboolean("experiment_control", "create_debug") self.cleanup = not config.getboolean("experiment_control", "leave_tmpfiles") self.state_filename = config.get("experiment_control", "experiment_state") self.store_filename = config.get("experiment_control", "synch_store") self.store_url = config.get("experiment_control", "store_url") self.splitter_url = config.get("experiment_control", "ns2topdl_uri") self.fedkit = parse_tarfile_list(\ config.get("experiment_control", "fedkit")) self.gatewaykit = parse_tarfile_list(\ config.get("experiment_control", "gatewaykit")) accessdb_file = config.get("experiment_control", "accessdb") self.ssh_pubkey_file = config.get("experiment_control", "ssh_pubkey_file") self.ssh_privkey_file = config.get("experiment_control", "ssh_privkey_file") dt = config.get("experiment_control", "direct_transit") if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")] else: self.direct_transit = [ ] # NB for internal master/slave ops, not experiment setup self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa") self.overrides = set([]) ovr = config.get('experiment_control', 'overrides') if ovr: for o in ovr.split(","): o = o.strip() if o.startswith('fedid:'): o = o[len('fedid:'):] self.overrides.add(fedid(hexstr=o)) self.state = { } self.state_lock = Lock() self.tclsh = "/usr/local/bin/otclsh" self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \ config.get("experiment_control", "tcl_splitter", "/usr/testbed/lib/ns2ir/parse.tcl") mapdb_file = config.get("experiment_control", "mapdb") self.trace_file = sys.stderr self.def_expstart = \ "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\ "/tmp/federate"; self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\ "FEDDIR/hosts"; self.def_gwstart = \ "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\ "/tmp/bridge.log"; self.def_mgwstart = \ "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\ "/tmp/bridge.log"; self.def_gwimage = "FBSD61-TUNNEL2"; self.def_gwtype = "pc"; self.local_access = { } if auth: self.auth = auth else: self.log.error(\ "[access]: No authorizer initialized, creating local one.") auth = authorizer() if self.ssh_pubkey_file: try: f = open(self.ssh_pubkey_file, 'r') self.ssh_pubkey = f.read() f.close() except EnvironmentError: raise service_error(service_error.internal, "Cannot read sshpubkey") else: raise service_error(service_error.internal, "No SSH public key file?") if not self.ssh_privkey_file: raise service_error(service_error.internal, "No SSH public key file?") if mapdb_file: self.read_mapdb(mapdb_file) else: self.log.warn("[experiment_control] No testbed map, using defaults") self.tbmap = { 'deter':'https://users.isi.deterlab.net:23235', 'emulab':'https://users.isi.deterlab.net:23236', 'ucb':'https://users.isi.deterlab.net:23237', } if accessdb_file: self.read_accessdb(accessdb_file) else: raise service_error(service_error.internal, "No accessdb specified in config") # Grab saved state. OK to do this w/o locking because it's read only # and only one thread should be in existence that can see self.state at # this point. if self.state_filename: self.read_state() if self.store_filename: self.read_store() else: self.log.warning("No saved synch store") self.synch_store = synch_store # Dispatch tables self.soap_services = {\ 'New': soap_handler('New', self.new_experiment), 'Create': soap_handler('Create', self.create_experiment), 'Vtopo': soap_handler('Vtopo', self.get_vtopo), 'Vis': soap_handler('Vis', self.get_vis), 'Info': soap_handler('Info', self.get_info), 'MultiInfo': soap_handler('MultiInfo', self.get_multi_info), 'Terminate': soap_handler('Terminate', self.terminate_experiment), 'GetValue': soap_handler('GetValue', self.GetValue), 'SetValue': soap_handler('SetValue', self.SetValue), } self.xmlrpc_services = {\ 'New': xmlrpc_handler('New', self.new_experiment), 'Create': xmlrpc_handler('Create', self.create_experiment), 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo), 'Vis': xmlrpc_handler('Vis', self.get_vis), 'Info': xmlrpc_handler('Info', self.get_info), 'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info), 'Terminate': xmlrpc_handler('Terminate', self.terminate_experiment), 'GetValue': xmlrpc_handler('GetValue', self.GetValue), 'SetValue': xmlrpc_handler('SetValue', self.SetValue), } # Call while holding self.state_lock def write_state(self): """ Write a new copy of experiment state after copying the existing state to a backup. State format is a simple pickling of the state dictionary. """ if os.access(self.state_filename, os.W_OK): copy_file(self.state_filename, \ "%s.bak" % self.state_filename) try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except EnvironmentError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) @staticmethod def get_alloc_ids(state): """ Pull the fedids of the identifiers of each allocation from the state. Again, a dict dive that's best isolated. Used by read_store and read state """ return [ f['allocID']['fedid'] for f in state.get('federant',[]) \ if f.has_key('allocID') and \ f['allocID'].has_key('fedid')] # Call while holding self.state_lock def read_state(self): """ Read a new copy of experiment state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ def get_experiment_id(state): """ Pull the fedid experimentID out of the saved state. This is kind of a gross walk through the dict. """ if state.has_key('experimentID'): for e in state['experimentID']: if e.has_key('fedid'): return e['fedid'] else: return None else: return None try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except EnvironmentError, e: self.log.warning("[read_state]: No saved state: Can't open %s: %s"\ % (self.state_filename, e)) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) for s in self.state.values(): try: eid = get_experiment_id(s) if eid : # Give the owner rights to the experiment self.auth.set_attribute(s['owner'], eid) # And holders of the eid as well self.auth.set_attribute(eid, eid) # allow overrides to control experiments as well for o in self.overrides: self.auth.set_attribute(o, eid) # Set permissions to allow reading of the software repo, if # any, as well. for a in self.get_alloc_ids(s): self.auth.set_attribute(a, 'repo/%s' % eid) else: raise KeyError("No experiment id") except KeyError, e: self.log.warning("[read_state]: State ownership or identity " +\ "misformatted in %s: %s" % (self.state_filename, e)) def read_accessdb(self, accessdb_file): """ Read the mapping from fedids that can create experiments to their name in the 3-level access namespace. All will be asserted from this testbed and can include the local username and porject that will be asserted on their behalf by this fedd. Each fedid is also added to the authorization system with the "create" attribute. """ self.accessdb = {} # These are the regexps for parsing the db name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+" project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \ "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$") user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \ "\s*->\s*(" + name_expr + ")\s*$") lineno = 0 # Parse the mappings and store in self.authdb, a dict of # fedid -> (proj, user) try: f = open(accessdb_file, "r") for line in f: lineno += 1 line = line.strip() if len(line) == 0 or line.startswith('#'): continue m = project_line.match(line) if m: fid = fedid(hexstr=m.group(1)) project, user = m.group(2,3) if not self.accessdb.has_key(fid): self.accessdb[fid] = [] self.accessdb[fid].append((project, user)) continue m = user_line.match(line) if m: fid = fedid(hexstr=m.group(1)) project = None user = m.group(2) if not self.accessdb.has_key(fid): self.accessdb[fid] = [] self.accessdb[fid].append((project, user)) continue self.log.warn("[experiment_control] Error parsing access " +\ "db %s at line %d" % (accessdb_file, lineno)) except EnvironmentError: raise service_error(service_error.internal, ("Error opening/reading %s as experiment " +\ "control accessdb") % accessdb_file) f.close() # Initialize the authorization attributes for fid in self.accessdb.keys(): self.auth.set_attribute(fid, 'create') self.auth.set_attribute(fid, 'new') def read_mapdb(self, file): """ Read a simple colon separated list of mappings for the label-to-testbed-URL mappings. Clears or creates self.tbmap. """ self.tbmap = { } lineno =0 try: f = open(file, "r") for line in f: lineno += 1 line = line.strip() if line.startswith('#') or len(line) == 0: continue try: label, url = line.split(':', 1) self.tbmap[label] = url except ValueError, e: self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\ "map db: %s %s" % (lineno, line, e)) except EnvironmentError, e: self.log.warning("[read_mapdb]: No saved map database: Can't " +\ "open %s: %s" % (file, e)) f.close() def read_store(self): try: self.synch_store = synch_store() self.synch_store.load(self.store_filename) self.log.debug("[read_store]: Read store from %s" % \ self.store_filename) except EnvironmentError, e: self.log.warning("[read_store]: No saved store: Can't open %s: %s"\ % (self.state_filename, e)) self.synch_store = synch_store() # Set the initial permissions on data in the store. XXX: This ad hoc # authorization attribute initialization is getting out of hand. for k in self.synch_store.all_keys(): try: if k.startswith('fedid:'): fid = fedid(hexstr=k[6:46]) if self.state.has_key(fid): for a in self.get_alloc_ids(self.state[fid]): self.auth.set_attribute(a, k) except ValueError, e: self.log.warn("Cannot deduce permissions for %s" % k) def write_store(self): """ Write a new copy of synch_store after writing current state to a backup. We use the internal synch_store pickle method to avoid incinsistent data. State format is a simple pickling of the store. """ if os.access(self.store_filename, os.W_OK): copy_file(self.store_filename, \ "%s.bak" % self.store_filename) try: self.synch_store.save(self.store_filename) except EnvironmentError, e: self.log.error("Can't write file %s: %s" % \ (self.store_filename, e)) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) def generate_ssh_keys(self, dest, type="rsa" ): """ Generate a set of keys for the gateways to use to talk. Keys are of type type and are stored in the required dest file. """ valid_types = ("rsa", "dsa") t = type.lower(); if t not in valid_types: raise ValueError cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] try: trace = open("/dev/null", "w") except EnvironmentError: raise service_error(service_error.internal, "Cannot open /dev/null??"); # May raise CalledProcessError self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) rv = call(cmd, stdout=trace, stderr=trace, close_fds=True) if rv != 0: raise service_error(service_error.internal, "Cannot generate nonce ssh keys. %s return code %d" \ % (self.ssh_keygen, rv)) def gentopo(self, str): """ Generate the topology dtat structure from the splitter's XML representation of it. The topology XML looks like: ip1:ip2 node:port """ class topo_parse: """ Parse the topology XML and create the dats structure. """ def __init__(self): # Typing of the subelements for data conversion self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member') self.int_subelements = ( 'bandwidth',) self.float_subelements = ( 'delay',) # The final data structure self.nodes = [ ] self.lans = [ ] self.topo = { \ 'node': self.nodes,\ 'lan' : self.lans,\ } self.element = { } # Current element being created self.chars = "" # Last text seen def end_element(self, name): # After each sub element the contents is added to the current # element or to the appropriate list. if name == 'node': self.nodes.append(self.element) self.element = { } elif name == 'lan': self.lans.append(self.element) self.element = { } elif name in self.str_subelements: self.element[name] = self.chars self.chars = "" elif name in self.int_subelements: self.element[name] = int(self.chars) self.chars = "" elif name in self.float_subelements: self.element[name] = float(self.chars) self.chars = "" def found_chars(self, data): self.chars += data.rstrip() tp = topo_parse(); parser = xml.parsers.expat.ParserCreate() parser.EndElementHandler = tp.end_element parser.CharacterDataHandler = tp.found_chars parser.Parse(str) return tp.topo def genviz(self, topo): """ Generate the visualization the virtual topology """ neato = "/usr/local/bin/neato" # These are used to parse neato output and to create the visualization # file. vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"') vis_fmt = "%s%s%s" + \ "%s" try: # Node names nodes = [ n['vname'] for n in topo['node'] ] topo_lans = topo['lan'] except KeyError, e: raise service_error(service_error.internal, "Bad topology: %s" %e) lans = { } links = { } # Walk through the virtual topology, organizing the connections into # 2-node connections (links) and more-than-2-node connections (lans). # When a lan is created, it's added to the list of nodes (there's a # node in the visualization for the lan). for l in topo_lans: if links.has_key(l['vname']): if len(links[l['vname']]) < 2: links[l['vname']].append(l['vnode']) else: nodes.append(l['vname']) lans[l['vname']] = links[l['vname']] del links[l['vname']] lans[l['vname']].append(l['vnode']) elif lans.has_key(l['vname']): lans[l['vname']].append(l['vnode']) else: links[l['vname']] = [ l['vnode'] ] # Open up a temporary file for dot to turn into a visualization try: df, dotname = tempfile.mkstemp() dotfile = os.fdopen(df, 'w') except EnvironmentError: raise service_error(service_error.internal, "Failed to open file in genviz") try: dnull = open('/dev/null', 'w') except EnvironmentError: service_error(service_error.internal, "Failed to open /dev/null in genviz") # Generate a dot/neato input file from the links, nodes and lans try: print >>dotfile, "graph G {" for n in nodes: print >>dotfile, '\t"%s"' % n for l in links.keys(): print >>dotfile, '\t"%s" -- "%s"' % tuple(links[l]) for l in lans.keys(): for n in lans[l]: print >>dotfile, '\t "%s" -- "%s"' % (n,l) print >>dotfile, "}" dotfile.close() except TypeError: raise service_error(service_error.internal, "Single endpoint link in vtopo") except EnvironmentError: raise service_error(service_error.internal, "Cannot write dot file") # Use dot to create a visualization dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE, stderr=dnull, close_fds=True) dnull.close() # Translate dot to vis format vis_nodes = [ ] vis = { 'node': vis_nodes } for line in dot.stdout: m = vis_re.match(line) if m: vn = m.group(1) vis_node = {'name': vn, \ 'x': float(m.group(2)),\ 'y' : float(m.group(3)),\ } if vn in links.keys() or vn in lans.keys(): vis_node['type'] = 'lan' else: vis_node['type'] = 'node' vis_nodes.append(vis_node) rv = dot.wait() os.remove(dotname) if rv == 0 : return vis else: return None def get_access(self, tb, nodes, tbparam, access_user, masters, tbmap): """ Get access to testbed through fedd and set the parameters for that tb """ def get_export_project(svcs): """ Look through for the list of federated_service for this testbed objects for a project_export service, and extract the project parameter. """ pe = [s for s in svcs if s.name=='project_export'] if len(pe) == 1: return pe[0].params.get('project', None) elif len(pe) == 0: return None else: raise service_error(service_error.req, "More than one project export is not supported") uri = tbmap.get(testbed_base(tb), None) if not uri: raise service_error(service_error.server_config, "Unknown testbed: %s" % tb) export_svcs = masters.get(tb,[]) import_svcs = [ s for m in masters.values() \ for s in m \ if tb in s.importers ] export_project = get_export_project(export_svcs) # Tweak search order so that if there are entries in access_user that # have a project matching the export project, we try them first if export_project: access_sequence = [ (p, u) for p, u in access_user \ if p == export_project] access_sequence.extend([(p, u) for p, u in access_user \ if p != export_project]) else: access_sequence = access_user for p, u in access_sequence: self.log.debug(("[get_access] Attempting access from (%s, %s) " + \ "to %s") % ((p or "None"), u, uri)) if p: # Request with user and project specified req = {\ 'credential': [ "project: %s" % p, "user: %s" % u], } else: # Request with only user specified req = {\ 'credential': [ 'user: %s' % u ], } # Make the service request from the services we're importing and # exporting. Keep track of the export request ids so we can # collect the resulting info from the access response. e_keys = { } if import_svcs or export_svcs: req['service'] = [ ] for i, s in enumerate(import_svcs): idx = 'import%d' % i sr = {'id': idx, 'name': s.name, 'visibility': 'import' } if s.params: sr['fedAttr'] = [ { 'attribute': k, 'value': v } \ for k, v in s.params.items()] req['service'].append(sr) for i, s in enumerate(export_svcs): idx = 'export%d' % i e_keys[idx] = s sr = {'id': idx, 'name': s.name, 'visibility': 'export' } if s.params: sr['fedAttr'] = [ { 'attribute': k, 'value': v } for k, v in s.params.items()] req['service'].append(sr) # node resources if any if nodes != None and len(nodes) > 0: rnodes = [ ] for n in nodes: rn = { } image, hw, count = n.split(":") if image: rn['image'] = [ image ] if hw: rn['hardware'] = [ hw ] if count and int(count) >0 : rn['count'] = int(count) rnodes.append(rn) req['resources']= { } req['resources']['node'] = rnodes try: if self.local_access.has_key(uri): # Local access call req = { 'RequestAccessRequestBody' : req } r = self.local_access[uri].RequestAccess(req, fedid(file=self.cert_file)) r = { 'RequestAccessResponseBody' : r } else: r = self.call_RequestAccess(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) except service_error, e: if e.code == service_error.access: self.log.debug("[get_access] Access denied") r = None continue else: raise e if r.has_key('RequestAccessResponseBody'): # Through to here we have a valid response, not a fault. # Access denied is a fault, so something better or worse than # access denied has happened. r = r['RequestAccessResponseBody'] self.log.debug("[get_access] Access granted") break else: raise service_error(service_error.protocol, "Bad proxy response") if not r: raise service_error(service_error.access, "Access denied by %s (%s)" % (tb, uri)) tbparam[tb] = { "allocID" : r['allocID'], "uri": uri, } # Collect the responses corresponding to the services this testbed # exports. These will be the service requests that we will include in # the start segment requests (with appropriate visibility values) to # import and export the segments. for s in r.get('service', []): id = s.get('id', None) if id and id in e_keys: e_keys[id].reqs.append(s) # Add attributes to parameter space. We don't allow attributes to # overlay any parameters already installed. for a in r.get('fedAttr', []): try: if a['attribute'] and \ isinstance(a['attribute'], basestring)\ and not tbparam[tb].has_key(a['attribute'].lower()): tbparam[tb][a['attribute'].lower()] = a['value'] except KeyError: self.log.error("Bad attribute in response: %s" % a) def release_access(self, tb, aid, tbmap=None, uri=None): """ Release access to testbed through fedd """ if not uri and tbmap: uri = tbmap.get(tb, None) if not uri: raise service_error(service_error.server_config, "Unknown testbed: %s" % tb) if self.local_access.has_key(uri): resp = self.local_access[uri].ReleaseAccess(\ { 'ReleaseAccessRequestBody' : {'allocID': aid},}, fedid(file=self.cert_file)) resp = { 'ReleaseAccessResponseBody': resp } else: resp = self.call_ReleaseAccess(uri, {'allocID': aid}, self.cert_file, self.cert_pwd, self.trusted_certs) # better error coding def remote_ns2topdl(self, uri, desc): req = { 'description' : { 'ns2description': desc }, } r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if r.has_key('Ns2TopdlResponseBody'): r = r['Ns2TopdlResponseBody'] ed = r.get('experimentdescription', None) if ed.has_key('topdldescription'): return topdl.Topology(**ed['topdldescription']) else: raise service_error(service_error.protocol, "Bad splitter response (no output)") else: raise service_error(service_error.protocol, "Bad splitter response") class start_segment: def __init__(self, debug=False, log=None, testbed="", cert_file=None, cert_pwd=None, trusted_certs=None, caller=None, log_collector=None): self.log = log self.debug = debug self.cert_file = cert_file self.cert_pwd = cert_pwd self.trusted_certs = None self.caller = caller self.testbed = testbed self.log_collector = log_collector self.response = None self.node = { } def make_map(self, resp): for e in resp.get('embedding', []): if 'toponame' in e and 'physname' in e: self.node[e['toponame']] = e['physname'][0] def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None): req = { 'allocID': { 'fedid' : aid }, 'segmentdescription': { 'topdldescription': topo.to_dict(), }, } if connInfo: req['connection'] = connInfo import_svcs = [ s for m in masters.values() \ for s in m if self.testbed in s.importers] if import_svcs or self.testbed in masters: req['service'] = [] for s in import_svcs: for r in s.reqs: sr = copy.deepcopy(r) sr['visibility'] = 'import'; req['service'].append(sr) for s in masters.get(self.testbed, []): for r in s.reqs: sr = copy.deepcopy(r) sr['visibility'] = 'export'; req['service'].append(sr) if attrs: req['fedAttr'] = attrs try: self.log.debug("Calling StartSegment at %s " % uri) r = self.caller(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if r.has_key('StartSegmentResponseBody'): lval = r['StartSegmentResponseBody'].get('allocationLog', None) if lval and self.log_collector: for line in lval.splitlines(True): self.log_collector.write(line) self.make_map(r['StartSegmentResponseBody']) self.response = r else: raise service_error(service_error.internal, "Bad response!?: %s" %r) return True except service_error, e: self.log.error("Start segment failed on %s: %s" % \ (self.testbed, e)) return False class terminate_segment: def __init__(self, debug=False, log=None, testbed="", cert_file=None, cert_pwd=None, trusted_certs=None, caller=None): self.log = log self.debug = debug self.cert_file = cert_file self.cert_pwd = cert_pwd self.trusted_certs = None self.caller = caller self.testbed = testbed def __call__(self, uri, aid ): req = { 'allocID': aid , } try: r = self.caller(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) return True except service_error, e: self.log.error("Terminate segment failed on %s: %s" % \ (self.testbed, e)) return False def allocate_resources(self, allocated, masters, eid, expid, tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, attrs=None, connInfo={}, tbmap=None): started = { } # Testbeds where a sub-experiment started # successfully # XXX fail_soft = False if tbmap is None: tbmap = { } log = alloc_log or self.log thread_pool = self.thread_pool(self.nthreads) threads = [ ] starters = [ ] for tb in allocated.keys(): # Create and start a thread to start the segment, and save it # to get the return value later tb_attrs = copy.copy(attrs) thread_pool.wait_for_slot() uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None)) base, suffix = split_testbed(tb) if suffix: tb_attrs.append({'attribute': 'experiment_name', 'value': "%s-%s" % (eid, suffix)}) else: tb_attrs.append({'attribute': 'experiment_name', 'value': eid}) if not uri: raise service_error(service_error.internal, "Unknown testbed %s !?" % tb) if tbparams[tb].has_key('allocID') and \ tbparams[tb]['allocID'].has_key('fedid'): aid = tbparams[tb]['allocID']['fedid'] else: raise service_error(service_error.internal, "No alloc id for testbed %s !?" % tb) s = self.start_segment(log=log, debug=self.debug, testbed=tb, cert_file=self.cert_file, cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, caller=self.call_StartSegment, log_collector=log_collector) starters.append(s) t = self.pooled_thread(\ target=s, name=tb, args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]), pdata=thread_pool, trace_file=self.trace_file) threads.append(t) t.start() # Wait until all finish (keep pinging the log, though) mins = 0 revoked = False while not thread_pool.wait_for_all_done(60.0): mins += 1 alloc_log.info("Waiting for sub threads (it has been %d mins)" \ % mins) if not revoked and \ len([ t.getName() for t in threads if t.rv == False]) > 0: # a testbed has failed. Revoke this experiment's # synchronizarion values so that sub experiments will not # deadlock waiting for synchronization that will never happen self.log.info("A subexperiment has failed to swap in, " + \ "revoking synch keys") var_key = "fedid:%s" % expid for k in self.synch_store.all_keys(): if len(k) > 45 and k[0:46] == var_key: self.synch_store.revoke_key(k) revoked = True failed = [ t.getName() for t in threads if not t.rv ] succeeded = [tb for tb in allocated.keys() if tb not in failed] # If one failed clean up, unless fail_soft is set if failed: if not fail_soft: thread_pool.clear() for tb in succeeded: # Create and start a thread to stop the segment thread_pool.wait_for_slot() uri = tbparams[tb]['uri'] t = self.pooled_thread(\ target=self.terminate_segment(log=log, testbed=tb, cert_file=self.cert_file, cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, caller=self.call_TerminateSegment), args=(uri, tbparams[tb]['federant']['allocID']), name=tb, pdata=thread_pool, trace_file=self.trace_file) t.start() # Wait until all finish (if any are being stopped) if succeeded: thread_pool.wait_for_all_done() # release the allocations for tb in tbparams.keys(): self.release_access(tb, tbparams[tb]['allocID'], tbmap=tbmap, uri=tbparams[tb].get('uri', None)) # Remove the placeholder self.state_lock.acquire() self.state[eid]['experimentStatus'] = 'failed' if self.state_filename: self.write_state() self.state_lock.release() # Remove the repo dir self.remove_dirs("%s/%s" %(self.repodir, expid)) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: log.debug("[start_experiment]: not removing %s" % tmpdir) log.error("Swap in failed on %s" % ",".join(failed)) return else: # Walk through the successes and gather the virtual to physical # mapping. embedding = [ ] for s in starters: for k, v in s.node.items(): embedding.append({ 'toponame': k, 'physname': [ v], 'testbed': s.testbed }) log.info("[start_segment]: Experiment %s active" % eid) # Walk up tmpdir, deleting as we go if self.cleanup: self.remove_dirs(tmpdir) else: log.debug("[start_experiment]: not removing %s" % tmpdir) # Insert the experiment into our state and update the disk copy. self.state_lock.acquire() self.state[expid]['experimentStatus'] = 'active' self.state[eid] = self.state[expid] self.state[eid]['experimentdescription']['topdldescription'] = \ top.to_dict() self.state[eid]['embedding'] = embedding if self.state_filename: self.write_state() self.state_lock.release() return def add_kit(self, e, kit): """ Add a Software object created from the list of (install, location) tuples passed as kit to the software attribute of an object e. We do this enough to break out the code, but it's kind of a hack to avoid changing the old tuple rep. """ s = [ topdl.Software(install=i, location=l) for i, l in kit] if isinstance(e.software, list): e.software.extend(s) else: e.software = s def create_experiment_state(self, fid, req, expid, expcert, state='starting'): """ Create the initial entry in the experiment's state. The expid and expcert are the experiment's fedid and certifacte that represents that ID, which are installed in the experiment state. If the request includes a suggested local name that is used if possible. If the local name is already taken by an experiment owned by this user that has failed, it is overwritten. Otherwise new letters are added until a valid localname is found. The generated local name is returned. """ if req.has_key('experimentID') and \ req['experimentID'].has_key('localname'): overwrite = False eid = req['experimentID']['localname'] # If there's an old failed experiment here with the same local name # and accessible by this user, we'll overwrite it, otherwise we'll # fall through and do the collision avoidance. old_expid = self.get_experiment_fedid(eid) if old_expid and self.check_experiment_access(fid, old_expid): self.state_lock.acquire() status = self.state[eid].get('experimentStatus', None) if status and status == 'failed': # remove the old access attribute self.auth.unset_attribute(fid, old_expid) overwrite = True del self.state[eid] del self.state[old_expid] self.state_lock.release() self.state_lock.acquire() while (self.state.has_key(eid) and not overwrite): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = { 'experimentID' : \ [ { 'localname' : eid }, {'fedid': expid } ], 'experimentStatus': state, 'experimentAccess': { 'X509' : expcert }, 'owner': fid, 'log' : [], } self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() else: eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) self.state_lock.acquire() while (self.state.has_key(eid)): eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = { 'experimentID' : \ [ { 'localname' : eid }, {'fedid': expid } ], 'experimentStatus': state, 'experimentAccess': { 'X509' : expcert }, 'owner': fid, 'log' : [], } self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() return eid def allocate_ips_to_topo(self, top): """ Add an ip4_address attribute to all the hosts in the topology, based on the shared substrates on which they sit. An /etc/hosts file is also created and returned as a list of hostfiles entries. We also return the allocator, because we may need to allocate IPs to portals (specifically DRAGON portals). """ subs = sorted(top.substrates, cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)), reverse=True) ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) ifs = { } hosts = [ ] for idx, s in enumerate(subs): net_size = len(s.interfaces)+2 a = ips.allocate(net_size) if a : base, num = a if num < net_size: raise service_error(service_error.internal, "Allocator returned wrong number of IPs??") else: raise service_error(service_error.req, "Cannot allocate IP addresses") mask = ips.min_alloc while mask < net_size: mask *= 2 netmask = ((2**32-1) ^ (mask-1)) base += 1 for i in s.interfaces: i.attribute.append( topdl.Attribute('ip4_address', "%s" % ip_addr(base))) i.attribute.append( topdl.Attribute('ip4_netmask', "%s" % ip_addr(int(netmask)))) hname = i.element.name if ifs.has_key(hname): hosts.append("%s\t%s-%s %s-%d" % \ (ip_addr(base), hname, s.name, hname, ifs[hname])) else: ifs[hname] = 0 hosts.append("%s\t%s-%s %s-%d %s" % \ (ip_addr(base), hname, s.name, hname, ifs[hname], hname)) ifs[hname] += 1 base += 1 return hosts, ips def get_access_to_testbeds(self, testbeds, access_user, allocated, tbparams, masters, tbmap): """ Request access to the various testbeds required for this instantiation (passed in as testbeds). User, access_user, expoert_project and master are used to construct the correct requests. Per-testbed parameters are returned in tbparams. """ for tb in testbeds: self.get_access(tb, None, tbparams, access_user, masters, tbmap) allocated[tb] = 1 def split_topology(self, top, topo, testbeds): """ Create the sub-topologies that are needed for experiment instantiation. """ for tb in testbeds: topo[tb] = top.clone() # copy in for loop allows deletions from the original for e in [ e for e in topo[tb].elements]: etb = e.get_attribute('testbed') # NB: elements without a testbed attribute won't appear in any # sub topologies. if not etb or etb != tb: for i in e.interface: for s in i.subs: try: s.interfaces.remove(i) except ValueError: raise service_error(service_error.internal, "Can't remove interface??") topo[tb].elements.remove(e) topo[tb].make_indices() def wrangle_software(self, expid, top, topo, tbparams): """ Copy software out to the repository directory, allocate permissions and rewrite the segment topologies to look for the software in local places. """ # Copy the rpms and tarfiles to a distribution directory from # which the federants can retrieve them linkpath = "%s/software" % expid softdir ="%s/%s" % ( self.repodir, linkpath) softmap = { } # These are in a list of tuples format (each kit). This comprehension # unwraps them into a single list of tuples that initilaizes the set of # tuples. pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \ for p, t in l ]) pkgs.update([x.location for e in top.elements \ for x in e.software]) try: os.makedirs(softdir) except EnvironmentError, e: raise service_error( "Cannot create software directory: %s" % e) # The actual copying. Everything's converted into a url for copying. for pkg in pkgs: loc = pkg scheme, host, path = urlparse(loc)[0:3] dest = os.path.basename(path) if not scheme: if not loc.startswith('/'): loc = "/%s" % loc loc = "file://%s" %loc try: u = urlopen(loc) except Exception, e: raise service_error(service_error.req, "Cannot open %s: %s" % (loc, e)) try: f = open("%s/%s" % (softdir, dest) , "w") self.log.debug("Writing %s/%s" % (softdir,dest) ) data = u.read(4096) while data: f.write(data) data = u.read(4096) f.close() u.close() except Exception, e: raise service_error(service_error.internal, "Could not copy %s: %s" % (loc, e)) path = re.sub("/tmp", "", linkpath) # XXX softmap[pkg] = \ "%s/%s/%s" %\ ( self.repo_url, path, dest) # Allow the individual segments to access the software. for tb in tbparams.keys(): self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) # Convert the software locations in the segments into the local # copies on this host for soft in [ s for tb in topo.values() \ for e in tb.elements \ if getattr(e, 'software', False) \ for s in e.software ]: if softmap.has_key(soft.location): soft.location = softmap[soft.location] def new_experiment(self, req, fid): """ The external interface to empty initial experiment creation called from the dispatcher. Creates a working directory, splits the incoming description using the splitter script and parses out the avrious subsections using the lcasses above. Once each sub-experiment is created, use pooled threads to instantiate them and start it all up. """ if not self.auth.check_attribute(fid, 'new'): raise service_error(service_error.access, "New access denied") try: tmpdir = tempfile.mkdtemp(prefix="split-") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") try: access_user = self.accessdb[fid] except KeyError: raise service_error(service_error.internal, "Access map and authorizer out of sync in " + \ "new_experiment for fedid %s" % fid) pid = "dummy" gid = "dummy" req = req.get('NewRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no NewRequestBody)") # Generate an ID for the experiment (slice) and a certificate that the # allocator can use to prove they own it. We'll ship it back through # the encrypted connection. (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) #now we're done with the tmpdir, and it should be empty if self.cleanup: self.log.debug("[new_experiment]: removing %s" % tmpdir) os.rmdir(tmpdir) else: self.log.debug("[new_experiment]: not removing %s" % tmpdir) eid = self.create_experiment_state(fid, req, expid, expcert, state='empty') # Let users touch the state self.auth.set_attribute(fid, expid) self.auth.set_attribute(expid, expid) # Override fedids can manipulate state as well for o in self.overrides: self.auth.set_attribute(o, expid) rv = { 'experimentID': [ {'localname' : eid }, { 'fedid': copy.copy(expid) } ], 'experimentStatus': 'empty', 'experimentAccess': { 'X509' : expcert } } return rv def create_experiment(self, req, fid): """ The external interface to experiment creation called from the dispatcher. Creates a working directory, splits the incoming description using the splitter script and parses out the various subsections using the classes above. Once each sub-experiment is created, use pooled threads to instantiate them and start it all up. """ req = req.get('CreateRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no CreateRequestBody)") # Get the experiment access exp = req.get('experimentID', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] expid = key eid = None elif exp.has_key('localname'): key = exp['localname'] eid = key expid = None else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) # Install the testbed map entries supplied with the request into a copy # of the testbed map. tbmap = dict(self.tbmap) for m in req.get('testbedmap', []): if 'testbed' in m and 'uri' in m: tbmap[m['testbed']] = m['uri'] try: tmpdir = tempfile.mkdtemp(prefix="split-") os.mkdir(tmpdir+"/keys") except EnvironmentError: raise service_error(service_error.internal, "Cannot create tmp dir") gw_pubkey_base = "fed.%s.pub" % self.ssh_type gw_secretkey_base = "fed.%s" % self.ssh_type gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base tclfile = tmpdir + "/experiment.tcl" tbparams = { } try: access_user = self.accessdb[fid] except KeyError: raise service_error(service_error.internal, "Access map and authorizer out of sync in " + \ "create_experiment for fedid %s" % fid) pid = "dummy" gid = "dummy" # The tcl parser needs to read a file so put the content into that file descr=req.get('experimentdescription', None) if descr: file_content=descr.get('ns2description', None) if file_content: try: f = open(tclfile, 'w') f.write(file_content) f.close() except EnvironmentError: raise service_error(service_error.internal, "Cannot write temp experiment description") else: raise service_error(service_error.req, "Only ns2descriptions supported") else: raise service_error(service_error.req, "No experiment description") self.state_lock.acquire() if self.state.has_key(key): self.state[key]['experimentStatus'] = "starting" for e in self.state[key].get('experimentID',[]): if not expid and e.has_key('fedid'): expid = e['fedid'] elif not eid and e.has_key('localname'): eid = e['localname'] self.state_lock.release() if not (eid and expid): raise service_error(service_error.internal, "Cannot find local experiment info!?") try: # This catches exceptions to clear the placeholder if necessary try: self.generate_ssh_keys(gw_secretkey, self.ssh_type) except ValueError: raise service_error(service_error.server_config, "Bad key type (%s)" % self.ssh_type) # Copy the service request tb_services = [ s for s in req.get('service',[]) ] # Translate to topdl if self.splitter_url: self.log.debug("Calling remote topdl translator at %s" % \ self.splitter_url) top = self.remote_ns2topdl(self.splitter_url, file_content) else: tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', str(self.muxmax), '-m', 'dummy'] tclcmd.extend([pid, gid, eid, tclfile]) self.log.debug("running local splitter %s", " ".join(tclcmd)) # This is just fantastic. As a side effect the parser copies # tb_compat.tcl into the current directory, so that directory # must be writable by the fedd user. Doing this in the # temporary subdir ensures this is the case. tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, cwd=tmpdir) split_data = tclparser.stdout top = topdl.topology_from_xml(file=split_data, top="experiment") hosts, ip_allocator = self.allocate_ips_to_topo(top) # Find the testbeds to look up testbeds = set([ a.value for e in top.elements \ for a in e.attribute \ if a.attribute == 'testbed']) tb_hosts = { } for tb in testbeds: tb_hosts[tb] = [ e.name for e in top.elements \ if isinstance(e, topdl.Computer) and \ e.get_attribute('testbed') and \ e.get_attribute('testbed') == tb] masters = { } # testbeds exporting services pmasters = { } # Testbeds exporting services that # need portals for s in tb_services: # If this is a service request with the importall field # set, fill it out. if s.get('importall', False): s['import'] = [ tb for tb in testbeds \ if tb not in s.get('export',[])] del s['importall'] # Add the service to masters for tb in s.get('export', []): if s.get('name', None): if tb not in masters: masters[tb] = [ ] params = { } if 'fedAttr' in s: for a in s['fedAttr']: params[a.get('attribute', '')] = \ a.get('value','') fser = federated_service(name=s['name'], exporter=tb, importers=s.get('import',[]), params=params) if fser.name == 'hide_hosts' \ and 'hosts' not in fser.params: fser.params['hosts'] = \ ",".join(tb_hosts.get(fser.exporter, [])) masters[tb].append(fser) if fser.portal: if tb not in pmasters: pmasters[tb] = [ fser ] else: pmasters[tb].append(fser) else: self.log.error('Testbed service does not have name " + \ "and importers') allocated = { } # Testbeds we can access topo ={ } # Sub topologies connInfo = { } # Connection information self.get_access_to_testbeds(testbeds, access_user, allocated, tbparams, masters, tbmap) self.split_topology(top, topo, testbeds) # Copy configuration files into the remote file store # The config urlpath configpath = "/%s/config" % expid # The config file system location configdir ="%s%s" % ( self.repodir, configpath) try: os.makedirs(configdir) except EnvironmentError, e: raise service_error(service_error.internal, "Cannot create config directory: %s" % e) try: f = open("%s/hosts" % configdir, "w") f.write('\n'.join(hosts)) f.close() except EnvironmentError, e: raise service_error(service_error.internal, "Cannot write hosts file: %s" % e) try: copy_file("%s" % gw_pubkey, "%s/%s" % \ (configdir, gw_pubkey_base)) copy_file("%s" % gw_secretkey, "%s/%s" % \ (configdir, gw_secretkey_base)) except EnvironmentError, e: raise service_error(service_error.internal, "Cannot copy keyfiles: %s" % e) # Allow the individual testbeds to access the configuration files. for tb in tbparams.keys(): asignee = tbparams[tb]['allocID']['fedid'] for f in ("hosts", gw_secretkey_base, gw_pubkey_base): self.auth.set_attribute(asignee, "%s/%s" % (configpath, f)) part = experiment_partition(self.auth, self.store_url, tbmap, self.muxmax, self.direct_transit) part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator, connInfo, expid) # Now get access to the dynamic testbeds (those added above) for tb in [ t for t in topo if t not in allocated]: self.get_access(tb, None, tbparams, access_user, masters) allocated[tb] = 1 store_keys = topo[tb].get_attribute('store_keys') # Give the testbed access to keys it exports or imports if store_keys: for sk in store_keys.split(" "): self.auth.set_attribute(\ tbparams[tb]['allocID']['fedid'], sk) self.wrangle_software(expid, top, topo, tbparams) vtopo = topdl.topology_to_vtopo(top) vis = self.genviz(vtopo) # save federant information for k in allocated.keys(): tbparams[k]['federant'] = { 'name': [ { 'localname' : eid} ], 'allocID' : tbparams[k]['allocID'], 'uri': tbparams[k]['uri'], } self.state_lock.acquire() self.state[eid]['vtopo'] = vtopo self.state[eid]['vis'] = vis self.state[eid]['experimentdescription'] = \ { 'topdldescription': top.to_dict() } self.state[eid]['federant'] = \ [ tbparams[tb]['federant'] for tb in tbparams.keys() \ if tbparams[tb].has_key('federant') ] if self.state_filename: self.write_state() self.state_lock.release() except service_error, e: # If something goes wrong in the parse (usually an access error) # clear the placeholder state. From here on out the code delays # exceptions. Failing at this point returns a fault to the remote # caller. self.state_lock.acquire() del self.state[eid] del self.state[expid] if self.state_filename: self.write_state() self.state_lock.release() raise e # Start the background swapper and return the starting state. From # here on out, the state will stick around a while. # Let users touch the state self.auth.set_attribute(fid, expid) self.auth.set_attribute(expid, expid) # Override fedids can manipulate state as well for o in self.overrides: self.auth.set_attribute(o, expid) # Create a logger that logs to the experiment's state object as well as # to the main log file. alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) alloc_collector = self.list_log(self.state[eid]['log']) h = logging.StreamHandler(alloc_collector) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) attrs = [ { 'attribute': 'ssh_pubkey', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, gw_pubkey_base) }, { 'attribute': 'ssh_secretkey', 'value': '%s/%s/config/%s' % \ (self.repo_url, expid, gw_secretkey_base) }, { 'attribute': 'hosts', 'value': '%s/%s/config/hosts' % \ (self.repo_url, expid) }, ] # transit and disconnected testbeds may not have a connInfo entry. # Fill in the blanks. for t in allocated.keys(): if not connInfo.has_key(t): connInfo[t] = { } # Start a thread to do the resource allocation t = Thread(target=self.allocate_resources, args=(allocated, masters, eid, expid, tbparams, top, topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo, tbmap), name=eid) t.start() rv = { 'experimentID': [ {'localname' : eid }, { 'fedid': copy.copy(expid) } ], 'experimentStatus': 'starting', } return rv def get_experiment_fedid(self, key): """ find the fedid associated with the localname key in the state database. """ rv = None self.state_lock.acquire() if self.state.has_key(key): if isinstance(self.state[key], dict): try: kl = [ f['fedid'] for f in \ self.state[key]['experimentID']\ if f.has_key('fedid') ] except KeyError: self.state_lock.release() raise service_error(service_error.internal, "No fedid for experiment %s when getting "+\ "fedid(!?)" % key) if len(kl) == 1: rv = kl[0] else: self.state_lock.release() raise service_error(service_error.internal, "multiple fedids for experiment %s when " +\ "getting fedid(!?)" % key) else: self.state_lock.release() raise service_error(service_error.internal, "Unexpected state for %s" % key) self.state_lock.release() return rv def check_experiment_access(self, fid, key): """ Confirm that the fid has access to the experiment. Though a request may be made in terms of a local name, the access attribute is always the experiment's fedid. """ if not isinstance(key, fedid): key = self.get_experiment_fedid(key) if self.auth.check_attribute(fid, key): return True else: raise service_error(service_error.access, "Access Denied") def get_handler(self, path, fid): self.log.info("Get handler %s %s" % (path, fid)) if self.auth.check_attribute(fid, path): return ("%s/%s" % (self.repodir, path), "application/binary") else: return (None, None) def get_vtopo(self, req, fid): """ Return the stored virtual topology for this experiment """ rv = None state = None req = req.get('VtopoRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no VtopoRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) self.state_lock.acquire() if self.state.has_key(key): if self.state[key].has_key('vtopo'): rv = { 'experiment' : {keytype: key },\ 'vtopo': self.state[key]['vtopo'],\ } else: state = self.state[key]['experimentStatus'] self.state_lock.release() if rv: return rv else: if state: raise service_error(service_error.partial, "Not ready: %s" % state) else: raise service_error(service_error.req, "No such experiment") def get_vis(self, req, fid): """ Return the stored visualization for this experiment """ rv = None state = None req = req.get('VisRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no VisRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) self.state_lock.acquire() if self.state.has_key(key): if self.state[key].has_key('vis'): rv = { 'experiment' : {keytype: key },\ 'vis': self.state[key]['vis'],\ } else: state = self.state[key]['experimentStatus'] self.state_lock.release() if rv: return rv else: if state: raise service_error(service_error.partial, "Not ready: %s" % state) else: raise service_error(service_error.req, "No such experiment") def clean_info_response(self, rv): """ Remove the information in the experiment's state object that is not in the info response. """ # Remove the owner info (should always be there, but...) if rv.has_key('owner'): del rv['owner'] # Convert the log into the allocationLog parameter and remove the # log entry (with defensive programming) if rv.has_key('log'): rv['allocationLog'] = "".join(rv['log']) del rv['log'] else: rv['allocationLog'] = "" if rv['experimentStatus'] != 'active': if rv.has_key('federant'): del rv['federant'] else: # remove the allocationID and uri info from each federant for f in rv.get('federant', []): if f.has_key('allocID'): del f['allocID'] if f.has_key('uri'): del f['uri'] return rv def get_info(self, req, fid): """ Return all the stored info about this experiment """ rv = None req = req.get('InfoRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no InfoRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) # The state may be massaged by the service function that called # get_info (e.g., encoded for XMLRPC transport) so send a copy of the # state. self.state_lock.acquire() if self.state.has_key(key): rv = copy.deepcopy(self.state[key]) self.state_lock.release() if rv: return self.clean_info_response(rv) else: raise service_error(service_error.req, "No such experiment") def get_multi_info(self, req, fid): """ Return all the stored info that this fedid can access """ rv = { 'info': [ ] } self.state_lock.acquire() for key in [ k for k in self.state.keys() if isinstance(k, fedid)]: try: self.check_experiment_access(fid, key) except service_error, e: if e.code == service_error.access: continue else: self.state_lock.release() raise e if self.state.has_key(key): e = copy.deepcopy(self.state[key]) e = self.clean_info_response(e) rv['info'].append(e) self.state_lock.release() return rv def remove_dirs(self, dir): """ Remove the directory tree and all files rooted at dir. Log any errors, but continue. """ self.log.debug("[removedirs]: removing %s" % dir) try: for path, dirs, files in os.walk(dir, topdown=False): for f in files: os.remove(os.path.join(path, f)) for d in dirs: os.rmdir(os.path.join(path, d)) os.rmdir(dir) except EnvironmentError, e: self.log.error("Error deleting directory tree in %s" % e); def terminate_experiment(self, req, fid): """ Swap this experiment out on the federants and delete the shared information """ tbparams = { } req = req.get('TerminateRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no TerminateRequestBody)") force = req.get('force', False) exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) dealloc_list = [ ] # Create a logger that logs to the dealloc_list as well as to the main # log file. dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key) h = logging.StreamHandler(self.list_log(dealloc_list)) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) dealloc_log.addHandler(h) self.state_lock.acquire() fed_exp = self.state.get(key, None) repo = None if fed_exp: # This branch of the conditional holds the lock to generate a # consistent temporary tbparams variable to deallocate experiments. # It releases the lock to do the deallocations and reacquires it to # remove the experiment state when the termination is complete. # First make sure that the experiment creation is complete. status = fed_exp.get('experimentStatus', None) if status: if status in ('starting', 'terminating'): if not force: self.state_lock.release() raise service_error(service_error.partial, 'Experiment still being created or destroyed') else: self.log.warning('Experiment in %s state ' % status + \ 'being terminated by force.') else: # No status??? trouble self.state_lock.release() raise service_error(service_error.internal, "Experiment has no status!?") ids = [] # experimentID is a list of dicts that are self-describing # identifiers. This finds all the fedids and localnames - the # keys of self.state - and puts them into ids. for id in fed_exp.get('experimentID', []): if id.has_key('fedid'): ids.append(id['fedid']) repo = "%s" % id['fedid'] if id.has_key('localname'): ids.append(id['localname']) # Collect the allocation/segment ids into a dict keyed by the fedid # of the allocation (or a monotonically increasing integer) that # contains a tuple of uri, aid (which is a dict...) for i, fed in enumerate(fed_exp.get('federant', [])): try: uri = fed['uri'] aid = fed['allocID'] k = fed['allocID'].get('fedid', i) except KeyError, e: continue tbparams[k] = (uri, aid) fed_exp['experimentStatus'] = 'terminating' if self.state_filename: self.write_state() self.state_lock.release() # Stop everyone. NB, wait_for_all waits until a thread starts and # then completes, so we can't wait if nothing starts. So, no # tbparams, no start. if len(tbparams) > 0: thread_pool = self.thread_pool(self.nthreads) for k in tbparams.keys(): # Create and start a thread to stop the segment thread_pool.wait_for_slot() uri, aid = tbparams[k] t = self.pooled_thread(\ target=self.terminate_segment(log=dealloc_log, testbed=uri, cert_file=self.cert_file, cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, caller=self.call_TerminateSegment), args=(uri, aid), name=k, pdata=thread_pool, trace_file=self.trace_file) t.start() # Wait for completions thread_pool.wait_for_all_done() # release the allocations (failed experiments have done this # already, and starting experiments may be in odd states, so we # ignore errors releasing those allocations try: for k in tbparams.keys(): # This releases access by uri uri, aid = tbparams[k] self.release_access(None, aid, uri=uri) except service_error, e: if status != 'failed' and not force: raise e # Remove the terminated experiment self.state_lock.acquire() for id in ids: if self.state.has_key(id): del self.state[id] if self.state_filename: self.write_state() self.state_lock.release() # Delete any synch points associated with this experiment. All # synch points begin with the fedid of the experiment. fedid_keys = set(["fedid:%s" % f for f in ids \ if isinstance(f, fedid)]) for k in self.synch_store.all_keys(): try: if len(k) > 45 and k[0:46] in fedid_keys: self.synch_store.del_value(k) except synch_store.BadDeletionError: pass self.write_store() # Remove software and other cached stuff from the filesystem. if repo: self.remove_dirs("%s/%s" % (self.repodir, repo)) return { 'experiment': exp , 'deallocationLog': "".join(dealloc_list), } else: # Don't forget to release the lock self.state_lock.release() raise service_error(service_error.req, "No saved state") def GetValue(self, req, fid): """ Get a value from the synchronized store """ req = req.get('GetValueRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no GetValueRequestBody)") name = req['name'] wait = req['wait'] rv = { 'name': name } if self.auth.check_attribute(fid, name): self.log.debug("[GetValue] asking for %s " % name) try: v = self.synch_store.get_value(name, wait) except synch_store.RevokedKeyError: # No more synch on this key raise service_error(service_error.federant, "Synch key %s revoked" % name) if v is not None: rv['value'] = v self.log.debug("[GetValue] got %s from %s" % (v, name)) return rv else: raise service_error(service_error.access, "Access Denied") def SetValue(self, req, fid): """ Set a value in the synchronized store """ req = req.get('SetValueRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no SetValueRequestBody)") name = req['name'] v = req['value'] if self.auth.check_attribute(fid, name): try: self.synch_store.set_value(name, v) self.write_store() self.log.debug("[SetValue] set %s to %s" % (name, v)) except synch_store.CollisionError: # Translate into a service_error raise service_error(service_error.req, "Value already set: %s" %name) except synch_store.RevokedKeyError: # No more synch on this key raise service_error(service_error.federant, "Synch key %s revoked" % name) return { 'name': name, 'value': v } else: raise service_error(service_error.access, "Access Denied")