#!/usr/local/bin/python import os,sys import re import random import string import subprocess import tempfile import copy import pickle import logging import signal import time import traceback # For parsing visualization output and splitter output import xml.parsers.expat from threading import * from subprocess import * from urlparse import urlparse from urllib2 import urlopen from util import * from fedid import fedid, generate_fedid from remote_service import xmlrpc_handler, soap_handler, service_caller from service_error import service_error import topdl from ip_allocator import ip_allocator from ip_addr import ip_addr class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.experiment_control") fl.addHandler(nullHandler()) class experiment_control_local: """ Control of experiments that this system can directly access. Includes experiment creation, termination and information dissemination. Thred safe. """ class ssh_cmd_timeout(RuntimeError): pass class list_log: """ Provide an interface that lets logger.StreamHandler s write to a list of strings. """ def __init__(self, l=[]): """ Link to an existing list or just create a log """ self.ll = l self.lock = Lock() def write(self, str): """ Add the string to the log. Lock for consistency. """ self.lock.acquire() self.ll.append(str) self.lock.release() def flush(self): """ No-op that StreamHandlers expect """ pass class thread_pool: """ A class to keep track of a set of threads all invoked for the same task. Manages the mutual exclusion of the states. """ def __init__(self, nthreads): """ Start a pool. """ self.changed = Condition() self.started = 0 self.terminated = 0 self.nthreads = nthreads def acquire(self): """ Get the pool's lock. """ self.changed.acquire() def release(self): """ Release the pool's lock. """ self.changed.release() def wait(self, timeout = None): """ Wait for a pool thread to start or stop. """ self.changed.wait(timeout) def start(self): """ Called by a pool thread to report starting. """ self.changed.acquire() self.started += 1 self.changed.notifyAll() self.changed.release() def terminate(self): """ Called by a pool thread to report finishing. """ self.changed.acquire() self.terminated += 1 self.changed.notifyAll() self.changed.release() def clear(self): """ Clear all pool data. """ self.changed.acquire() self.started = 0 self.terminated =0 self.changed.notifyAll() self.changed.release() def wait_for_slot(self): """ Wait until we have a free slot to start another pooled thread """ self.acquire() while self.started - self.terminated >= self.nthreads: self.wait() self.release() def wait_for_all_done(self): """ Wait until all active threads finish (and at least one has started) """ self.acquire() while self.started == 0 or self.started > self.terminated: self.wait() self.release() class pooled_thread(Thread): """ One of a set of threads dedicated to a specific task. Uses the thread_pool class above for coordination. """ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, pdata=None, trace_file=None): Thread.__init__(self, group, target, name, args, kwargs) self.rv = None # Return value of the ops in this thread self.exception = None # Exception that terminated this thread self.target=target # Target function to run on start() self.args = args # Args to pass to target self.kwargs = kwargs # Additional kw args self.pdata = pdata # thread_pool for this class # Logger for this thread self.log = logging.getLogger("fedd.experiment_control") def run(self): """ Emulate Thread.run, except add pool data manipulation and error logging. """ if self.pdata: self.pdata.start() if self.target: try: self.rv = self.target(*self.args, **self.kwargs) except service_error, s: self.exception = s self.log.error("Thread exception: %s %s" % \ (s.code_string(), s.desc)) except: self.exception = sys.exc_info()[1] self.log.error(("Unexpected thread exception: %s" +\ "Trace %s") % (self.exception,\ traceback.format_exc())) if self.pdata: self.pdata.terminate() call_RequestAccess = service_caller('RequestAccess') call_ReleaseAccess = service_caller('ReleaseAccess') call_Ns2Split = service_caller('Ns2Split') def __init__(self, config=None, auth=None): """ Intialize the various attributes, most from the config object """ def parse_tarfile_list(tf): """ Parse a tarfile list from the configuration. This is a set of paths and tarfiles separated by spaces. """ rv = [ ] if tf is not None: tl = tf.split() while len(tl) > 1: p, t = tl[0:2] del tl[0:2] rv.append((p, t)) return rv self.thread_with_rv = experiment_control_local.pooled_thread self.thread_pool = experiment_control_local.thread_pool self.list_log = experiment_control_local.list_log self.cert_file = config.get("experiment_control", "cert_file") if self.cert_file: self.cert_pwd = config.get("experiment_control", "cert_pwd") else: self.cert_file = config.get("globals", "cert_file") self.cert_pwd = config.get("globals", "cert_pwd") self.trusted_certs = config.get("experiment_control", "trusted_certs") \ or config.get("globals", "trusted_certs") self.exp_stem = "fed-stem" self.log = logging.getLogger("fedd.experiment_control") set_log_level(config, "experiment_control", self.log) self.muxmax = 2 self.nthreads = 2 self.randomize_experiments = False self.splitter = None self.ssh_keygen = "/usr/bin/ssh-keygen" self.ssh_identity_file = None self.debug = config.getboolean("experiment_control", "create_debug") self.state_filename = config.get("experiment_control", "experiment_state") self.splitter_url = config.get("experiment_control", "splitter_uri") self.fedkit = parse_tarfile_list(\ config.get("experiment_control", "fedkit")) self.gatewaykit = parse_tarfile_list(\ config.get("experiment_control", "gatewaykit")) accessdb_file = config.get("experiment_control", "accessdb") self.ssh_pubkey_file = config.get("experiment_control", "ssh_pubkey_file") self.ssh_privkey_file = config.get("experiment_control", "ssh_privkey_file") # NB for internal master/slave ops, not experiment setup self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa") self.overrides = set([]) ovr = config.get('experiment_control', 'overrides') if ovr: for o in ovr.split(","): o = o.strip() if o.startswith('fedid:'): o = o[len('fedid:'):] self.overrides.add(fedid(hexstr=o)) self.state = { } self.state_lock = Lock() self.tclsh = "/usr/local/bin/otclsh" self.tcl_splitter = config.get("splitter", "tcl_splitter") or \ config.get("experiment_control", "tcl_splitter", "/usr/testbed/lib/ns2ir/parse.tcl") mapdb_file = config.get("experiment_control", "mapdb") self.trace_file = sys.stderr self.def_expstart = \ "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\ "/tmp/federate"; self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\ "FEDDIR/hosts"; self.def_gwstart = \ "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\ "/tmp/bridge.log"; self.def_mgwstart = \ "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\ "/tmp/bridge.log"; self.def_gwimage = "FBSD61-TUNNEL2"; self.def_gwtype = "pc"; self.local_access = { } if auth: self.auth = auth else: self.log.error(\ "[access]: No authorizer initialized, creating local one.") auth = authorizer() if self.ssh_pubkey_file: try: f = open(self.ssh_pubkey_file, 'r') self.ssh_pubkey = f.read() f.close() except IOError: raise service_error(service_error.internal, "Cannot read sshpubkey") else: raise service_error(service_error.internal, "No SSH public key file?") if not self.ssh_privkey_file: raise service_error(service_error.internal, "No SSH public key file?") if mapdb_file: self.read_mapdb(mapdb_file) else: self.log.warn("[experiment_control] No testbed map, using defaults") self.tbmap = { 'deter':'https://users.isi.deterlab.net:23235', 'emulab':'https://users.isi.deterlab.net:23236', 'ucb':'https://users.isi.deterlab.net:23237', } if accessdb_file: self.read_accessdb(accessdb_file) else: raise service_error(service_error.internal, "No accessdb specified in config") # Grab saved state. OK to do this w/o locking because it's read only # and only one thread should be in existence that can see self.state at # this point. if self.state_filename: self.read_state() # Dispatch tables self.soap_services = {\ 'Create': soap_handler('Create', self.new_create_experiment), 'Vtopo': soap_handler('Vtopo', self.get_vtopo), 'Vis': soap_handler('Vis', self.get_vis), 'Info': soap_handler('Info', self.get_info), 'MultiInfo': soap_handler('MultiInfo', self.get_multi_info), 'Terminate': soap_handler('Terminate', self.terminate_experiment), } self.xmlrpc_services = {\ 'Create': xmlrpc_handler('Create', self.new_create_experiment), 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo), 'Vis': xmlrpc_handler('Vis', self.get_vis), 'Info': xmlrpc_handler('Info', self.get_info), 'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info), 'Terminate': xmlrpc_handler('Terminate', self.terminate_experiment), } def copy_file(self, src, dest, size=1024): """ Exceedingly simple file copy. """ s = open(src,'r') d = open(dest, 'w') buf = "x" while buf != "": buf = s.read(size) d.write(buf) s.close() d.close() # Call while holding self.state_lock def write_state(self): """ Write a new copy of experiment state after copying the existing state to a backup. State format is a simple pickling of the state dictionary. """ if os.access(self.state_filename, os.W_OK): self.copy_file(self.state_filename, \ "%s.bak" % self.state_filename) try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except IOError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) except TypeError, e: self.log.error("Pickling problem (TypeError): %s" % e) # Call while holding self.state_lock def read_state(self): """ Read a new copy of experiment state. Old state is overwritten. State format is a simple pickling of the state dictionary. """ try: f = open(self.state_filename, "r") self.state = pickle.load(f) self.log.debug("[read_state]: Read state from %s" % \ self.state_filename) except IOError, e: self.log.warning("[read_state]: No saved state: Can't open %s: %s"\ % (self.state_filename, e)) except pickle.UnpicklingError, e: self.log.warning(("[read_state]: No saved state: " + \ "Unpickling failed: %s") % e) for k in self.state.keys(): try: # This list should only have one element in it, but phrasing it # as a for loop doesn't cost much, really. We have to find the # fedid elements anyway. for eid in [ f['fedid'] \ for f in self.state[k]['experimentID']\ if f.has_key('fedid') ]: self.auth.set_attribute(self.state[k]['owner'], eid) # allow overrides to control experiments as well for o in self.overrides: self.auth.set_attribute(o, eid) except KeyError, e: self.log.warning("[read_state]: State ownership or identity " +\ "misformatted in %s: %s" % (self.state_filename, e)) def read_accessdb(self, accessdb_file): """ Read the mapping from fedids that can create experiments to their name in the 3-level access namespace. All will be asserted from this testbed and can include the local username and porject that will be asserted on their behalf by this fedd. Each fedid is also added to the authorization system with the "create" attribute. """ self.accessdb = {} # These are the regexps for parsing the db name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+" project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \ "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$") user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \ "\s*->\s*(" + name_expr + ")\s*$") lineno = 0 # Parse the mappings and store in self.authdb, a dict of # fedid -> (proj, user) try: f = open(accessdb_file, "r") for line in f: lineno += 1 line = line.strip() if len(line) == 0 or line.startswith('#'): continue m = project_line.match(line) if m: fid = fedid(hexstr=m.group(1)) project, user = m.group(2,3) if not self.accessdb.has_key(fid): self.accessdb[fid] = [] self.accessdb[fid].append((project, user)) continue m = user_line.match(line) if m: fid = fedid(hexstr=m.group(1)) project = None user = m.group(2) if not self.accessdb.has_key(fid): self.accessdb[fid] = [] self.accessdb[fid].append((project, user)) continue self.log.warn("[experiment_control] Error parsing access " +\ "db %s at line %d" % (accessdb_file, lineno)) except IOError: raise service_error(service_error.internal, "Error opening/reading %s as experiment " +\ "control accessdb" % accessdb_file) f.close() # Initialize the authorization attributes for fid in self.accessdb.keys(): self.auth.set_attribute(fid, 'create') def read_mapdb(self, file): """ Read a simple colon separated list of mappings for the label-to-testbed-URL mappings. Clears or creates self.tbmap. """ self.tbmap = { } lineno =0 try: f = open(file, "r") for line in f: lineno += 1 line = line.strip() if line.startswith('#') or len(line) == 0: continue try: label, url = line.split(':', 1) self.tbmap[label] = url except ValueError, e: self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\ "map db: %s %s" % (lineno, line, e)) except IOError, e: self.log.warning("[read_mapdb]: No saved map database: Can't " +\ "open %s: %s" % (file, e)) f.close() class emulab_segment: def __init__(self, log=None, keyfile=None, debug=False): self.log = log or logging.getLogger(\ 'fedd.experiment_control.emulab_segment') self.ssh_privkey_file = keyfile self.debug = debug self.ssh_exec="/usr/bin/ssh" self.scp_exec = "/usr/bin/scp" self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout def scp_file(self, file, user, host, dest=""): """ scp a file to the remote host. If debug is set the action is only logged. """ scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', '-o', 'StrictHostKeyChecking yes', '-i', self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)] rv = 0 try: dnull = open("/dev/null", "w") except IOError: self.log.debug("[ssh_file]: failed to open " + \ "/dev/null for redirect") dnull = Null self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) if not self.debug: rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True, close_fds=True) return rv == 0 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): """ Run a remote command on host as user. If debug is set, the action is only logged. Commands are run without stdin, to avoid stray SIGTTINs. """ sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \ "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ (self.ssh_exec, self.ssh_privkey_file, user, host, cmd) try: dnull = open("/dev/null", "w") except IOError: self.log.debug("[ssh_cmd]: failed to open /dev/null " + \ "for redirect") dnull = Null self.log.debug("[ssh_cmd]: %s" % sh_str) if not self.debug: if dnull: sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull, close_fds=True) else: sub = Popen(sh_str, shell=True, close_fds=True) if timeout: i = 0 rv = sub.poll() while i < timeout: if rv is not None: break else: time.sleep(1) rv = sub.poll() i += 1 else: self.log.debug("Process exceeded runtime: %s" % sh_str) os.kill(sub.pid, signal.SIGKILL) raise self.ssh_cmd_timeout(); return rv == 0 else: return sub.wait() == 0 else: if timeout == 0: self.log.debug("debug timeout raised on %s " % sh_str) raise self.ssh_cmd_timeout() else: return True class start_segment(emulab_segment): def __init__(self, log=None, keyfile=None, debug=False): experiment_control_local.emulab_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) def create_config_tree(self, src_dir, dest_dir, script): """ Append commands to script that will create the directory hierarchy on the remote federant. """ if os.path.isdir(src_dir): print >>script, "mkdir -p %s" % dest_dir print >>script, "chmod 770 %s" % dest_dir for f in os.listdir(src_dir): if os.path.isdir(f): self.create_config_tree("%s/%s" % (src_dir, f), "%s/%s" % (dest_dir, f), script) else: self.log.debug("[create_config_tree]: Not a directory: %s" \ % src_dir) def ship_configs(self, host, user, src_dir, dest_dir): """ Copy federant-specific configuration files to the federant. """ for f in os.listdir(src_dir): if os.path.isdir(f): if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), "%s/%s" % (dest_dir, f)): return False else: if not self.scp_file("%s/%s" % (src_dir, f), user, host, dest_dir): return False return True def get_state(self, user, host, tb, pid, eid): # command to test experiment state expinfo_exec = "/usr/testbed/bin/expinfo" # Regular expressions to parse the expinfo response state_re = re.compile("State:\s+(\w+)") no_exp_re = re.compile("^No\s+such\s+experiment") swapping_re = re.compile("^No\s+information\s+available.") state = None # Experiment state parsed from expinfo # The expinfo ssh command. Note the identity restriction to use # only the identity provided in the pubkey given. cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 'StrictHostKeyChecking yes', '-i', self.ssh_privkey_file, "%s@%s" % (user, host), expinfo_exec, pid, eid] dev_null = None try: dev_null = open("/dev/null", "a") except IOError, e: self.log.error("[get_state]: can't open /dev/null: %s" %e) if self.debug: state = 'swapped' rv = 0 else: status = Popen(cmd, stdout=PIPE, stderr=dev_null, close_fds=True) for line in status.stdout: m = state_re.match(line) if m: state = m.group(1) else: for reg, st in ((no_exp_re, "none"), (swapping_re, "swapping")): m = reg.match(line) if m: state = st rv = status.wait() # If the experiment is not present the subcommand returns a # non-zero return value. If we successfully parsed a "none" # outcome, ignore the return code. if rv != 0 and state != 'none': raise service_error(service_error.internal, "Cannot get status of segment %s:%s/%s" % \ (tb, pid, eid)) elif state not in ('active', 'swapped', 'swapping', 'none'): raise service_error(service_error.internal, "Cannot get status of segment %s:%s/%s" % \ (tb, pid, eid)) else: return state def __call__(self, tb, eid, tbparams, tmpdir, timeout=0): """ Start a sub-experiment on a federant. Get the current state, modify or create as appropriate, ship data and configs and start the experiment. There are small ordering differences based on the initial state of the sub-experiment. """ # ops node in the federant host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) user = tbparams[tb]['user'] # federant user pid = tbparams[tb]['project'] # federant project # XXX base_confs = ( "hosts",) tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description # Configuration directories on the remote machine proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) state = self.get_state(user, host, tb, pid, eid) self.log.debug("[start_segment]: %s: %s" % (tb, state)) self.log.info("[start_segment]:transferring experiment to %s" % tb) if not self.scp_file("%s/%s/%s" % \ (tmpdir, tb, tclfile), user, host): return False if state == 'none': # Create a null copy of the experiment so that we capture any # logs there if the modify fails. Emulab software discards the # logs from a failed startexp if not self.scp_file("%s/null.tcl" % tmpdir, user, host): return False self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) timedout = False try: if not self.ssh_cmd(user, host, ("/usr/testbed/bin/startexp -i -f -w -p %s " + "-e %s null.tcl") % (pid, eid), "startexp", timeout=60 * 10): return False except self.ssh_cmd_timeout: timedout = True if timedout: state = self.get_state(user, host, tb, pid, eid) if state != "swapped": return False # Open up a temporary file to contain a script for setting up the # filespace for the new experiment. self.log.info("[start_segment]: creating script file") try: sf, scriptname = tempfile.mkstemp() scriptfile = os.fdopen(sf, 'w') except IOError: return False scriptbase = os.path.basename(scriptname) # Script the filesystem changes print >>scriptfile, "/bin/rm -rf %s" % proj_dir # Clear and create the tarfiles and rpm directories for d in (tarfiles_dir, rpms_dir): print >>scriptfile, "/bin/rm -rf %s/*" % d print >>scriptfile, "mkdir -p %s" % d print >>scriptfile, 'mkdir -p %s' % proj_dir self.create_config_tree("%s/%s" % (tmpdir, tb), proj_dir, scriptfile) if os.path.isdir("%s/tarfiles" % tmpdir): self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir, scriptfile) if os.path.isdir("%s/rpms" % tmpdir): self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, scriptfile) print >>scriptfile, "rm -f %s" % scriptbase scriptfile.close() # Move the script to the remote machine # XXX: could collide tempfile names on the remote host if self.scp_file(scriptname, user, host, scriptbase): os.remove(scriptname) else: return False # Execute the script (and the script's last line deletes it) if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): return False for f in base_confs: if not self.scp_file("%s/%s" % (tmpdir, f), user, host, "%s/%s" % (proj_dir, f)): return False if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), proj_dir): return False if os.path.isdir("%s/tarfiles" % tmpdir): if not self.ship_configs(host, user, "%s/tarfiles" % tmpdir, tarfiles_dir): return False if os.path.isdir("%s/rpms" % tmpdir): if not self.ship_configs(host, user, "%s/rpms" % tmpdir, tarfiles_dir): return False # Stage the new configuration (active experiments will stay swapped # in now) self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) try: if not self.ssh_cmd(user, host, "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ (pid, eid, tclfile), "modexp", timeout= 60 * 10): return False except self.ssh_cmd_timeout: self.log.error("Modify command failed to complete in time") # There's really no way to see if this succeeded or failed, so # if it hangs, assume the worst. return False # Active experiments are still swapped, this swaps the others in. if state != 'active': self.log.info("[start_segment]: Swapping %s in on %s" % \ (eid, tb)) timedout = False try: if not self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), "swapexp", timeout=10*60): return False except self.ssh_cmd_timeout: timedout = True # If the command was terminated, but completed successfully, # report success. if timedout: self.log.debug("[start_segment]: swapin timed out " +\ "checking state") state = self.get_state(user, host, tb, pid, eid) self.log.debug("[start_segment]: state is %s" % state) return state == 'active' # Everything has gone OK. return True class stop_segment(emulab_segment): def __init__(self, log=None, keyfile=None, debug=False): experiment_control_local.emulab_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) def __call__(self, tb, eid, tbparams): """ Stop a sub experiment by calling swapexp on the federant """ user = tbparams[tb]['user'] host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) pid = tbparams[tb]['project'] self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) rv = False try: # Clean out tar files: we've gone over quota in the past self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid)) self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \ (pid, eid)) rv = self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) except self.ssh_cmd_timeout: rv = False return rv def generate_ssh_keys(self, dest, type="rsa" ): """ Generate a set of keys for the gateways to use to talk. Keys are of type type and are stored in the required dest file. """ valid_types = ("rsa", "dsa") t = type.lower(); if t not in valid_types: raise ValueError cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] try: trace = open("/dev/null", "w") except IOError: raise service_error(service_error.internal, "Cannot open /dev/null??"); # May raise CalledProcessError self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) rv = call(cmd, stdout=trace, stderr=trace, close_fds=True) if rv != 0: raise service_error(service_error.internal, "Cannot generate nonce ssh keys. %s return code %d" \ % (self.ssh_keygen, rv)) def gentopo(self, str): """ Generate the topology dtat structure from the splitter's XML representation of it. The topology XML looks like: ip1:ip2 node:port """ class topo_parse: """ Parse the topology XML and create the dats structure. """ def __init__(self): # Typing of the subelements for data conversion self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member') self.int_subelements = ( 'bandwidth',) self.float_subelements = ( 'delay',) # The final data structure self.nodes = [ ] self.lans = [ ] self.topo = { \ 'node': self.nodes,\ 'lan' : self.lans,\ } self.element = { } # Current element being created self.chars = "" # Last text seen def end_element(self, name): # After each sub element the contents is added to the current # element or to the appropriate list. if name == 'node': self.nodes.append(self.element) self.element = { } elif name == 'lan': self.lans.append(self.element) self.element = { } elif name in self.str_subelements: self.element[name] = self.chars self.chars = "" elif name in self.int_subelements: self.element[name] = int(self.chars) self.chars = "" elif name in self.float_subelements: self.element[name] = float(self.chars) self.chars = "" def found_chars(self, data): self.chars += data.rstrip() tp = topo_parse(); parser = xml.parsers.expat.ParserCreate() parser.EndElementHandler = tp.end_element parser.CharacterDataHandler = tp.found_chars parser.Parse(str) return tp.topo def genviz(self, topo): """ Generate the visualization the virtual topology """ neato = "/usr/local/bin/neato" # These are used to parse neato output and to create the visualization # file. vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"') vis_fmt = "%s%s%s" + \ "%s" try: # Node names nodes = [ n['vname'] for n in topo['node'] ] topo_lans = topo['lan'] except KeyError: raise service_error(service_error.internal, "Bad topology") lans = { } links = { } # Walk through the virtual topology, organizing the connections into # 2-node connections (links) and more-than-2-node connections (lans). # When a lan is created, it's added to the list of nodes (there's a # node in the visualization for the lan). for l in topo_lans: if links.has_key(l['vname']): if len(links[l['vname']]) < 2: links[l['vname']].append(l['vnode']) else: nodes.append(l['vname']) lans[l['vname']] = links[l['vname']] del links[l['vname']] lans[l['vname']].append(l['vnode']) elif lans.has_key(l['vname']): lans[l['vname']].append(l['vnode']) else: links[l['vname']] = [ l['vnode'] ] # Open up a temporary file for dot to turn into a visualization try: df, dotname = tempfile.mkstemp() dotfile = os.fdopen(df, 'w') except IOError: raise service_error(service_error.internal, "Failed to open file in genviz") try: dnull = open('/dev/null', 'w') except IOError: service_error(service_error.internal, "Failed to open /dev/null in genviz") # Generate a dot/neato input file from the links, nodes and lans try: print >>dotfile, "graph G {" for n in nodes: print >>dotfile, '\t"%s"' % n for l in links.keys(): print >>dotfile, '\t"%s" -- "%s"' % tuple(links[l]) for l in lans.keys(): for n in lans[l]: print >>dotfile, '\t "%s" -- "%s"' % (n,l) print >>dotfile, "}" dotfile.close() except TypeError: raise service_error(service_error.internal, "Single endpoint link in vtopo") except IOError: raise service_error(service_error.internal, "Cannot write dot file") # Use dot to create a visualization dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE, stderr=dnull, close_fds=True) dnull.close() # Translate dot to vis format vis_nodes = [ ] vis = { 'node': vis_nodes } for line in dot.stdout: m = vis_re.match(line) if m: vn = m.group(1) vis_node = {'name': vn, \ 'x': float(m.group(2)),\ 'y' : float(m.group(3)),\ } if vn in links.keys() or vn in lans.keys(): vis_node['type'] = 'lan' else: vis_node['type'] = 'node' vis_nodes.append(vis_node) rv = dot.wait() os.remove(dotname) if rv == 0 : return vis else: return None def get_access(self, tb, nodes, user, tbparam, master, export_project, access_user): """ Get access to testbed through fedd and set the parameters for that tb """ uri = self.tbmap.get(tb, None) if not uri: raise service_error(serice_error.server_config, "Unknown testbed: %s" % tb) # currently this lumps all users into one service access group service_keys = [ a for u in user \ for a in u.get('access', []) \ if a.has_key('sshPubkey')] if len(service_keys) == 0: raise service_error(service_error.req, "Must have at least one SSH pubkey for services") for p, u in access_user: self.log.debug(("[get_access] Attempting access from (%s, %s) " + \ "to %s") % ((p or "None"), u, uri)) if p: # Request with user and project specified req = {\ 'destinationTestbed' : { 'uri' : uri }, 'project': { 'name': {'localname': p}, 'user': [ {'userID': { 'localname': u } } ], }, 'user': user, 'allocID' : { 'localname': 'test' }, 'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ], 'serviceAccess' : service_keys } else: # Request with only user specified req = {\ 'destinationTestbed' : { 'uri' : uri }, 'user': [ {'userID': { 'localname': u } } ], 'allocID' : { 'localname': 'test' }, 'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ], 'serviceAccess' : service_keys } if tb == master: # NB, the export_project parameter is a dict that includes # the type req['exportProject'] = export_project # node resources if any if nodes != None and len(nodes) > 0: rnodes = [ ] for n in nodes: rn = { } image, hw, count = n.split(":") if image: rn['image'] = [ image ] if hw: rn['hardware'] = [ hw ] if count and int(count) >0 : rn['count'] = int(count) rnodes.append(rn) req['resources']= { } req['resources']['node'] = rnodes try: if self.local_access.has_key(uri): # Local access call req = { 'RequestAccessRequestBody' : req } r = self.local_access[uri].RequestAccess(req, fedid(file=self.cert_file)) r = { 'RequestAccessResponseBody' : r } else: r = self.call_RequestAccess(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) except service_error, e: if e.code == service_error.access: self.log.debug("[get_access] Access denied") r = None continue else: raise e if r.has_key('RequestAccessResponseBody'): # Through to here we have a valid response, not a fault. # Access denied is a fault, so something better or worse than # access denied has happened. r = r['RequestAccessResponseBody'] self.log.debug("[get_access] Access granted") break else: raise service_error(service_error.protocol, "Bad proxy response") if not r: raise service_error(service_error.access, "Access denied by %s (%s)" % (tb, uri)) e = r['emulab'] p = e['project'] tbparam[tb] = { "boss": e['boss'], "host": e['ops'], "domain": e['domain'], "fs": e['fileServer'], "eventserver": e['eventServer'], "project": unpack_id(p['name']), "emulab" : e, "allocID" : r['allocID'], } # Make the testbed name be the label the user applied p['testbed'] = {'localname': tb } for u in p['user']: role = u.get('role', None) if role == 'experimentCreation': tbparam[tb]['user'] = unpack_id(u['userID']) break else: raise service_error(service_error.internal, "No createExperimentUser from %s" %tb) # Add attributes to barameter space. We don't allow attributes to # overlay any parameters already installed. for a in e['fedAttr']: try: if a['attribute'] and isinstance(a['attribute'], basestring)\ and not tbparam[tb].has_key(a['attribute'].lower()): tbparam[tb][a['attribute'].lower()] = a['value'] except KeyError: self.log.error("Bad attribute in response: %s" % a) def release_access(self, tb, aid): """ Release access to testbed through fedd """ uri = self.tbmap.get(tb, None) if not uri: raise service_error(serice_error.server_config, "Unknown testbed: %s" % tb) if self.local_access.has_key(uri): resp = self.local_access[uri].ReleaseAccess(\ { 'ReleaseAccessRequestBody' : {'allocID': aid},}, fedid(file=self.cert_file)) resp = { 'ReleaseAccessResponseBody': resp } else: resp = self.call_ReleaseAccess(uri, {'allocID': aid}, self.cert_file, self.cert_pwd, self.trusted_certs) # better error coding def remote_splitter(self, uri, desc, master): req = { 'description' : { 'ns2description': desc }, 'master': master, 'include_fedkit': bool(self.fedkit), 'include_gatewaykit': bool(self.gatewaykit) } r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, self.trusted_certs) if r.has_key('Ns2SplitResponseBody'): r = r['Ns2SplitResponseBody'] if r.has_key('output'): return r['output'].splitlines() else: raise service_error(service_error.protocol, "Bad splitter response (no output)") else: raise service_error(service_error.protocol, "Bad splitter response") class current_testbed: """ Object for collecting the current testbed description. The testbed description is saved to a file with the local testbed variables subsittuted line by line. """ def __init__(self, eid, tmpdir, fedkit, gatewaykit): def tar_list_to_string(tl): if tl is None: return None rv = "" for t in tl: rv += " %s PROJDIR/tarfiles/EID/%s" % \ (t[0], os.path.basename(t[1])) return rv self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)") self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)") self.current_testbed = None self.testbed_file = None self.def_expstart = \ "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate"; self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts"; self.def_gwstart = \ "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log"; self.def_mgwstart = \ "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log"; self.def_gwimage = "FBSD61-TUNNEL2"; self.def_gwtype = "pc"; self.def_mgwcmd = '# ' self.def_mgwcmdparams = '' self.def_gwcmd = '# ' self.def_gwcmdparams = '' self.eid = eid self.tmpdir = tmpdir # Convert fedkit and gateway kit (which are lists of tuples) into a # substituition string. self.fedkit = tar_list_to_string(fedkit) self.gatewaykit = tar_list_to_string(gatewaykit) def __call__(self, line, master, allocated, tbparams): # Capture testbed topology descriptions if self.current_testbed == None: m = self.begin_testbed.match(line) if m != None: self.current_testbed = m.group(1) if self.current_testbed == None: raise service_error(service_error.req, "Bad request format (unnamed testbed)") allocated[self.current_testbed] = \ allocated.get(self.current_testbed,0) + 1 tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed) if not os.path.exists(tb_dir): try: os.mkdir(tb_dir) except IOError: raise service_error(service_error.internal, "Cannot create %s" % tb_dir) try: self.testbed_file = open("%s/%s.%s.tcl" % (tb_dir, self.eid, self.current_testbed), 'w') except IOError: self.testbed_file = None return True else: return False else: m = self.end_testbed.match(line) if m != None: if m.group(1) != self.current_testbed: raise service_error(service_error.internal, "Mismatched testbed markers!?") if self.testbed_file != None: self.testbed_file.close() self.testbed_file = None self.current_testbed = None elif self.testbed_file: # Substitute variables and put the line into the local # testbed file. gwtype = tbparams[self.current_testbed].get(\ 'connectortype', self.def_gwtype) gwimage = tbparams[self.current_testbed].get(\ 'connectorimage', self.def_gwimage) mgwstart = tbparams[self.current_testbed].get(\ 'masterconnectorstartcmd', self.def_mgwstart) mexpstart = tbparams[self.current_testbed].get(\ 'masternodestartcmd', self.def_mexpstart) gwstart = tbparams[self.current_testbed].get(\ 'slaveconnectorstartcmd', self.def_gwstart) expstart = tbparams[self.current_testbed].get(\ 'slavenodestartcmd', self.def_expstart) project = tbparams[self.current_testbed].get('project') gwcmd = tbparams[self.current_testbed].get(\ 'slaveconnectorcmd', self.def_gwcmd) gwcmdparams = tbparams[self.current_testbed].get(\ 'slaveconnectorcmdparams', self.def_gwcmdparams) mgwcmd = tbparams[self.current_testbed].get(\ 'masterconnectorcmd', self.def_gwcmd) mgwcmdparams = tbparams[self.current_testbed].get(\ 'masterconnectorcmdparams', self.def_gwcmdparams) line = re.sub("GWTYPE", gwtype, line) line = re.sub("GWIMAGE", gwimage, line) if self.current_testbed == master: line = re.sub("GWSTART", mgwstart, line) line = re.sub("EXPSTART", mexpstart, line) # NB GWCMDPARAMS is a prefix of GWCMD, so expand first line = re.sub("GWCMDPARAMS", mgwcmdparams, line) line = re.sub("(#\s*)?GWCMD", mgwcmd, line) else: line = re.sub("GWSTART", gwstart, line) line = re.sub("EXPSTART", expstart, line) # NB GWCMDPARAMS is a prefix of GWCMD, so expand first line = re.sub("GWCMDPARAMS", gwcmdparams, line) line = re.sub("(#\s*)?GWCMD", gwcmd, line) #These expansions contain EID and PROJDIR. NB these are # local fedkit and gatewaykit, which are strings. if self.fedkit: line = re.sub("FEDKIT", self.fedkit, line) if self.gatewaykit: line = re.sub("GATEWAYKIT", self.gatewaykit, line) line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line) line = re.sub("PROJDIR", "/proj/%s/" % project, line) line = re.sub("EID", self.eid, line) line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \ (project, self.eid), line) print >>self.testbed_file, line return True class allbeds: """ Process the Allbeds section. Get access to each federant and save the parameters in tbparams """ def __init__(self, get_access): self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds") self.end_allbeds = re.compile("^#\s+End\s+Allbeds") self.in_allbeds = False self.get_access = get_access def __call__(self, line, user, tbparams, master, export_project, access_user): # Testbed access parameters if not self.in_allbeds: if self.begin_allbeds.match(line): self.in_allbeds = True return True else: return False else: if self.end_allbeds.match(line): self.in_allbeds = False else: nodes = line.split('|') tb = nodes.pop(0) self.get_access(tb, nodes, user, tbparams, master, export_project, access_user) return True class gateways: def __init__(self, eid, master, tmpdir, gw_pubkey, gw_secretkey, copy_file, fedkit): self.begin_gateways = \ re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)") self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)") self.current_gateways = None self.control_gateway = None self.active_end = { } self.eid = eid self.master = master self.tmpdir = tmpdir self.gw_pubkey_base = gw_pubkey self.gw_secretkey_base = gw_secretkey self.copy_file = copy_file self.fedkit = fedkit def gateway_conf_file(self, gw, master, eid, pubkey, privkey, active_end, tbparams, dtb, myname, desthost, type): """ Produce a gateway configuration file from a gateways line. """ sproject = tbparams[gw].get('project', 'project') dproject = tbparams[dtb].get('project', 'project') sdomain = ".%s.%s%s" % (eid, sproject, tbparams[gw].get('domain', ".example.com")) ddomain = ".%s.%s%s" % (eid, dproject, tbparams[dtb].get('domain', ".example.com")) boss = tbparams[master].get('boss', "boss") fs = tbparams[master].get('fs', "fs") event_server = "%s%s" % \ (tbparams[gw].get('eventserver', "event_server"), tbparams[gw].get('domain', "example.com")) remote_event_server = "%s%s" % \ (tbparams[dtb].get('eventserver', "event_server"), tbparams[dtb].get('domain', "example.com")) seer_control = "%s%s" % \ (tbparams[gw].get('control', "control"), sdomain) tunnel_iface = tbparams[gw].get("tunnelinterface", None) if self.fedkit: remote_script_dir = "/usr/local/federation/bin" local_script_dir = "/usr/local/federation/bin" else: remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) tunnel_cfg = tbparams[gw].get("tunnelcfg", "false") conf_file = "%s%s.gw.conf" % (myname, sdomain) remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) # translate to lower case so the `hostname` hack for specifying # configuration files works. conf_file = conf_file.lower(); remote_conf_file = remote_conf_file.lower(); if dtb == master: active = "false" elif gw == master: active = "true" elif active_end.has_key('%s-%s' % (dtb, gw)): active = "false" else: active_end['%s-%s' % (gw, dtb)] = 1 active = "true" gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w") print >>gwconfig, "Active: %s" % active print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg if tunnel_iface: print >>gwconfig, "Interface: %s" % tunnel_iface print >>gwconfig, "BossName: %s" % boss print >>gwconfig, "FsName: %s" % fs print >>gwconfig, "EventServerName: %s" % event_server print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server print >>gwconfig, "SeerControl: %s" % seer_control print >>gwconfig, "Type: %s" % type print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \ local_script_dir print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid) print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid) print >>gwconfig, "RemoteConfigFile: %s/%s" % \ (remote_conf_dir, remote_conf_file) print >>gwconfig, "Peer: %s%s" % (desthost, ddomain) print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey) print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey) gwconfig.close() return active == "true" def __call__(self, line, allocated, tbparams): # Process gateways if not self.current_gateways: m = self.begin_gateways.match(line) if m: self.current_gateways = m.group(1) if allocated.has_key(self.current_gateways): # This test should always succeed tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways) if not os.path.exists(tb_dir): try: os.mkdir(tb_dir) except IOError: raise service_error(service_error.internal, "Cannot create %s" % tb_dir) else: # XXX self.log.error("[gateways]: Ignoring gateways for " + \ "unknown testbed %s" % self.current_gateways) self.current_gateways = None return True else: return False else: m = self.end_gateways.match(line) if m : if m.group(1) != self.current_gateways: raise service_error(service_error.internal, "Mismatched gateway markers!?") if self.control_gateway: try: cc = open("%s/%s/client.conf" % (self.tmpdir, self.current_gateways), 'w') print >>cc, "ControlGateway: %s" % \ self.control_gateway if tbparams[self.master].has_key('smbshare'): print >>cc, "SMBSHare: %s" % \ tbparams[self.master]['smbshare'] print >>cc, "ProjectUser: %s" % \ tbparams[self.master]['user'] print >>cc, "ProjectName: %s" % \ tbparams[self.master]['project'] print >>cc, "ExperimentID: %s/%s" % \ ( tbparams[self.master]['project'], \ self.eid ) cc.close() except IOError: raise service_error(service_error.internal, "Error creating client config") # XXX: This seer specific file should disappear try: cc = open("%s/%s/seer.conf" % (self.tmpdir, self.current_gateways), 'w') if self.current_gateways != self.master: print >>cc, "ControlNode: %s" % \ self.control_gateway print >>cc, "ExperimentID: %s/%s" % \ ( tbparams[self.master]['project'], \ self.eid ) cc.close() except IOError: raise service_error(service_error.internal, "Error creating seer config") else: debug.error("[gateways]: No control gateway for %s" %\ self.current_gateways) self.current_gateways = None else: dtb, myname, desthost, type = line.split(" ") if type == "control" or type == "both": self.control_gateway = "%s.%s.%s%s" % (myname, self.eid, tbparams[self.current_gateways]['project'], tbparams[self.current_gateways]['domain']) try: active = self.gateway_conf_file(self.current_gateways, self.master, self.eid, self.gw_pubkey_base, self.gw_secretkey_base, self.active_end, tbparams, dtb, myname, desthost, type) except IOError, e: raise service_error(service_error.internal, "Failed to write config file for %s" % \ self.current_gateway) gw_pubkey = "%s/keys/%s" % \ (self.tmpdir, self.gw_pubkey_base) gw_secretkey = "%s/keys/%s" % \ (self.tmpdir, self.gw_secretkey_base) pkfile = "%s/%s/%s" % \ ( self.tmpdir, self.current_gateways, self.gw_pubkey_base) skfile = "%s/%s/%s" % \ ( self.tmpdir, self.current_gateways, self.gw_secretkey_base) if not os.path.exists(pkfile): try: self.copy_file(gw_pubkey, pkfile) except IOError: service_error(service_error.internal, "Failed to copy pubkey file") if active and not os.path.exists(skfile): try: self.copy_file(gw_secretkey, skfile) except IOError: service_error(service_error.internal, "Failed to copy secretkey file") return True class shunt_to_file: """ Simple class to write data between two regexps to a file. """ def __init__(self, begin, end, filename): """ Begin shunting on a match of begin, stop on end, send data to filename. """ self.begin = re.compile(begin) self.end = re.compile(end) self.in_shunt = False self.file = None self.filename = filename def __call__(self, line): """ Call this on each line in the input that may be shunted. """ if not self.in_shunt: if self.begin.match(line): self.in_shunt = True try: self.file = open(self.filename, "w") except: self.file = None raise return True else: return False else: if self.end.match(line): if self.file: self.file.close() self.file = None self.in_shunt = False else: if self.file: print >>self.file, line return True class shunt_to_list: """ Same interface as shunt_to_file. Data collected in self.list, one list element per line. """ def __init__(self, begin, end): self.begin = re.compile(begin) self.end = re.compile(end) self.in_shunt = False self.list = [ ] def __call__(self, line): if not self.in_shunt: if self.begin.match(line): self.in_shunt = True return True else: return False else: if self.end.match(line): self.in_shunt = False else: self.list.append(line) return True class shunt_to_string: """ Same interface as shunt_to_file. Data collected in self.str, all in one string. """ def __init__(self, begin, end): self.begin = re.compile(begin) self.end = re.compile(end) self.in_shunt = False self.str = "" def __call__(self, line): if not self.in_shunt: if self.begin.match(line): self.in_shunt = True return True else: return False else: if self.end.match(line): self.in_shunt = False else: self.str += line return True def allocate_resources(self, allocated, master, eid, expid, expcert, tbparams, tmpdir, alloc_log=None): started = { } # Testbeds where a sub-experiment started # successfully # XXX fail_soft = False log = alloc_log or self.log thread_pool = self.thread_pool(self.nthreads) threads = [ ] for tb in [ k for k in allocated.keys() if k != master]: # Create and start a thread to start the segment, and save it to # get the return value later thread_pool.wait_for_slot() t = self.pooled_thread(\ target=self.start_segment(log=log, keyfile=self.ssh_privkey_file, debug=self.debug), args=(tb, eid, tbparams, tmpdir, 0), name=tb, pdata=thread_pool, trace_file=self.trace_file) threads.append(t) t.start() # Wait until all finish thread_pool.wait_for_all_done() # If none failed, start the master failed = [ t.getName() for t in threads if not t.rv ] if len(failed) == 0: starter = self.start_segment(log=log, keyfile=self.ssh_privkey_file, debug=self.debug) if not starter(master, eid, tbparams, tmpdir): failed.append(master) succeeded = [tb for tb in allocated.keys() if tb not in failed] # If one failed clean up, unless fail_soft is set if failed: if not fail_soft: thread_pool.clear() for tb in succeeded: # Create and start a thread to stop the segment thread_pool.wait_for_slot() t = self.pooled_thread(\ target=self.stop_segment(log=log, keyfile=self.ssh_privkey_file, debug=self.debug), args=(tb, eid, tbparams), name=tb, pdata=thread_pool, trace_file=self.trace_file) t.start() # Wait until all finish thread_pool.wait_for_all_done() # release the allocations for tb in tbparams.keys(): self.release_access(tb, tbparams[tb]['allocID']) # Remove the placeholder self.state_lock.acquire() self.state[eid]['experimentStatus'] = 'failed' if self.state_filename: self.write_state() self.state_lock.release() #raise service_error(service_error.federant, # "Swap in failed on %s" % ",".join(failed)) log.error("Swap in failed on %s" % ",".join(failed)) return else: log.info("[start_segment]: Experiment %s active" % eid) log.debug("[start_experiment]: removing %s" % tmpdir) # Walk up tmpdir, deleting as we go for path, dirs, files in os.walk(tmpdir, topdown=False): for f in files: os.remove(os.path.join(path, f)) for d in dirs: os.rmdir(os.path.join(path, d)) os.rmdir(tmpdir) # Insert the experiment into our state and update the disk copy self.state_lock.acquire() self.state[expid]['experimentStatus'] = 'active' self.state[eid] = self.state[expid] if self.state_filename: self.write_state() self.state_lock.release() return def create_experiment(self, req, fid): """ The external interface to experiment creation called from the dispatcher. Creates a working directory, splits the incoming description using the splitter script and parses out the avrious subsections using the lcasses above. Once each sub-experiment is created, use pooled threads to instantiate them and start it all up. """ if not self.auth.check_attribute(fid, 'create'): raise service_error(service_error.access, "Create access denied") try: tmpdir = tempfile.mkdtemp(prefix="split-") except IOError: raise service_error(service_error.internal, "Cannot create tmp dir") gw_pubkey_base = "fed.%s.pub" % self.ssh_type gw_secretkey_base = "fed.%s" % self.ssh_type gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base tclfile = tmpdir + "/experiment.tcl" tbparams = { } try: access_user = self.accessdb[fid] except KeyError: raise service_error(service_error.internal, "Access map and authorizer out of sync in " + \ "create_experiment for fedid %s" % fid) pid = "dummy" gid = "dummy" try: os.mkdir(tmpdir+"/keys") except OSError: raise service_error(service_error.internal, "Can't make temporary dir") req = req.get('CreateRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no CreateRequestBody)") # The tcl parser needs to read a file so put the content into that file descr=req.get('experimentdescription', None) if descr: file_content=descr.get('ns2description', None) if file_content: try: f = open(tclfile, 'w') f.write(file_content) f.close() except IOError: raise service_error(service_error.internal, "Cannot write temp experiment description") else: raise service_error(service_error.req, "Only ns2descriptions supported") else: raise service_error(service_error.req, "No experiment description") # Generate an ID for the experiment (slice) and a certificate that the # allocator can use to prove they own it. We'll ship it back through # the encrypted connection. (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) if req.has_key('experimentID') and \ req['experimentID'].has_key('localname'): overwrite = False eid = req['experimentID']['localname'] # If there's an old failed experiment here with the same local name # and accessible by this user, we'll overwrite it, otherwise we'll # fall through and do the collision avoidance. old_expid = self.get_experiment_fedid(eid) if old_expid and self.check_experiment_access(fid, old_expid): self.state_lock.acquire() status = self.state[eid].get('experimentStatus', None) if status and status == 'failed': # remove the old access attribute self.auth.unset_attribute(fid, old_expid) overwrite = True del self.state[eid] del self.state[old_expid] self.state_lock.release() self.state_lock.acquire() while (self.state.has_key(eid) and not overwrite): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = { 'experimentID' : \ [ { 'localname' : eid }, {'fedid': expid } ], 'experimentStatus': 'starting', 'experimentAccess': { 'X509' : expcert }, 'owner': fid, 'log' : [], } self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() else: eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) self.state_lock.acquire() while (self.state.has_key(eid)): eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = { 'experimentID' : \ [ { 'localname' : eid }, {'fedid': expid } ], 'experimentStatus': 'starting', 'experimentAccess': { 'X509' : expcert }, 'owner': fid, 'log' : [], } self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() try: # This catches exceptions to clear the placeholder if necessary try: self.generate_ssh_keys(gw_secretkey, self.ssh_type) except ValueError: raise service_error(service_error.server_config, "Bad key type (%s)" % self.ssh_type) user = req.get('user', None) if user == None: raise service_error(service_error.req, "No user") master = req.get('master', None) if not master: raise service_error(service_error.req, "No master testbed label") export_project = req.get('exportProject', None) if not export_project: raise service_error(service_error.req, "No export project") if self.splitter_url: self.log.debug("Calling remote splitter at %s" % \ self.splitter_url) split_data = self.remote_splitter(self.splitter_url, file_content, master) else: tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', str(self.muxmax), '-m', master] if self.fedkit: tclcmd.append('-k') if self.gatewaykit: tclcmd.append('-K') tclcmd.extend([pid, gid, eid, tclfile]) self.log.debug("running local splitter %s", " ".join(tclcmd)) # This is just fantastic. As a side effect the parser copies # tb_compat.tcl into the current directory, so that directory # must be writable by the fedd user. Doing this in the # temporary subdir ensures this is the case. tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, cwd=tmpdir) split_data = tclparser.stdout allocated = { } # Testbeds we can access # Objects to parse the splitter output (defined above) parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit, self.gatewaykit) parse_allbeds = self.allbeds(self.get_access) parse_gateways = self.gateways(eid, master, tmpdir, gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit) parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", "^#\s+End\s+Vtopo") parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", "^#\s+End\s+hostnames", tmpdir + "/hosts") parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", "^#\s+End\s+tarfiles") parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", "^#\s+End\s+rpms") # Working on the split data for line in split_data: line = line.rstrip() if parse_current_testbed(line, master, allocated, tbparams): continue elif parse_allbeds(line, user, tbparams, master, export_project, access_user): continue elif parse_gateways(line, allocated, tbparams): continue elif parse_vtopo(line): continue elif parse_hostnames(line): continue elif parse_tarfiles(line): continue elif parse_rpms(line): continue else: raise service_error(service_error.internal, "Bad tcl parse? %s" % line) # Virtual topology and visualization vtopo = self.gentopo(parse_vtopo.str) if not vtopo: raise service_error(service_error.internal, "Failed to generate virtual topology") vis = self.genviz(vtopo) if not vis: raise service_error(service_error.internal, "Failed to generate visualization") # save federant information for k in allocated.keys(): tbparams[k]['federant'] = {\ 'name': [ { 'localname' : eid} ],\ 'emulab': tbparams[k]['emulab'],\ 'allocID' : tbparams[k]['allocID'],\ 'master' : k == master,\ } self.state_lock.acquire() self.state[eid]['vtopo'] = vtopo self.state[eid]['vis'] = vis self.state[expid]['federant'] = \ [ tbparams[tb]['federant'] for tb in tbparams.keys() \ if tbparams[tb].has_key('federant') ] if self.state_filename: self.write_state() self.state_lock.release() # Copy tarfiles and rpms needed at remote sites into a staging area try: if self.fedkit: for t in self.fedkit: parse_tarfiles.list.append(t[1]) if self.gatewaykit: for t in self.gatewaykit: parse_tarfiles.list.append(t[1]) for t in parse_tarfiles.list: if not os.path.exists("%s/tarfiles" % tmpdir): os.mkdir("%s/tarfiles" % tmpdir) self.copy_file(t, "%s/tarfiles/%s" % \ (tmpdir, os.path.basename(t))) for r in parse_rpms.list: if not os.path.exists("%s/rpms" % tmpdir): os.mkdir("%s/rpms" % tmpdir) self.copy_file(r, "%s/rpms/%s" % \ (tmpdir, os.path.basename(r))) # A null experiment file in case we need to create a remote # experiment from scratch f = open("%s/null.tcl" % tmpdir, "w") print >>f, """ set ns [new Simulator] source tb_compat.tcl set a [$ns node] $ns rtproto Session $ns run """ f.close() except IOError, e: raise service_error(service_error.internal, "Cannot stage tarfile/rpm: %s" % e.strerror) except service_error, e: # If something goes wrong in the parse (usually an access error) # clear the placeholder state. From here on out the code delays # exceptions. Failing at this point returns a fault to the remote # caller. self.state_lock.acquire() del self.state[eid] del self.state[expid] if self.state_filename: self.write_state() self.state_lock.release() raise e # Start the background swapper and return the starting state. From # here on out, the state will stick around a while. # Let users touch the state self.auth.set_attribute(fid, expid) self.auth.set_attribute(expid, expid) # Override fedids can manipulate state as well for o in self.overrides: self.auth.set_attribute(o, expid) # Create a logger that logs to the experiment's state object as well as # to the main log file. alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) # Start a thread to do the resource allocation t = Thread(target=self.allocate_resources, args=(allocated, master, eid, expid, expcert, tbparams, tmpdir, alloc_log), name=eid) t.start() rv = { 'experimentID': [ {'localname' : eid }, { 'fedid': copy.copy(expid) } ], 'experimentStatus': 'starting', 'experimentAccess': { 'X509' : expcert } } return rv def new_create_experiment(self, req, fid): """ The external interface to experiment creation called from the dispatcher. Creates a working directory, splits the incoming description using the splitter script and parses out the avrious subsections using the lcasses above. Once each sub-experiment is created, use pooled threads to instantiate them and start it all up. """ if not self.auth.check_attribute(fid, 'create'): raise service_error(service_error.access, "Create access denied") try: tmpdir = tempfile.mkdtemp(prefix="split-") except IOError: raise service_error(service_error.internal, "Cannot create tmp dir") gw_pubkey_base = "fed.%s.pub" % self.ssh_type gw_secretkey_base = "fed.%s" % self.ssh_type gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base tclfile = tmpdir + "/experiment.tcl" tbparams = { } try: access_user = self.accessdb[fid] except KeyError: raise service_error(service_error.internal, "Access map and authorizer out of sync in " + \ "create_experiment for fedid %s" % fid) pid = "dummy" gid = "dummy" try: os.mkdir(tmpdir+"/keys") except OSError: raise service_error(service_error.internal, "Can't make temporary dir") req = req.get('CreateRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no CreateRequestBody)") # The tcl parser needs to read a file so put the content into that file descr=req.get('experimentdescription', None) if descr: file_content=descr.get('ns2description', None) if file_content: try: f = open(tclfile, 'w') f.write(file_content) f.close() except IOError: raise service_error(service_error.internal, "Cannot write temp experiment description") else: raise service_error(service_error.req, "Only ns2descriptions supported") else: raise service_error(service_error.req, "No experiment description") # Generate an ID for the experiment (slice) and a certificate that the # allocator can use to prove they own it. We'll ship it back through # the encrypted connection. (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) if req.has_key('experimentID') and \ req['experimentID'].has_key('localname'): overwrite = False eid = req['experimentID']['localname'] # If there's an old failed experiment here with the same local name # and accessible by this user, we'll overwrite it, otherwise we'll # fall through and do the collision avoidance. old_expid = self.get_experiment_fedid(eid) if old_expid and self.check_experiment_access(fid, old_expid): self.state_lock.acquire() status = self.state[eid].get('experimentStatus', None) if status and status == 'failed': # remove the old access attribute self.auth.unset_attribute(fid, old_expid) overwrite = True del self.state[eid] del self.state[old_expid] self.state_lock.release() self.state_lock.acquire() while (self.state.has_key(eid) and not overwrite): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = { 'experimentID' : \ [ { 'localname' : eid }, {'fedid': expid } ], 'experimentStatus': 'starting', 'experimentAccess': { 'X509' : expcert }, 'owner': fid, 'log' : [], } self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() else: eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) self.state_lock.acquire() while (self.state.has_key(eid)): eid = self.exp_stem for i in range(0,5): eid += random.choice(string.ascii_letters) # Initial state self.state[eid] = { 'experimentID' : \ [ { 'localname' : eid }, {'fedid': expid } ], 'experimentStatus': 'starting', 'experimentAccess': { 'X509' : expcert }, 'owner': fid, 'log' : [], } self.state[expid] = self.state[eid] if self.state_filename: self.write_state() self.state_lock.release() try: # This catches exceptions to clear the placeholder if necessary try: self.generate_ssh_keys(gw_secretkey, self.ssh_type) except ValueError: raise service_error(service_error.server_config, "Bad key type (%s)" % self.ssh_type) user = req.get('user', None) if user == None: raise service_error(service_error.req, "No user") master = req.get('master', None) if not master: raise service_error(service_error.req, "No master testbed label") export_project = req.get('exportProject', None) if not export_project: raise service_error(service_error.req, "No export project") if self.splitter_url: self.log.debug("Calling remote splitter at %s" % \ self.splitter_url) split_data = self.remote_splitter(self.splitter_url, file_content, master) else: tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', str(self.muxmax), '-m', master] if self.fedkit: tclcmd.append('-k') if self.gatewaykit: tclcmd.append('-K') tclcmd.extend([pid, gid, eid, tclfile]) self.log.debug("running local splitter %s", " ".join(tclcmd)) # This is just fantastic. As a side effect the parser copies # tb_compat.tcl into the current directory, so that directory # must be writable by the fedd user. Doing this in the # temporary subdir ensures this is the case. tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, cwd=tmpdir) split_data = tclparser.stdout allocated = { } # Testbeds we can access # XXX here's where we're working def out_topo(filename, t): try: f = open("/tmp/%s" % filename, "w") print >> f, "%s" % \ topdl.topology_to_xml(t, top="experiment") f.close() except IOError, e: raise service_error(service_error.internal, "Can't open file") try: top = topdl.topology_from_xml(file=split_data, top="experiment") subs = sorted(top.substrates, cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)), reverse=True) ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) for s in subs: a = ips.allocate(len(s.interfaces)+2) if a : base, num = a if num < len(s.interfaces) +2 : raise service_error(service_error.internal, "Allocator returned wrong number of IPs??") else: raise service_error(service_error.req, "Cannot allocate IP addresses") base += 1 for i in s.interfaces: i.attribute.append( topdl.Attribute('ip4_address', "%s" % ip_addr(base))) base += 1 testbeds = set([ a.value for e in top.elements \ for a in e.attribute \ if a.attribute == 'testbed'] ) topo ={ } for tb in testbeds: self.get_access(tb, None, user, tbparams, master, export_project, access_user) topo[tb] = top.clone() to_delete = [ ] for e in topo[tb].elements: etb = e.get_attribute('testbed') if etb and etb != tb: for i in e.interface: for s in i.subs: try: s.interfaces.remove(i) except ValueError: raise service_error(service_error.internal, "Can't remove interface??") to_delete.append(e) for e in to_delete: topo[tb].elements.remove(e) topo[tb].make_indices() for s in top.substrates: tests = { } for i in s.interfaces: e = i.element tb = e.get_attribute('testbed') if tb and not tests.has_key(tb): for i in e.interface: if s in i.subs: tests[tb]= \ i.get_attribute('ip4_address') if len(tests) < 2: continue # More than one testbed is on this substrate. Insert # some gateways into the subtopologies. for st in tests.keys(): for dt in [ t for t in tests.keys() if t != st]: myname = "%stunnel" % dt desthost = "%stunnel" % st sproject = tbparams[st].get('project', 'project') dproject = tbparams[dt].get('project', 'project') sdomain = ".%s.%s%s" % (eid, sproject, tbparams[st].get('domain', ".example.com")) ddomain = ".%s.%s%s" % (eid, dproject, tbparams[dt].get('domain', ".example.com")) boss = tbparams[master].get('boss', "boss") fs = tbparams[master].get('fs', "fs") event_server = "%s%s" % \ (tbparams[st].get('eventserver', "event_server"), tbparams[dt].get('domain', "example.com")) remote_event_server = "%s%s" % \ (tbparams[dt].get('eventserver', "event_server"), tbparams[dt].get('domain', "example.com")) seer_control = "%s%s" % \ (tbparams[st].get('control', "control"), sdomain) local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) conf_file = "%s%s.gw.conf" % (myname, sdomain) remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) # translate to lower case so the `hostname` hack for specifying # configuration files works. conf_file = conf_file.lower(); remote_conf_file = remote_conf_file.lower(); active = ("%s" % (st == master)) portal = topdl.Computer(**{ 'name': "%stunnel" % dt, 'attribute' : [{ 'attribute': n, 'value': v, } for n, v in (\ ('gateway', 'true'), ('boss', boss), ('fs', fs), ('event_server', event_server), ('remote_event_server', remote_event_server), ('seer_control', seer_control), ('local_key_dir', local_key_dir), ('remote_conf_dir', remote_conf_dir), ('conf_file', conf_file), ('remote_conf_file', remote_conf_file), ('remote_script_dir', "/usr/local/federation/bin"), ('local_script_dir', "/usr/local/federation/bin"), )], 'interface': [{ 'substrate': s.name, 'attribute': [ { 'attribute': 'ip4_addreess', 'value': tests[dt], }, ], }, ], }) topo[st].elements.append(portal) # Connect the gateway nodes into the topologies and clear out # substrates that are not in the topologies for tb in testbeds: topo[tb].incorporate_elements() topo[tb].substrates = \ [s for s in topo[tb].substrates \ if len(s.interfaces) >0] softdir ="%s/software" % tmpdir softmap = { } os.mkdir(softdir) pkgs = set([fedkit, gatewaykit]) pkgs.update([x.location for e in top.elements \ for x in e.software]) for pkg in pkgs: loc = pkg scheme, host, path = urlparse(loc)[0:3] dest = os.path.basename(path) if not scheme: if not loc.startswith('/'): loc = "/%s" % loc loc = "file://%s" %loc try: u = urlopen(loc) except Exception, e: raise service_error(service_error.req, "Cannot open %s: %s" % (loc, e)) try: f = open("%s/%s" % (softdir, dest) , "w") data = u.read(4096) while data: f.write(data) data = u.read(4096) f.close() u.close() except Exception, e: raise service_error(service_error.internal, "Could not copy %s: %s" % (loc, e)) path = re.sub("/tmp", "", softdir) # XXX softmap[pkg] = \ "https://users.isi.deterlab.net:23232/%s/%s" %\ ( path, dest) # Convert the software locations in the segments into the local # copies on this host for soft in [ s for tb in topo.values() \ for e in tb.elements \ for s in e.software ]: if softmap.has_key(soft.location): soft.location = softmap[soft.location] for tb in testbeds: out_topo("%s.xml" %tb, topo[tb]) vtopo = topdl.topology_to_vtopo(top) vis = self.genviz(vtopo) except Exception, e: traceback.print_exc() raise service_error(service_error.internal, "%s" % e) # Build the testbed topologies: if True: raise service_error(service_error.internal, "Developing") # XXX old code # Objects to parse the splitter output (defined above) parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit, self.gatewaykit) parse_allbeds = self.allbeds(self.get_access) parse_gateways = self.gateways(eid, master, tmpdir, gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit) parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", "^#\s+End\s+Vtopo") parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", "^#\s+End\s+hostnames", tmpdir + "/hosts") parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", "^#\s+End\s+tarfiles") parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", "^#\s+End\s+rpms") # Working on the split data for line in split_data: line = line.rstrip() if parse_current_testbed(line, master, allocated, tbparams): continue elif parse_allbeds(line, user, tbparams, master, export_project, access_user): continue elif parse_gateways(line, allocated, tbparams): continue elif parse_vtopo(line): continue elif parse_hostnames(line): continue elif parse_tarfiles(line): continue elif parse_rpms(line): continue else: raise service_error(service_error.internal, "Bad tcl parse? %s" % line) # Virtual topology and visualization vtopo = self.gentopo(parse_vtopo.str) if not vtopo: raise service_error(service_error.internal, "Failed to generate virtual topology") vis = self.genviz(vtopo) if not vis: raise service_error(service_error.internal, "Failed to generate visualization") # save federant information for k in allocated.keys(): tbparams[k]['federant'] = {\ 'name': [ { 'localname' : eid} ],\ 'emulab': tbparams[k]['emulab'],\ 'allocID' : tbparams[k]['allocID'],\ 'master' : k == master,\ } self.state_lock.acquire() self.state[eid]['vtopo'] = vtopo self.state[eid]['vis'] = vis self.state[expid]['federant'] = \ [ tbparams[tb]['federant'] for tb in tbparams.keys() \ if tbparams[tb].has_key('federant') ] if self.state_filename: self.write_state() self.state_lock.release() # Copy tarfiles and rpms needed at remote sites into a staging area try: if self.fedkit: for t in self.fedkit: parse_tarfiles.list.append(t[1]) if self.gatewaykit: for t in self.gatewaykit: parse_tarfiles.list.append(t[1]) for t in parse_tarfiles.list: if not os.path.exists("%s/tarfiles" % tmpdir): os.mkdir("%s/tarfiles" % tmpdir) self.copy_file(t, "%s/tarfiles/%s" % \ (tmpdir, os.path.basename(t))) for r in parse_rpms.list: if not os.path.exists("%s/rpms" % tmpdir): os.mkdir("%s/rpms" % tmpdir) self.copy_file(r, "%s/rpms/%s" % \ (tmpdir, os.path.basename(r))) # A null experiment file in case we need to create a remote # experiment from scratch f = open("%s/null.tcl" % tmpdir, "w") print >>f, """ set ns [new Simulator] source tb_compat.tcl set a [$ns node] $ns rtproto Session $ns run """ f.close() except IOError, e: raise service_error(service_error.internal, "Cannot stage tarfile/rpm: %s" % e.strerror) except service_error, e: # If something goes wrong in the parse (usually an access error) # clear the placeholder state. From here on out the code delays # exceptions. Failing at this point returns a fault to the remote # caller. self.state_lock.acquire() del self.state[eid] del self.state[expid] if self.state_filename: self.write_state() self.state_lock.release() raise e # Start the background swapper and return the starting state. From # here on out, the state will stick around a while. # Let users touch the state self.auth.set_attribute(fid, expid) self.auth.set_attribute(expid, expid) # Override fedids can manipulate state as well for o in self.overrides: self.auth.set_attribute(o, expid) # Create a logger that logs to the experiment's state object as well as # to the main log file. alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) alloc_log.addHandler(h) # Start a thread to do the resource allocation t = Thread(target=self.allocate_resources, args=(allocated, master, eid, expid, expcert, tbparams, tmpdir, alloc_log), name=eid) t.start() rv = { 'experimentID': [ {'localname' : eid }, { 'fedid': copy.copy(expid) } ], 'experimentStatus': 'starting', 'experimentAccess': { 'X509' : expcert } } return rv def get_experiment_fedid(self, key): """ find the fedid associated with the localname key in the state database. """ rv = None self.state_lock.acquire() if self.state.has_key(key): if isinstance(self.state[key], dict): try: kl = [ f['fedid'] for f in \ self.state[key]['experimentID']\ if f.has_key('fedid') ] except KeyError: self.state_lock.release() raise service_error(service_error.internal, "No fedid for experiment %s when getting "+\ "fedid(!?)" % key) if len(kl) == 1: rv = kl[0] else: self.state_lock.release() raise service_error(service_error.internal, "multiple fedids for experiment %s when " +\ "getting fedid(!?)" % key) else: self.state_lock.release() raise service_error(service_error.internal, "Unexpected state for %s" % key) self.state_lock.release() return rv def check_experiment_access(self, fid, key): """ Confirm that the fid has access to the experiment. Though a request may be made in terms of a local name, the access attribute is always the experiment's fedid. """ if not isinstance(key, fedid): key = self.get_experiment_fedid(key) if self.auth.check_attribute(fid, key): return True else: raise service_error(service_error.access, "Access Denied") def get_handler(self, path, fid): print "in get_handler %s %s" % (path, fid) return ("/users/faber/test.html", "text/html") def get_vtopo(self, req, fid): """ Return the stored virtual topology for this experiment """ rv = None state = None req = req.get('VtopoRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no VtopoRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) self.state_lock.acquire() if self.state.has_key(key): if self.state[key].has_key('vtopo'): rv = { 'experiment' : {keytype: key },\ 'vtopo': self.state[key]['vtopo'],\ } else: state = self.state[key]['experimentStatus'] self.state_lock.release() if rv: return rv else: if state: raise service_error(service_error.partial, "Not ready: %s" % state) else: raise service_error(service_error.req, "No such experiment") def get_vis(self, req, fid): """ Return the stored visualization for this experiment """ rv = None state = None req = req.get('VisRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no VisRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) self.state_lock.acquire() if self.state.has_key(key): if self.state[key].has_key('vis'): rv = { 'experiment' : {keytype: key },\ 'vis': self.state[key]['vis'],\ } else: state = self.state[key]['experimentStatus'] self.state_lock.release() if rv: return rv else: if state: raise service_error(service_error.partial, "Not ready: %s" % state) else: raise service_error(service_error.req, "No such experiment") def clean_info_response(self, rv): """ Remove the information in the experiment's state object that is not in the info response. """ # Remove the owner info (should always be there, but...) if rv.has_key('owner'): del rv['owner'] # Convert the log into the allocationLog parameter and remove the # log entry (with defensive programming) if rv.has_key('log'): rv['allocationLog'] = "".join(rv['log']) del rv['log'] else: rv['allocationLog'] = "" if rv['experimentStatus'] != 'active': if rv.has_key('federant'): del rv['federant'] else: # remove the allocationID info from each federant for f in rv.get('federant', []): if f.has_key('allocID'): del f['allocID'] return rv def get_info(self, req, fid): """ Return all the stored info about this experiment """ rv = None req = req.get('InfoRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no InfoRequestBody)") exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) # The state may be massaged by the service function that called # get_info (e.g., encoded for XMLRPC transport) so send a copy of the # state. self.state_lock.acquire() if self.state.has_key(key): rv = copy.deepcopy(self.state[key]) self.state_lock.release() if rv: return self.clean_info_response(rv) else: raise service_error(service_error.req, "No such experiment") def get_multi_info(self, req, fid): """ Return all the stored info that this fedid can access """ rv = { 'info': [ ] } self.state_lock.acquire() for key in [ k for k in self.state.keys() if isinstance(k, fedid)]: self.check_experiment_access(fid, key) if self.state.has_key(key): e = copy.deepcopy(self.state[key]) e = self.clean_info_response(e) rv['info'].append(e) self.state_lock.release() return rv def terminate_experiment(self, req, fid): """ Swap this experiment out on the federants and delete the shared information """ tbparams = { } req = req.get('TerminateRequestBody', None) if not req: raise service_error(service_error.req, "Bad request format (no TerminateRequestBody)") force = req.get('force', False) exp = req.get('experiment', None) if exp: if exp.has_key('fedid'): key = exp['fedid'] keytype = "fedid" elif exp.has_key('localname'): key = exp['localname'] keytype = "localname" else: raise service_error(service_error.req, "Unknown lookup type") else: raise service_error(service_error.req, "No request?") self.check_experiment_access(fid, key) dealloc_list = [ ] # Create a logger that logs to the dealloc_list as well as to the main # log file. dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key) h = logging.StreamHandler(self.list_log(dealloc_list)) # XXX: there should be a global one of these rather than repeating the # code. h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S')) dealloc_log.addHandler(h) self.state_lock.acquire() fed_exp = self.state.get(key, None) if fed_exp: # This branch of the conditional holds the lock to generate a # consistent temporary tbparams variable to deallocate experiments. # It releases the lock to do the deallocations and reacquires it to # remove the experiment state when the termination is complete. # First make sure that the experiment creation is complete. status = fed_exp.get('experimentStatus', None) if status: if status in ('starting', 'terminating'): if not force: self.state_lock.release() raise service_error(service_error.partial, 'Experiment still being created or destroyed') else: self.log.warning('Experiment in %s state ' % status + \ 'being terminated by force.') else: # No status??? trouble self.state_lock.release() raise service_error(service_error.internal, "Experiment has no status!?") ids = [] # experimentID is a list of dicts that are self-describing # identifiers. This finds all the fedids and localnames - the # keys of self.state - and puts them into ids. for id in fed_exp.get('experimentID', []): if id.has_key('fedid'): ids.append(id['fedid']) if id.has_key('localname'): ids.append(id['localname']) # Construct enough of the tbparams to make the stop_segment calls # work for fed in fed_exp.get('federant', []): try: for e in fed['name']: eid = e.get('localname', None) if eid: break else: continue p = fed['emulab']['project'] project = p['name']['localname'] tb = p['testbed']['localname'] user = p['user'][0]['userID']['localname'] domain = fed['emulab']['domain'] host = fed['emulab']['ops'] aid = fed['allocID'] except KeyError, e: continue tbparams[tb] = {\ 'user': user,\ 'domain': domain,\ 'project': project,\ 'host': host,\ 'eid': eid,\ 'aid': aid,\ } fed_exp['experimentStatus'] = 'terminating' if self.state_filename: self.write_state() self.state_lock.release() # Stop everyone. NB, wait_for_all waits until a thread starts and # then completes, so we can't wait if nothing starts. So, no # tbparams, no start. if len(tbparams) > 0: thread_pool = self.thread_pool(self.nthreads) for tb in tbparams.keys(): # Create and start a thread to stop the segment thread_pool.wait_for_slot() t = self.pooled_thread(\ target=self.stop_segment(log=dealloc_log, keyfile=self.ssh_privkey_file, debug=self.debug), args=(tb, tbparams[tb]['eid'], tbparams), name=tb, pdata=thread_pool, trace_file=self.trace_file) t.start() # Wait for completions thread_pool.wait_for_all_done() # release the allocations (failed experiments have done this # already, and starting experiments may be in odd states, so we # ignore errors releasing those allocations try: for tb in tbparams.keys(): self.release_access(tb, tbparams[tb]['aid']) except service_error, e: if status != 'failed' and not force: raise e # Remove the terminated experiment self.state_lock.acquire() for id in ids: if self.state.has_key(id): del self.state[id] if self.state_filename: self.write_state() self.state_lock.release() return { 'experiment': exp , 'deallocationLog': "".join(dealloc_list), } else: # Don't forget to release the lock self.state_lock.release() raise service_error(service_error.req, "No saved state")