#!/usr/local/bin/python import os,sys from ZSI import * from M2Crypto import SSL from M2Crypto.SSL.SSLServer import SSLServer import M2Crypto.httpslib import xml.parsers.expat import re import random import string import subprocess import tempfile import copy import pickle import traceback from threading import * from subprocess import * from fedd_services import * from fedd_internal_services import * from fedd_util import * import parse_detail from service_error import * import logging class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.experiment_control") fl.addHandler(nullHandler()) class fedd_experiment_control_local: scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl", "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl", "fed_evrepeater", "rc.accounts.patch"] class thread_pool: def __init__(self): self.changed = Condition() self.started = 0 self.terminated = 0 def acquire(self): self.changed.acquire() def release(self): self.changed.release() def wait(self, timeout = None): self.changed.wait(timeout) def start(self): self.changed.acquire() self.started += 1 self.changed.notifyAll() self.changed.release() def terminate(self): self.changed.acquire() self.terminated += 1 self.changed.notifyAll() self.changed.release() def clear(self): self.changed.acquire() self.started = 0 self.terminated =0 self.changed.notifyAll() self.changed.release() class pooled_thread(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, pdata=None, trace_file=None): Thread.__init__(self, group, target, name, args, kwargs) self.rv = None self.exception = None self.target=target self.args = args self.kwargs = kwargs self.pdata = pdata self.trace_file = trace_file def run(self): if self.pdata: self.pdata.start() if self.target: try: self.rv = self.target(*self.args, **self.kwargs) except service_error, s: self.exception = s if self.trace_file: logging.error("Thread exception: %s %s" % \ 
(s.code_string(), s.desc)) except: self.exception = sys.exc_info()[1] if self.trace_file: logging.error(("Unexpected thread exception: %s" +\ "Trace %s") % (self.exception,\ traceback.format_exc())) if self.pdata: self.pdata.terminate() def __init__(self, config=None): self.scripts = fedd_experiment_control_local.scripts self.thread_with_rv = fedd_experiment_control_local.pooled_thread self.thread_pool = fedd_experiment_control_local.thread_pool self.cert_file = None self.cert_pwd = None self.trusted_certs = None # Walk through the various relevant certificat specifying config # attributes until the local certificate attributes can be resolved. # The walk is from omst specific to most general specification. for p in ("create_experiment_", "proxy_", ""): filen = "%scert_file" % p pwn = "%scert_pwd" % p trustn = "%strusted_certs" % p if getattr(config, filen, None): if not self.cert_file: self.cert_file = getattr(config, filen, None) self.cert_pwd = getattr(config, pwn, None) if getattr(config, trustn, None): if not self.trusted_certs: self.trusted_certs = getattr(config, trustn, None) self.exp_stem = "fed-stem" self.debug = config.create_debug self.log = logging.getLogger("fedd.experiment_control") self.muxmax = 2 self.nthreads = 2 self.randomize_experiments = False self.scp_exec = "/usr/bin/scp" self.scripts_dir = "/users/faber/testbed/federation" self.splitter = None self.ssh_exec="/usr/bin/ssh" self.ssh_keygen = "/usr/bin/ssh-keygen" self.ssh_identity_file = None # XXX self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub" self.ssh_type = "rsa" self.state = { } self.state_filename = config.experiment_state_file self.state_lock = Lock() self.tclsh = "/usr/local/bin/otclsh" self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl" self.tbmap = { 'deter':'https://users.isi.deterlab.net:23235', 'emulab':'https://users.isi.deterlab.net:23236', 'ucb':'https://users.isi.deterlab.net:23237', } self.trace_file = sys.stderr self.def_expstart = \ "sudo -H /bin/sh 
FEDDIR/fed_bootstrap >& /tmp/federate"; self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts"; self.def_gwstart = \ "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log"; self.def_mgwstart = \ "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log"; self.def_gwimage = "FBSD61-TUNNEL2"; self.def_gwtype = "pc"; if self.ssh_pubkey_file: try: f = open(self.ssh_pubkey_file, 'r') self.ssh_pubkey = f.read() f.close() except IOError: raise service_error(service_error.internal, "Cannot read sshpubkey") # Set the logging level to the value passed in. The getattr slieght of # hand finds the logging level constant corrersponding to the string. # We're a little paranoid to avoid user mayhem. if config.experiment_log: try: level = int(getattr(logging, config.experiment_log.upper(),-1)) if logging.DEBUG <= level <= logging.CRITICAL: self.log.setLevel(level) else: self.log.error("Bad experiment_log value: %s" % \ config.experiment_log) except ValueError: self.log.error("Bad experiment_log value: %s" % \ config.experiment_log) # Grab saved state if self.state_filename: self.read_state() # Confirm federation scripts in the right place for s in self.scripts: if not os.path.exists(self.scripts_dir + "/" + s): raise service_error(service_error.server_config, "%s/%s not in local script dir" % (self.scripts_dir, s)) self.soap_handlers = {\ 'Create': make_soap_handler(\ CreateRequestMessage.typecode, getattr(self, "create_experiment"), CreateResponseMessage, "CreateResponseBody"), 'Vtopo': make_soap_handler(\ VtopoRequestMessage.typecode, getattr(self, "get_vtopo"), VtopoResponseMessage, "VtopoResponseBody"), 'Vis': make_soap_handler(\ VisRequestMessage.typecode, getattr(self, "get_vis"), VisResponseMessage, "VisResponseBody"), 'Info': make_soap_handler(\ InfoRequestMessage.typecode, getattr(self, "get_info"), InfoResponseMessage, "InfoResponseBody"), 'Terminate': make_soap_handler(\ TerminateRequestMessage.typecode, getattr(self, "terminate_experiment"), 
TerminateResponseMessage, "TerminateResponseBody"), } self.xmlrpc_handlers = {\ 'Create': make_xmlrpc_handler(\ getattr(self, "create_experiment"), "CreateResponseBody"), 'Vtopo': make_xmlrpc_handler(\ getattr(self, "get_vtopo"), "VtopoResponseBody"), 'Vis': make_xmlrpc_handler(\ getattr(self, "get_vis"), "VisResponseBody"), 'Info': make_xmlrpc_handler(\ getattr(self, "get_info"), "InfoResponseBody"), 'Terminate': make_xmlrpc_handler(\ getattr(self, "terminate_experiment"), "TerminateResponseBody"), } def get_soap_services(self): return self.soap_handlers def get_xmlrpc_services(self): return self.xmlrpc_handlers def copy_file(self, src, dest, size=1024): """ Exceedingly simple file copy. """ s = open(src,'r') d = open(dest, 'w') buf = "x" while buf != "": buf = s.read(size) d.write(buf) s.close() d.close() # Call while holding self.state_lock def write_state(self): if os.access(self.state_filename, os.W_OK): self.copy_file(self.state_filename, \ "%s.bak" % self.state_filename) try: f = open(self.state_filename, 'w') pickle.dump(self.state, f) except IOError, e: self.log.error("Can't write file %s: %s" % \ (self.state_filename, e)) except pickle.PicklingError, e: self.log.error("Pickling problem: %s" % e) # Call while holding self.state_lock def read_state(self): try: f = open(self.state_filename, "r") self.state = pickle.load(f) except IOError, e: self.log.warning("No saved state: Can't open %s: %s" % \ (self.state_filename, e)) except pickle.UnpicklingError, e: self.log.warning("No saved state: Unpickling failed: %s" % e) def scp_file(self, file, user, host, dest=""): """ scp a file to the remote host. 
""" scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)] rv = 0 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) if not self.debug: rv = call(scp_cmd, stdout=trace, stderr=trace) return rv == 0 def ssh_cmd(self, user, host, cmd, wname=None): sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd) self.log.debug("[ssh_cmd]: %s" % sh_str) if not self.debug: sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace) return sub.wait() == 0 else: return True def ship_scripts(self, host, user, dest_dir): if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir): for s in self.scripts: if not self.scp_file("%s/%s" % (self.scripts_dir, s), user, host, dest_dir): return False return True else: return False def ship_configs(self, host, user, src_dir, dest_dir): if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir): return False if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir): return False for f in os.listdir(src_dir): if os.path.isdir(f): if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), "%s/%s" % (dest_dir, f)): return False else: if not self.scp_file("%s/%s" % (src_dir, f), user, host, dest_dir): return False return True def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0): host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) user = tbparams[tb]['user'] pid = tbparams[tb]['project'] # XXX base_confs = ( "hosts",) tclfile = "%s.%s.tcl" % (eid, tb) expinfo_exec = "/usr/testbed/bin/expinfo" proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) state_re = re.compile("State:\s+(\w+)") no_exp_re = re.compile("^No\s+such\s+experiment") state = None cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid] self.log.debug("[start_segment]: %s"% " ".join(cmd)) dev_null = None try: dev_null = open("/dev/null", "a") except IOError, e: self.log.error("[start_segment]: can't open /dev/null: %s" %e) status = Popen(cmd, stdout=PIPE, 
stderr=dev_null) for line in status.stdout: m = state_re.match(line) if m: state = m.group(1) else: m = no_exp_re.match(line) if m: state = "none" rv = status.wait() # If the experiment is not present the subcommand returns a non-zero # return value. If we successfully parsed a "none" outcome, ignore the # return code. if rv != 0 and state != "none": raise service_error(service_error.internal, "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) self.log.debug("[start_segment]: %s: %s" % (tb, state)) self.log.info("[start_segment]:transferring experiment to %s" % tb) if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host): return False # Clear the federation files if not self.ssh_cmd(user, host, "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir): return False if not self.ssh_cmd(user, host, "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir): return False # Clear and create the tarfiles and rpm directories for d in (tarfiles_dir, rpms_dir): if not self.ssh_cmd(user, host, "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d): return False if not self.ssh_cmd(user, host, "mkdir -p %s" % d, "create tarfiles"): return False if state == 'active': # Remote experiment is active. Modify it. for f in base_confs: if not self.scp_file("%s/%s" % (tmpdir, f), user, host, "%s/%s" % (proj_dir, f)): return False if not self.ship_scripts(host, user, proj_dir): return False if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), proj_dir): return False if os.path.isdir("%s/tarfiles" % tmpdir): if not self.ship_configs(host, user, "%s/tarfiles" % tmpdir, tarfiles_dir): return False if os.path.isdir("%s/rpms" % tmpdir): if not self.ship_configs(host, user, "%s/rpms" % tmpdir, tarfiles_dir): return False self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) if not self.ssh_cmd(user, host, "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ (pid, eid, tclfile), "modexp"): return False return True elif state == "swapped": # Remote experiment swapped out. Modify it and swap it in. 
for f in base_confs: if not self.scp_file("%s/%s" % (tmpdir, f), user, host, "%s/%s" % (proj_dir, f)): return False if not self.ship_scripts(host, user, proj_dir): return False if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), proj_dir): return False if os.path.isdir("%s/tarfiles" % tmpdir): if not self.ship_configs(host, user, "%s/tarfiles" % tmpdir, tarfiles_dir): return False if os.path.isdir("%s/rpms" % tmpdir): if not self.ship_configs(host, user, "%s/rpms" % tmpdir, tarfiles_dir): return False self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) if not self.ssh_cmd(user, host, "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile), "modexp"): return False self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb)) if not self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), "swapexp"): return False return True elif state == "none": # No remote experiment. Create one. We do this in 2 steps so we # can put the configuration files and scripts into the new # experiment directories. 
# Tarfiles must be present for creation to work if os.path.isdir("%s/tarfiles" % tmpdir): if not self.ship_configs(host, user, "%s/tarfiles" % tmpdir, tarfiles_dir): return False if os.path.isdir("%s/rpms" % tmpdir): if not self.ship_configs(host, user, "%s/rpms" % tmpdir, tarfiles_dir): return False self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) if not self.ssh_cmd(user, host, "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \ (pid, eid, tclfile), "startexp"): return False # After startexp the per-experiment directories exist for f in base_confs: if not self.scp_file("%s/%s" % (tmpdir, f), user, host, "%s/%s" % (proj_dir, f)): return False if not self.ship_scripts(host, user, proj_dir): return False if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), proj_dir): return False self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb)) if not self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), "swapexp"): return False return True else: self.log.debug("[start_segment]:unknown state %s" % state) return False def stop_segment(self, tb, eid, tbparams): user = tbparams[tb]['user'] host = tbparams[tb]['host'] pid = tbparams[tb]['project'] self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) return self.ssh_cmd(user, host, "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) def generate_ssh_keys(self, dest, type="rsa" ): """ Generate a set of keys for the gateways to use to talk. Keys are of type type and are stored in the required dest file. 
""" valid_types = ("rsa", "dsa") t = type.lower(); if t not in valid_types: raise ValueError cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] try: trace = open("/dev/null", "w") except IOError: raise service_error(service_error.internal, "Cannot open /dev/null??"); # May raise CalledProcessError self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) rv = call(cmd, stdout=trace, stderr=trace) if rv != 0: raise service_error(service_error.internal, "Cannot generate nonce ssh keys. %s return code %d" \ % (self.ssh_keygen, rv)) def gentopo(self, str): class topo_parse: def __init__(self): self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member') self.int_subelements = ( 'bandwidth',) self.float_subelements = ( 'delay',) self.nodes = [ ] self.lans = [ ] self.element = { } self.topo = { \ 'node': self.nodes,\ 'lan' : self.lans,\ } self.chars = "" def end_element(self, name): if name == 'node': self.nodes.append(self.element) self.element = { } elif name == 'lan': self.lans.append(self.element) self.element = { } elif name in self.str_subelements: self.element[name] = self.chars self.chars = "" elif name in self.int_subelements: self.element[name] = int(self.chars) self.chars = "" elif name in self.float_subelements: self.element[name] = float(self.chars) self.chars = "" def found_chars(self, data): self.chars += data.rstrip() tp = topo_parse(); parser = xml.parsers.expat.ParserCreate() parser.EndElementHandler = tp.end_element parser.CharacterDataHandler = tp.found_chars parser.Parse(str) return tp.topo def genviz(self, topo): """ Generate the visualization the virtual topology """ neato = "/usr/local/bin/neato" # These are used to parse neato output and to create the visualization # file. 
vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"') vis_fmt = "%s%s%s" + \ "%s" try: # Node names nodes = [ n['vname'] for n in topo['node'] ] topo_lans = topo['lan'] except KeyError: raise service_error(service_error.internal, "Bad topology") lans = { } links = { } # Walk through the virtual topology, organizing the connections into # 2-node connections (links) and more-than-2-node connections (lans). # When a lan is created, it's added to the list of nodes (there's a # node in the visualization for the lan). for l in topo_lans: if links.has_key(l['vname']): if len(links[l['vname']]) < 2: links[l['vname']].append(l['vnode']) else: nodes.append(l['vname']) lans[l['vname']] = links[l['vname']] del links[l['vname']] lans[l['vname']].append(l['vnode']) elif lans.has_key(l['vname']): lans[l['vname']].append(l['vnode']) else: links[l['vname']] = [ l['vnode'] ] # Open up a temporary file for dot to turn into a visualization try: df, dotname = tempfile.mkstemp() dotfile = os.fdopen(df, 'w') except IOError: raise service_error(service_error.internal, "Failed to open file in genviz") # Generate a dot/neato input file from the links, nodes and lans try: print >>dotfile, "graph G {" for n in nodes: print >>dotfile, '\t"%s"' % n for l in links.keys(): print >>dotfile, '\t"%s" -- "%s"' % tuple(links[l]) for l in lans.keys(): for n in lans[l]: print >>dotfile, '\t "%s" -- "%s"' % (n,l) print >>dotfile, "}" dotfile.close() except TypeError: raise service_error(service_error.internal, "Single endpoint link in vtopo") except IOError: raise service_error(service_error.internal, "Cannot write dot file") # Use dot to create a visualization dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE) # Translate dot to vis format vis_nodes = [ ] vis = { 'node': vis_nodes } for line in dot.stdout: m = vis_re.match(line) if m: vn = m.group(1) vis_node = {'name': vn, \ 'x': float(m.group(2)),\ 'y' : float(m.group(3)),\ } if vn 
in links.keys() or vn in lans.keys(): vis_node['type'] = 'lan' else: vis_node['type'] = 'node' vis_nodes.append(vis_node) rv = dot.wait() os.remove(dotname) if rv == 0 : return vis else: return None def get_access(self, tb, nodes, user, tbparam): """ Get access to testbed through fedd and set the parameters for that tb """ translate_attr = { 'slavenodestartcmd': 'expstart', 'slaveconnectorstartcmd': 'gwstart', 'masternodestartcmd': 'mexpstart', 'masterconnectorstartcmd': 'mgwstart', 'connectorimage': 'gwimage', 'connectortype': 'gwtype', 'tunnelcfg': 'tun', 'smbshare': 'smbshare', } # XXX multi-level access uri = self.tbmap.get(tb, None) if not uri: raise service_error(serice_error.server_config, "Unknown testbed: %s" % tb) # The basic request req = {\ 'destinationTestbed' : { 'uri' : uri }, 'user': user, 'allocID' : { 'localname': 'test' }, 'access' : [ { 'sshPubkey' : self.ssh_pubkey } ] } # node resources if any if nodes != None and len(nodes) > 0: rnodes = [ ] for n in nodes: rn = { } image, hw, count = n.split(":") if image: rn['image'] = [ image ] if hw: rn['hardware'] = [ hw ] if count: rn['count'] = int(count) rnodes.append(rn) req['resources']= { } req['resources']['node'] = rnodes # No retry loop here. 
Proxy servers must correctly authenticate # themselves without help try: ctx = fedd_ssl_context(self.cert_file, self.trusted_certs, password=self.cert_pwd) except SSL.SSLError: raise service_error(service_error.server_config, "Server certificates misconfigured") loc = feddServiceLocator(); port = loc.getfeddPortType(uri, transport=M2Crypto.httpslib.HTTPSConnection, transdict={ 'ssl_context' : ctx }) # Reconstruct the full request message msg = RequestAccessRequestMessage() msg.set_element_RequestAccessRequestBody( pack_soap(msg, "RequestAccessRequestBody", req)) try: resp = port.RequestAccess(msg) except ZSI.ParseException, e: raise service_error(service_error.req, "Bad format message (XMLRPC??): %s" % str(e)) r = unpack_soap(resp) if r.has_key('RequestAccessResponseBody'): r = r['RequestAccessResponseBody'] else: raise service_error(service_error.proxy, "Bad proxy response") e = r['emulab'] p = e['project'] tbparam[tb] = { "boss": e['boss'], "host": e['ops'], "domain": e['domain'], "fs": e['fileServer'], "eventserver": e['eventServer'], "project": unpack_id(p['name']), "emulab" : e } # Make the testbed name be the label the user applied p['testbed'] = {'localname': tb } for u in p['user']: tbparam[tb]['user'] = unpack_id(u['userID']) for a in e['fedAttr']: if a['attribute']: key = translate_attr.get(a['attribute'].lower(), None) if key: tbparam[tb][key]= a['value'] class current_testbed: def __init__(self, eid, tmpdir): self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)") self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)") self.current_testbed = None self.testbed_file = None self.def_expstart = \ "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate"; self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts"; self.def_gwstart = \ "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log"; self.def_mgwstart = \ "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log"; self.def_gwimage = "FBSD61-TUNNEL2"; self.def_gwtype = "pc"; self.eid 
= eid self.tmpdir = tmpdir def __call__(self, line, master, allocated, tbparams): # Capture testbed topology descriptions if self.current_testbed == None: m = self.begin_testbed.match(line) if m != None: self.current_testbed = m.group(1) if self.current_testbed == None: raise service_error(service_error.req, "Bad request format (unnamed testbed)") allocated[self.current_testbed] = \ allocated.get(self.current_testbed,0) + 1 tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed) if not os.path.exists(tb_dir): try: os.mkdir(tb_dir) except IOError: raise service_error(service_error.internal, "Cannot create %s" % tb_dir) try: self.testbed_file = open("%s/%s.%s.tcl" % (tb_dir, self.eid, self.current_testbed), 'w') except IOError: self.testbed_file = None return True else: return False else: m = self.end_testbed.match(line) if m != None: if m.group(1) != self.current_testbed: raise service_error(service_error.internal, "Mismatched testbed markers!?") if self.testbed_file != None: self.testbed_file.close() self.testbed_file = None self.current_testbed = None elif self.testbed_file: # Substitute variables and put the line into the local # testbed file. 
gwtype = tbparams[self.current_testbed].get('gwtype', self.def_gwtype) gwimage = tbparams[self.current_testbed].get('gwimage', self.def_gwimage) mgwstart = tbparams[self.current_testbed].get('mgwstart', self.def_mgwstart) mexpstart = tbparams[self.current_testbed].get('mexpstart', self.def_mexpstart) gwstart = tbparams[self.current_testbed].get('gwstart', self.def_gwstart) expstart = tbparams[self.current_testbed].get('expstart', self.def_expstart) project = tbparams[self.current_testbed].get('project') line = re.sub("GWTYPE", gwtype, line) line = re.sub("GWIMAGE", gwimage, line) if self.current_testbed == master: line = re.sub("GWSTART", mgwstart, line) line = re.sub("EXPSTART", mexpstart, line) else: line = re.sub("GWSTART", gwstart, line) line = re.sub("EXPSTART", expstart, line) # XXX: does `` embed without doing enything else? line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line) line = re.sub("PROJDIR", "/proj/%s/" % project, line) line = re.sub("EID", self.eid, line) line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \ (project, self.eid), line) print >>self.testbed_file, line return True class allbeds: def __init__(self, get_access): self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds") self.end_allbeds = re.compile("^#\s+End\s+Allbeds") self.in_allbeds = False self.get_access = get_access def __call__(self, line, user, tbparams): # Testbed access parameters if not self.in_allbeds: if self.begin_allbeds.match(line): self.in_allbeds = True return True else: return False else: if self.end_allbeds.match(line): self.in_allbeds = False else: nodes = line.split('|') tb = nodes.pop(0) self.get_access(tb, nodes, user, tbparams) return True class gateways: def __init__(self, eid, master, tmpdir, gw_pubkey, gw_secretkey, copy_file): self.begin_gateways = \ re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)") self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)") self.current_gateways = None self.control_gateway = None self.active_end = { } self.eid = eid 
self.master = master self.tmpdir = tmpdir self.gw_pubkey_base = gw_pubkey self.gw_secretkey_base = gw_secretkey self.copy_file = copy_file def gateway_conf_file(self, gw, master, eid, pubkey, privkey, active_end, tbparams, dtb, myname, desthost, type): """ Produce a gateway configuration file from a gateways line. """ sproject = tbparams[gw].get('project', 'project') dproject = tbparams[dtb].get('project', 'project') sdomain = ".%s.%s%s" % (eid, sproject, tbparams[gw].get('domain', ".example.com")) ddomain = ".%s.%s%s" % (eid, dproject, tbparams[dtb].get('domain', ".example.com")) boss = tbparams[master].get('boss', "boss") fs = tbparams[master].get('fs', "fs") event_server = "%s%s" % \ (tbparams[gw].get('eventserver', "event_server"), tbparams[gw].get('domain', "example.com")) remote_event_server = "%s%s" % \ (tbparams[dtb].get('eventserver', "event_server"), tbparams[dtb].get('domain', "example.com")) seer_control = "%s%s" % \ (tbparams[gw].get('control', "control"), sdomain) remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) tunnel_cfg = tbparams[gw].get("tun", "false") conf_file = "%s%s.gw.conf" % (myname, sdomain) remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) # translate to lower case so the `hostname` hack for specifying # configuration files works. 
conf_file = conf_file.lower(); remote_conf_file = remote_conf_file.lower(); if dtb == master: active = "false" elif gw == master: active = "true" elif active_end.has_key['%s-%s' % (dtb, gw)]: active = "false" else: active_end['%s-%s' % (gw, dtb)] = 1 active = "true" gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w") print >>gwconfig, "Active: %s" % active print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg print >>gwconfig, "BossName: %s" % boss print >>gwconfig, "FsName: %s" % fs print >>gwconfig, "EventServerName: %s" % event_server print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server print >>gwconfig, "SeerControl: %s" % seer_control print >>gwconfig, "Type: %s" % type print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \ local_script_dir print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid) print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid) print >>gwconfig, "RemoteConfigFile: %s/%s" % \ (remote_script_dir, remote_conf_file) print >>gwconfig, "Peer: %s%s" % (desthost, ddomain) print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey) print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey) gwconfig.close() return active == "true" def __call__(self, line, allocated, tbparams): # Process gateways if not self.current_gateways: m = self.begin_gateways.match(line) if m: self.current_gateways = m.group(1) if allocated.has_key(self.current_gateways): # This test should always succeed tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways) if not os.path.exists(tb_dir): try: os.mkdir(tb_dir) except IOError: raise service_error(service_error.internal, "Cannot create %s" % tb_dir) else: # XXX self.log.error("[gateways]: Ignoring gateways for " + \ "unknown testbed %s" % self.current_gateways) self.current_gateways = None return True else: return False else: m = self.end_gateways.match(line) if m : if m.group(1) != self.current_gateways: raise 
service_error(service_error.internal, "Mismatched gateway markers!?") if self.control_gateway: try: cc = open("%s/%s/client.conf" % (self.tmpdir, self.current_gateways), 'w') print >>cc, "ControlGateway: %s" % \ self.control_gateway if tbparams[self.master].has_key('smbshare'): print >>cc, "SMBSHare: %s" % \ tbparams[self.master]['smbshare'] print >>cc, "ProjectUser: %s" % \ tbparams[self.master]['user'] print >>cc, "ProjectName: %s" % \ tbparams[self.master]['project'] cc.close() except IOError: raise service_error(service_error.internal, "Error creating client config") try: cc = open("%s/%s/seer.conf" % (self.tmpdir, self.current_gateways), 'w') if self.current_gateways != self.master: print >>cc, "ControlNode: %s" % \ self.control_gateway print >>cc, "ExperimentID: %s/%s" % \ ( tbparams[self.master]['project'], \ self.eid ) cc.close() except IOError: raise service_error(service_error.internal, "Error creating seer config") else: debug.error("[gateways]: No control gateway for %s" %\ self.current_gateways) self.current_gateways = None else: dtb, myname, desthost, type = line.split(" ") if type == "control" or type == "both": self.control_gateway = "%s.%s.%s%s" % (myname, self.eid, tbparams[self.current_gateways]['project'], tbparams[self.current_gateways]['domain']) try: active = self.gateway_conf_file(self.current_gateways, self.master, self.eid, self.gw_pubkey_base, self.gw_secretkey_base, self.active_end, tbparams, dtb, myname, desthost, type) except IOError, e: raise service_error(service_error.internal, "Failed to write config file for %s" % \ self.current_gateway) gw_pubkey = "%s/keys/%s" % \ (self.tmpdir, self.gw_pubkey_base) gw_secretkey = "%s/keys/%s" % \ (self.tmpdir, self.gw_secretkey_base) pkfile = "%s/%s/%s" % \ ( self.tmpdir, self.current_gateways, self.gw_pubkey_base) skfile = "%s/%s/%s" % \ ( self.tmpdir, self.current_gateways, self.gw_secretkey_base) if not os.path.exists(pkfile): try: self.copy_file(gw_pubkey, pkfile) except IOError: 
service_error(service_error.internal, "Failed to copy pubkey file") if active and not os.path.exists(skfile): try: self.copy_file(gw_secretkey, skfile) except IOError: service_error(service_error.internal, "Failed to copy secretkey file") return True class shunt_to_file: def __init__(self, begin, end, filename): self.begin = re.compile(begin) self.end = re.compile(end) self.in_shunt = False self.file = None self.filename = filename def __call__(self, line): if not self.in_shunt: if self.begin.match(line): self.in_shunt = True try: self.file = open(self.filename, "w") except: self.file = None raise return True else: return False else: if self.end.match(line): if self.file: self.file.close() self.file = None self.in_shunt = False else: if self.file: print >>self.file, line return True class shunt_to_list: def __init__(self, begin, end): self.begin = re.compile(begin) self.end = re.compile(end) self.in_shunt = False self.list = [ ] def __call__(self, line): if not self.in_shunt: if self.begin.match(line): self.in_shunt = True return True else: return False else: if self.end.match(line): self.in_shunt = False else: self.list.append(line) return True class shunt_to_string: def __init__(self, begin, end): self.begin = re.compile(begin) self.end = re.compile(end) self.in_shunt = False self.str = "" def __call__(self, line): if not self.in_shunt: if self.begin.match(line): self.in_shunt = True return True else: return False else: if self.end.match(line): self.in_shunt = False else: self.str += line return True def create_experiment(self, req, fid): try: tmpdir = tempfile.mkdtemp(prefix="split-") except IOError: raise service_error(service_error.internal, "Cannot create tmp dir") gw_pubkey_base = "fed.%s.pub" % self.ssh_type gw_secretkey_base = "fed.%s" % self.ssh_type gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base tclfile = tmpdir + "/experiment.tcl" tbparams = { } pid = "dummy" gid = "dummy" # XXX fail_soft = False try: 
def create_experiment(self, req, fid):
    """
    Create a federated experiment from a create request.

    The experiment description is written into a fresh temporary directory
    and run through the tcl splitter.  The splitter output is parsed line
    by line to fill in per-testbed parameters, gateway data, the virtual
    topology, the hosts file and the lists of tarfiles/rpms to stage.  Each
    non-master testbed is then started in its own thread (at most
    self.nthreads at a time); the master is started only if all of those
    succeeded.

    req is the unpacked request, a dict whose 'CreateRequestBody' entry
    carries 'experimentdescription', 'user', 'master' and optionally
    'experimentID'.  fid is accepted for interface uniformity and is not
    consulted here.

    Returns a dict with 'federant', 'vtopo', 'vis', 'experimentID' and
    'experimentAccess' entries.  Raises service_error on a malformed
    request, on local failures, and when any segment fails to start.
    """
    try:
        tmpdir = tempfile.mkdtemp(prefix="split-")
    except (IOError, OSError):
        # mkdtemp reports failures as OSError as well as IOError
        raise service_error(service_error.internal, "Cannot create tmp dir")

    gw_pubkey_base = "fed.%s.pub" % self.ssh_type
    gw_secretkey_base = "fed.%s" % self.ssh_type
    gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
    tclfile = tmpdir + "/experiment.tcl"
    tbparams = { }

    pid = "dummy"
    gid = "dummy"
    # XXX
    fail_soft = False

    try:
        os.mkdir(tmpdir+"/keys")
    except OSError:
        raise service_error(service_error.internal,
                "Can't make temporary dir")

    req = req.get('CreateRequestBody', None)
    if not req:
        raise service_error(service_error.req,
                "Bad request format (no CreateRequestBody)")

    # The tcl parser needs to read a file so put the content into that file
    file_content = req.get('experimentdescription', None)
    if not file_content:
        raise service_error(service_error.req, "No experiment description")
    try:
        f = open(tclfile, 'w')
        try:
            f.write(file_content)
        finally:
            f.close()
    except IOError:
        raise service_error(service_error.internal,
                "Cannot write temp experiment description")

    # Choose a local name that is not in use and reserve it with a
    # placeholder.  A requested name is made unique by appending random
    # letters; otherwise a name is generated from the experiment stem.
    requested = None
    if 'experimentID' in req and 'localname' in req['experimentID']:
        requested = req['experimentID']['localname']

    self.state_lock.acquire()
    try:
        if requested is not None:
            eid = requested
            while eid in self.state:
                eid += random.choice(string.ascii_letters)
        else:
            while True:
                eid = self.exp_stem + "".join(
                        [random.choice(string.ascii_letters)
                            for i in range(0, 5)])
                if eid not in self.state:
                    break
        self.state[eid] = "placeholder"
    finally:
        self.state_lock.release()

    # NOTE(review): the placeholder reserved above is removed only on the
    # swap-in failure path below; the other service_errors raised from here
    # on leave both the placeholder and tmpdir behind.

    try:
        self.generate_ssh_keys(gw_secretkey, self.ssh_type)
    except ValueError:
        raise service_error(service_error.server_config,
                "Bad key type (%s)" % self.ssh_type)

    user = req.get('user', None)
    if user is None:
        raise service_error(service_error.req, "No user")

    master = req.get('master', None)
    if master is None:
        raise service_error(service_error.req, "No master testbed label")

    tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x',
            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
    tclparser = Popen(tclcmd, stdout=PIPE)

    allocated = { }

    # Each parser is offered every line and returns true if it consumed it
    parse_current_testbed = self.current_testbed(eid, tmpdir)
    parse_allbeds = self.allbeds(self.get_access)
    parse_gateways = self.gateways(eid, master, tmpdir,
            gw_pubkey_base, gw_secretkey_base, self.copy_file)
    parse_vtopo = self.shunt_to_string(r"^#\s+Begin\s+Vtopo",
            r"^#\s+End\s+Vtopo")
    parse_hostnames = self.shunt_to_file(r"^#\s+Begin\s+hostnames",
            r"^#\s+End\s+hostnames", tmpdir + "/hosts")
    parse_tarfiles = self.shunt_to_list(r"^#\s+Begin\s+tarfiles",
            r"^#\s+End\s+tarfiles")
    parse_rpms = self.shunt_to_list(r"^#\s+Begin\s+rpms",
            r"^#\s+End\s+rpms")

    for line in tclparser.stdout:
        line = line.rstrip()
        if parse_current_testbed(line, master, allocated, tbparams):
            continue
        elif parse_allbeds(line, user, tbparams):
            continue
        elif parse_gateways(line, allocated, tbparams):
            continue
        elif parse_vtopo(line):
            continue
        elif parse_hostnames(line):
            continue
        elif parse_tarfiles(line):
            continue
        elif parse_rpms(line):
            continue
        else:
            raise service_error(service_error.internal,
                    "Bad tcl parse? %s" % line)
    # All output has been read; reap the splitter process
    tclparser.wait()

    vtopo = self.gentopo(parse_vtopo.str)
    if not vtopo:
        raise service_error(service_error.internal,
                "Failed to generate virtual topology")

    vis = self.genviz(vtopo)
    if not vis:
        raise service_error(service_error.internal,
                "Failed to generate visualization")

    # save federant information
    for k in allocated.keys():
        tbparams[k]['federant'] = {
                'name': [ { 'localname' : eid} ],
                'emulab': tbparams[k]['emulab'],
                'master' : k == master,
            }

    # Copy tarfiles and rpms needed at remote sites into a staging area
    try:
        for subdir, sources in (("tarfiles", parse_tarfiles.list),
                ("rpms", parse_rpms.list)):
            for src in sources:
                if not os.path.exists("%s/%s" % (tmpdir, subdir)):
                    os.mkdir("%s/%s" % (tmpdir, subdir))
                self.copy_file(src, "%s/%s/%s" %
                        (tmpdir, subdir, os.path.basename(src)))
    except (IOError, OSError):
        # sys.exc_info() rather than a bound name keeps this handler valid
        # on every Python version this file may run under.  OSError is
        # included because os.mkdir reports failures that way.
        e = sys.exc_info()[1]
        raise service_error(service_error.internal,
                "Cannot stage tarfile/rpm: %s" % e.strerror)

    thread_pool_info = self.thread_pool()
    threads = [ ]

    for tb in [ k for k in allocated.keys() if k != master]:
        # Wait until we have a free slot to start the next testbed load.
        # Launched threads are counted here (len(threads)) instead of
        # using the pool's started counter, which is only bumped once a
        # thread is actually running and so can lag behind start().
        thread_pool_info.acquire()
        try:
            while len(threads) - thread_pool_info.terminated >= \
                    self.nthreads:
                thread_pool_info.wait()
        finally:
            thread_pool_info.release()

        # Create and start a thread to start the segment, and save it to
        # get the return value later
        t = self.pooled_thread(target=self.start_segment,
                args=(tb, eid, tbparams, tmpdir, 0), name=tb,
                pdata=thread_pool_info, trace_file=self.trace_file)
        t.start()
        threads.append(t)

    # Wait until all finish.  Joining is a no-op when there are no
    # non-master testbeds; waiting on the pool for "at least one start"
    # would block forever in that case.
    for t in threads:
        t.join()

    # If none failed, start the master
    failed = [ t.getName() for t in threads if not t.rv ]

    if not failed:
        if not self.start_segment(master, eid, tbparams, tmpdir):
            failed.append(master)

    succeeded = [tb for tb in allocated.keys() if tb not in failed]

    # If one failed clean up, unless fail_soft is set
    if failed:
        if not fail_soft:
            for tb in succeeded:
                self.stop_segment(tb, eid, tbparams)
            # Remove the placeholder
            self.state_lock.acquire()
            try:
                del self.state[eid]
            finally:
                self.state_lock.release()

            raise service_error(service_error.federant,
                    "Swap in failed on %s" % ",".join(failed))
    else:
        self.log.info("[start_segment]: Experiment %s started" % eid)

    # Generate an ID for the experiment (slice) and a certificate that the
    # allocator can use to prove they own it.  We'll ship it back through
    # the encrypted connection.
    (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)

    self.log.debug("[start_experiment]: removing %s" % tmpdir)

    # Walk up tmpdir, deleting as we go
    for path, dirs, files in os.walk(tmpdir, topdown=False):
        for f in files:
            os.remove(os.path.join(path, f))
        for d in dirs:
            os.rmdir(os.path.join(path, d))
    os.rmdir(tmpdir)

    federants = [ tbparams[tb]['federant'] for tb in tbparams.keys()
            if 'federant' in tbparams[tb] ]

    # The response and the saved state get separate top-level containers
    # (and the response a copy of the fedid) so later changes to one do not
    # show through in the other.
    resp = {
            'federant' : list(federants),
            'vtopo': vtopo,
            'vis' : vis,
            'experimentID' : [
                { 'fedid': copy.copy(expid) },
                { 'localname': eid },
            ],
            'experimentAccess': { 'X509' : expcert },
        }

    self.state_lock.acquire()
    try:
        self.state[expid] = {
                'federant' : list(federants),
                'vtopo': vtopo,
                'vis' : vis,
                'experimentID' : [
                    { 'fedid': expid },
                    { 'localname': eid },
                ],
            }
        # Both identifiers refer to the same record; this also replaces
        # the placeholder.
        self.state[eid] = self.state[expid]
        if self.state_filename:
            self.write_state()
    finally:
        self.state_lock.release()

    if not failed:
        return resp
    else:
        raise service_error(service_error.partial,
                "Partial swap in on %s" % ",".join(succeeded))
def get_vtopo(self, req, fid):
    """
    Return the stored virtual topology of an experiment.

    req is the unpacked request; its 'VtopoRequestBody' entry must hold an
    'experiment' dict that identifies the experiment by 'fedid' or by
    'localname'.  fid is not consulted here.

    Returns {'experiment': {<keytype>: key}, 'vtopo': ...}.  Raises
    service_error for a malformed request or an unknown experiment.
    """
    req = req.get('VtopoRequestBody', None)
    if not req:
        raise service_error(service_error.req,
                "Bad request format (no VtopoRequestBody)")

    exp = req.get('experiment', None)
    if not exp:
        raise service_error(service_error.req, "No request?")

    if 'fedid' in exp:
        key = fedid(bits=exp['fedid'])
        keytype = "fedid"
    elif 'localname' in exp:
        key = exp['localname']
        keytype = "localname"
    else:
        raise service_error(service_error.req, "Unknown lookup type")

    rv = None
    self.state_lock.acquire()
    try:
        entry = self.state.get(key, None)
        # While an experiment is being created its name maps to a
        # placeholder string instead of a record; such an experiment is
        # reported as not existing.
        if isinstance(entry, dict):
            rv = {
                    'experiment' : {keytype: key },
                    'vtopo': entry['vtopo'],
                }
    finally:
        # Always release, so a failed lookup cannot leave the lock held
        self.state_lock.release()

    if rv:
        return rv
    else:
        raise service_error(service_error.req, "No such experiment")


def get_vis(self, req, fid):
    """
    Return the stored visualization of an experiment.

    req is the unpacked request; its 'VisRequestBody' entry must hold an
    'experiment' dict that identifies the experiment by 'fedid' or by
    'localname'.  fid is not consulted here.

    Returns {'experiment': {<keytype>: key}, 'vis': ...}.  Raises
    service_error for a malformed request or an unknown experiment.
    """
    req = req.get('VisRequestBody', None)
    if not req:
        raise service_error(service_error.req,
                "Bad request format (no VisRequestBody)")

    exp = req.get('experiment', None)
    if not exp:
        raise service_error(service_error.req, "No request?")

    if 'fedid' in exp:
        key = fedid(bits=exp['fedid'])
        keytype = "fedid"
    elif 'localname' in exp:
        key = exp['localname']
        keytype = "localname"
    else:
        raise service_error(service_error.req, "Unknown lookup type")

    rv = None
    self.state_lock.acquire()
    try:
        entry = self.state.get(key, None)
        # While an experiment is being created its name maps to a
        # placeholder string instead of a record; such an experiment is
        # reported as not existing.
        if isinstance(entry, dict):
            rv = {
                    'experiment' : {keytype: key },
                    'vis': entry['vis'],
                }
    finally:
        # Always release, so a failed lookup cannot leave the lock held
        self.state_lock.release()

    if rv:
        return rv
    else:
        raise service_error(service_error.req, "No such experiment")
def get_info(self, req, fid):
    """
    Return a copy of everything stored about an experiment.

    req is the unpacked request; its 'InfoRequestBody' entry must hold an
    'experiment' dict that identifies the experiment by 'fedid' or by
    'localname'.  fid is not consulted here.

    Returns a deep copy of the saved state record.  Raises service_error
    for a malformed request or an unknown experiment.
    """
    req = req.get('InfoRequestBody', None)
    if not req:
        raise service_error(service_error.req,
                "Bad request format (no InfoRequestBody)")

    exp = req.get('experiment', None)
    if not exp:
        raise service_error(service_error.req, "No request?")

    if 'fedid' in exp:
        key = fedid(bits=exp['fedid'])
    elif 'localname' in exp:
        key = exp['localname']
    else:
        raise service_error(service_error.req, "Unknown lookup type")

    # The state may be massaged by the service function that called
    # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
    # state.
    rv = None
    self.state_lock.acquire()
    try:
        entry = self.state.get(key, None)
        # A name that is still reserved by a placeholder string has no
        # record yet; do not hand the placeholder back as if it were one.
        if isinstance(entry, dict):
            rv = copy.deepcopy(entry)
    finally:
        # Always release, so a failed copy cannot leave the lock held
        self.state_lock.release()

    if rv:
        return rv
    else:
        raise service_error(service_error.req, "No such experiment")
def terminate_experiment(self, req, fid):
    """
    Stop every segment of an experiment and forget its saved state.

    req is the unpacked request; its 'TerminateRequestBody' entry must hold
    an 'experiment' dict that identifies the experiment by 'fedid' or by
    'localname'.  fid is not consulted here.

    Returns {'experiment': <the identifier dict from the request>}.  Raises
    service_error for a malformed request or when no state is saved under
    the given identifier.
    """
    req = req.get('TerminateRequestBody', None)
    if not req:
        raise service_error(service_error.req,
                "Bad request format (no TerminateRequestBody)")

    exp = req.get('experiment', None)
    if not exp:
        raise service_error(service_error.req, "No request?")

    if 'fedid' in exp:
        key = fedid(bits=exp['fedid'])
    elif 'localname' in exp:
        key = exp['localname']
    else:
        raise service_error(service_error.req, "Unknown lookup type")

    tbparams = { }
    ids = [ ]
    found = False

    # Hold the lock only long enough to build a consistent, private
    # tbparams for the deallocations; it is released while the segments
    # are stopped and retaken to remove the experiment state afterwards.
    self.state_lock.acquire()
    try:
        fed_exp = self.state.get(key, None)
        # An experiment that is still being created is represented by a
        # placeholder string, which carries nothing to terminate.
        if isinstance(fed_exp, dict) and fed_exp:
            found = True

            # experimentID is a list of dicts that are self-describing
            # identifiers.  This finds all the fedids and localnames - the
            # keys of self.state - and puts them into ids.
            for ident in fed_exp.get('experimentID', []):
                if 'fedid' in ident:
                    ids.append(ident['fedid'])
                if 'localname' in ident:
                    ids.append(ident['localname'])

            # Construct enough of the tbparams to make the stop_segment
            # calls work
            for fed in fed_exp.get('federant', []):
                try:
                    eid = None
                    for name in fed['name']:
                        eid = name.get('localname', None)
                        if eid:
                            break
                    if not eid:
                        # No local name: nothing to stop on this federant
                        continue

                    p = fed['emulab']['project']
                    project = p['name']['localname']
                    tb = p['testbed']['localname']
                    user = p['user'][0]['userID']['localname']
                    domain = fed['emulab']['domain']
                    host = "%s%s" % (fed['emulab']['ops'], domain)
                except (KeyError, IndexError):
                    # Incomplete federant record; skip it
                    continue
                tbparams[tb] = {
                        'user': user,
                        'domain': domain,
                        'project': project,
                        'host': host,
                        'eid': eid,
                    }
    finally:
        # Always release, so a malformed record cannot leave the lock held
        self.state_lock.release()

    if not found:
        raise service_error(service_error.req, "No saved state")

    # Stop everyone.
    for tb in tbparams.keys():
        self.stop_segment(tb, tbparams[tb]['eid'], tbparams)

    # Remove the terminated experiment
    self.state_lock.acquire()
    try:
        for ident in ids:
            if ident in self.state:
                del self.state[ident]
        if self.state_filename:
            self.write_state()
    finally:
        self.state_lock.release()

    return { 'experiment': exp }
if __name__ == '__main__':
    # Command-line driver: read a tcl experiment description and submit it
    # to a locally constructed controller, printing the response.
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option('-d', '--debug', dest='debug', default=False,
            action='store_true',
            help='print actions rather than take them')
    parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse')
    parser.add_option('-m', '--master', dest='master',
            help='testbed label for master testbed')
    parser.add_option('-t', '--trace', dest='trace', default=None,
            help='file to print intermediate messages to')
    parser.add_option('-T', '--trace-stderr', dest='trace',
            action='store_const', const=sys.stderr,
            help='print intermediate messages to stderr')
    opts, args = parser.parse_args()

    trace_file = None
    if opts.trace is sys.stderr:
        # -T stores the stream itself in opts.trace, not a file name, so
        # it must be used directly rather than passed to open()
        trace_file = sys.stderr
    elif opts.trace:
        try:
            trace_file = open(opts.trace, 'w')
        except IOError:
            sys.stderr.write("Can't open trace file\n")

    if opts.debug and not trace_file:
        trace_file = sys.stderr

    if opts.tcl is None:
        sys.exit("Must specify a file name")
    try:
        f = open(opts.tcl, 'r')
        try:
            content = ''.join(f)
        finally:
            f.close()
    except IOError:
        e = sys.exc_info()[1]
        sys.exit("Can't read %s: %s" % (opts.tcl, e))

    if not opts.master:
        sys.exit("Must supply master tb label (--master)")

    # NOTE(review): the controller class defined in this file is
    # fedd_experiment_control_local and its constructor takes a single
    # config object; the name and keyword arguments used here look like
    # they predate that interface - confirm before relying on this driver.
    # NOTE(review): a certificate password is hard-coded below.
    obj = fedd_create_experiment_local(
            debug=opts.debug,
            scripts_dir="/users/faber/testbed/federation",
            cert_file="./fedd_client.pem", cert_pwd="faber",
            ssh_pubkey_file='/users/faber/.ssh/id_rsa.pub',
            trusted_certs="./cacert.pem",
            tbmap = {
                'deter':'https://users.isi.deterlab.net:23235',
                'emulab':'https://users.isi.deterlab.net:23236',
                'ucb':'https://users.isi.deterlab.net:23237',
                },
            trace_file=trace_file
        )

    # create_experiment unwraps 'CreateRequestBody' and rejects requests
    # that lack it, so the parameters have to be nested under that key.
    rv = obj.create_experiment(
            {
                'CreateRequestBody': {
                    'experimentdescription' : content,
                    'master' : opts.master,
                    'user': [ {'userID' : { 'localname' : 'faber' } } ],
                },
            },
            None)

    print(rv)