#!/usr/local/bin/python
"""
Helpers that start and stop federated sub-experiments by running the Emulab
command line tools (expinfo, startexp, modexp, swapexp) on this machine.
"""

import sys
import os
import re
import tempfile
import subprocess
import logging
import time
import signal

import util

# NOTE(review): service_error is raised in start_segment below but is not
# imported anywhere in this file - confirm where it is expected to come from.


class local_emulab_segment:
    """
    Shared plumbing for the segment operations below: a file copy and a
    shell command runner with an optional timeout.  When debug is set the
    actions are logged instead of being carried out.
    """

    class cmd_timeout(RuntimeError):
        """Raised by cmd_with_timeout when a command outlives its timeout."""
        pass

    def __init__(self, log=None, keyfile=None, debug=False):
        """
        log is the logger to use (a default fedd logger is created if it is
        not given), keyfile is stored as self.certfile, and debug turns the
        operations into log-only dry runs.
        """
        self.log = log or logging.getLogger(
                'fedd.access.proxy_emulab_segment')
        self.certfile = keyfile
        self.debug = debug
        # Also reachable through the instance so callers can write
        # "except self.cmd_timeout".
        self.cmd_timeout = local_emulab_segment.cmd_timeout

    def copy_file(self, src, dest, size=1024):
        """
        Exceedingly simple file copy.  If debug is set, the copy is only
        logged.
        """
        if not self.debug:
            util.copy_file(src, dest, size)
        else:
            self.log.debug("Copy %s to %s" % (src, dest))

    def cmd_with_timeout(self, cmd, wname=None, timeout=None):
        """
        Run a shell command and return True if it exits with status 0.

        If debug is set, the action is only logged (and a timeout of exactly
        0 raises cmd_timeout so callers' timeout paths can be exercised).
        The command's stdout and stderr are sent to /dev/null when that can
        be opened.  If timeout (in seconds) is given and the command is
        still running once it has elapsed, the command is killed and a
        cmd_timeout exception is thrown.  wname is accepted for interface
        compatibility and is not used.
        """
        try:
            dnull = open("/dev/null", "w")
        except EnvironmentError:
            self.log.debug("[cmd_with_timeout]: failed to open /dev/null " +
                    "for redirect")
            dnull = None

        try:
            self.log.debug("[cmd_with_timeout]: %s" % cmd)

            if self.debug:
                if timeout == 0:
                    self.log.debug("debug timeout raised on %s " % cmd)
                    raise self.cmd_timeout()
                return True

            if dnull:
                sub = subprocess.Popen(cmd, shell=True, stdout=dnull,
                        stderr=dnull, close_fds=True)
            else:
                sub = subprocess.Popen(cmd, shell=True, close_fds=True)

            if not timeout:
                return sub.wait() == 0

            # Poll once a second until the command exits or time runs out.
            rv = sub.poll()
            waited = 0
            while rv is None and waited < timeout:
                time.sleep(1)
                rv = sub.poll()
                waited += 1

            # Only a command that is really still running is a timeout.  One
            # that exited during the final sleep has already been reaped by
            # poll() and must not be signalled.
            if rv is None:
                self.log.debug("Process exceeded runtime: %s" % cmd)
                os.kill(sub.pid, signal.SIGKILL)
                # Reap the killed child so it does not linger as a zombie.
                sub.wait()
                raise self.cmd_timeout()
            return rv == 0
        finally:
            if dnull:
                dnull.close()


class start_segment(local_emulab_segment):
    """
    Callable that creates (if needed), configures and swaps in an Emulab
    experiment.  After a successful start, self.node holds the node mapping
    parsed from "expinfo -m".
    """

    def __init__(self, log=None, keyfile=None, debug=False):
        local_emulab_segment.__init__(self, log=log,
                keyfile=keyfile, debug=debug)
        # Minimal one-node experiment description, used to create the
        # experiment before the real configuration is applied with modexp.
        self.null = """
set ns [new Simulator]
source tb_compat.tcl

set a [$ns node]

$ns rtproto Session
$ns run
"""
        # Filled in by get_mapping: first column of each mapping row ->
        # last column of that row.
        self.node = { }

    def get_state(self, pid, eid):
        """
        Return the state of experiment pid/eid as reported by expinfo: one
        of 'active', 'swapped', 'swapping' or 'none'.  In debug mode
        'swapped' is returned without running anything.  Raises
        service_error if expinfo fails or reports anything else.
        """
        # command to test experiment state
        expinfo_exec = "/usr/testbed/bin/expinfo"
        # Regular expressions to parse the expinfo response
        state_re = re.compile(r"State:\s+(\w+)")
        no_exp_re = re.compile(r"^No\s+such\s+experiment")
        swapping_re = re.compile(r"^No\s+information\s+available.")
        state = None    # Experiment state parsed from expinfo
        # The expinfo command, run directly on this machine.
        cmd = [ expinfo_exec, pid, eid]

        dev_null = None
        try:
            dev_null = open("/dev/null", "a")
        except EnvironmentError as e:
            self.log.error("[get_state]: can't open /dev/null: %s" %e)

        try:
            if self.debug:
                state = 'swapped'
                rv = 0
            else:
                self.log.debug("Checking state")
                # universal_newlines gives text (not bytes) lines so the
                # string regular expressions above apply on any Python.
                status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=dev_null, close_fds=True,
                        universal_newlines=True)
                for line in status.stdout:
                    m = state_re.match(line)
                    if m:
                        state = m.group(1)
                    else:
                        for reg, st in ((no_exp_re, "none"),
                                (swapping_re, "swapping")):
                            m = reg.match(line)
                            if m:
                                state = st
                rv = status.wait()
        finally:
            if dev_null:
                dev_null.close()

        # If the experiment is not present the subcommand returns a
        # non-zero return value.  If we successfully parsed a "none"
        # outcome, ignore the return code.
        if rv != 0 and state != 'none':
            raise service_error(service_error.internal,
                    "Cannot get status of segment:%s/%s" % (pid, eid))
        elif state not in ('active', 'swapped', 'swapping', 'none'):
            raise service_error(service_error.internal,
                    "Cannot get status of segment:%s/%s" % (pid, eid))
        else:
            self.log.debug("State is %s" % state)
            return state

    def get_mapping(self, pid, eid):
        """
        Run "expinfo -m" for pid/eid and record the node mapping it prints
        in self.node.  Returns True on success (always, in debug mode, where
        nothing is run) and raises service_error if expinfo exits non-zero.
        """
        # command to get the experiment's node mapping
        expinfo_exec = "/usr/testbed/bin/expinfo"
        # The expinfo command.
        cmd = [ expinfo_exec, '-m', pid, eid]

        dev_null = None
        try:
            dev_null = open("/dev/null", "a")
        except EnvironmentError as e:
            self.log.error("[get_mapping]: can't open /dev/null: %s" %e)

        try:
            if self.debug:
                rv = 0
            else:
                self.log.debug("Getting mapping for %s %s" % (pid, eid))
                phys_start = re.compile(r'^Physical\s+Node\s+Mapping')
                phys_line = re.compile(r'(\S+)(\s+\S+)*\s+(\S+)')
                phys_end = re.compile(r'^$')
                # universal_newlines gives text (not bytes) lines so the
                # string regular expressions above apply on any Python.
                status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=dev_null, close_fds=True,
                        universal_newlines=True)

                # Parse the info output.  Format:
                #
                # stuff
                # Physical Node Mapping:
                # ID              Type         OS              Physical
                # --------------- ------------ --------------- ------------
                # virtual         dummy        dummy           physical
                #
                foundit = False
                skip = 0
                for line in status.stdout:
                    if phys_start.match(line):
                        # Skip the column header and the dashed rule.
                        skip = 2
                        foundit = True
                    elif not foundit:
                        continue
                    elif skip > 0:
                        skip -= 1
                    elif phys_end.match(line):
                        # A blank line ends the table.
                        break
                    else:
                        m = phys_line.match(line.strip())
                        if m:
                            self.node[m.group(1)] = m.group(3)
                        else:
                            self.log.warning(
                                    "Matching failed while parsing node " +
                                    "mapping: line %s" % line)
                rv = status.wait()
        finally:
            if dev_null:
                dev_null.close()

        # Any non-zero exit status from expinfo is treated as a failure.
        if rv != 0 :
            raise service_error(service_error.internal,
                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
        else:
            return True

    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
        """
        Start a sub-experiment on a federant.

        Get the current state, modify or create as appropriate, ship data
        and configs and start the experiment.  There are small ordering
        differences based on the initial state of the sub-experiment.

        Files directly in tmpdir are copied to the experiment's tmp
        directory and files directly in tmpdir/software to the project
        software directory.  parent, user and timeout are not used here.
        Returns True on success and False on failure; service_error is
        raised if the null experiment file cannot be written or if the
        experiment state or node mapping cannot be read.
        """
        # Configuration directories on this machine
        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
        softdir = "/proj/%s/software/%s" % (pid, eid)
        # Software staging directory
        lsoftdir = "%s/software" % tmpdir

        state = self.get_state(pid, eid)

        if state == 'none':
            # Create a null copy of the experiment so that we capture any
            # logs there if the modify fails.  Emulab software discards the
            # logs from a failed startexp
            try:
                with open("%s/null.tcl" % tmpdir, "w") as f:
                    f.write(self.null + "\n")
            except EnvironmentError as e:
                raise service_error(service_error.internal,
                        "Cannot stage null.tcl: %s" % e.strerror)

            timedout = False
            try:
                if not self.cmd_with_timeout(
                        ("/usr/testbed/bin/startexp -i -f -w -p %s " +
                        "-e %s %s/null.tcl") % (pid, eid, tmpdir),
                        "startexp", timeout=60 * 10):
                    return False
            except self.cmd_timeout:
                timedout = True

            # The create may have finished even though we stopped waiting
            # for it; only give up if the experiment is not there.
            if timedout:
                state = self.get_state(pid, eid)
                if state != "swapped":
                    return False

        # Set up the experiment's file space
        if not self.cmd_with_timeout("/bin/rm -rf %s" % proj_dir):
            return False
        # Clear and create the software and configuration directories
        if not self.cmd_with_timeout("/bin/rm -rf %s/*" % softdir):
            return False
        if not self.cmd_with_timeout('mkdir -p %s' % proj_dir):
            return False
        if os.path.isdir(lsoftdir):
            if not self.cmd_with_timeout('mkdir -p %s' % softdir):
                return False

        try:
            for f in os.listdir(tmpdir):
                if not os.path.isdir("%s/%s" % (tmpdir, f)):
                    self.copy_file("%s/%s" % (tmpdir, f),
                            "%s/%s" % (proj_dir, f))
            if os.path.isdir(lsoftdir):
                for f in os.listdir(lsoftdir):
                    if not os.path.isdir("%s/%s" % (lsoftdir, f)):
                        self.copy_file("%s/%s" % (lsoftdir, f),
                                "%s/%s" % (softdir, f))
        except EnvironmentError as e:
            self.log.error("Error copying file: %s" %e)
            return False

        # Stage the new configuration (active experiments will stay swapped
        # in now)
        self.log.info("[start_segment]: Modifying %s" % eid)
        try:
            if not self.cmd_with_timeout(
                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
                            (pid, eid, tclfile),
                    "modexp", timeout= 60 * 10):
                return False
        except self.cmd_timeout:
            self.log.error("Modify command failed to complete in time")
            # There's really no way to see if this succeeded or failed, so
            # if it hangs, assume the worst.
            return False

        # Active experiments are still swapped, this swaps the others in.
        if state != 'active':
            self.log.info("[start_segment]: Swapping %s" % eid)
            timedout = False
            try:
                if not self.cmd_with_timeout(
                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
                        "swapexp", timeout=25*60):
                    return False
            except self.cmd_timeout:
                timedout = True

            # If the command was terminated, but completed successfully,
            # report success.
            if timedout:
                self.log.debug("[start_segment]: swapin timed out " +
                        "checking state")
                state = self.get_state(pid, eid)
                self.log.debug("[start_segment]: state is %s" % state)
                if state != 'active':
                    return False

        # Everything has gone OK.
        self.get_mapping(pid,eid)
        return True


class stop_segment(local_emulab_segment):
    """Callable that swaps an Emulab experiment out."""

    def __init__(self, log=None, keyfile=None, debug=False):
        local_emulab_segment.__init__(self,
                log=log, keyfile=keyfile, debug=debug)

    def __call__(self, parent, user, pid, eid):
        """
        Stop a sub experiment by calling swapexp on the federant.

        Returns True if swapexp exits successfully and False if it fails or
        does not finish within ten minutes.  parent and user are not used.
        """
        self.log.info("[stop_segment]: Stopping %s" % eid)
        rv = False
        try:
            # Clean out tar files: we've gone over quota in the past.  This
            # is best effort; its result does not affect the return value.
            self.cmd_with_timeout("rm -rf /proj/%s/software/%s" % (pid, eid))
            rv = self.cmd_with_timeout(
                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid),
                    timeout = 60*10)
        except self.cmd_timeout:
            rv = False
        return rv