#!/usr/local/bin/python

import sys, os
import re

import tempfile
import subprocess
import logging
import time
import signal

from proxy_segment import proxy_segment
from service_error import service_error

# Emulab command used to query experiment state and node mappings.
_EXPINFO_EXEC = "/usr/testbed/bin/expinfo"


def _expinfo_command(segment, user, host, *args):
    """
    Build the ssh command line that runs expinfo on the remote host.

    The IdentitiesOnly restriction makes ssh use only the identity in
    segment.ssh_privkey_file.  Extra expinfo arguments are appended in order.
    """
    return [segment.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
            'StrictHostKeyChecking no', '-i', segment.ssh_privkey_file,
            "%s@%s" % (user, host), _EXPINFO_EXEC] + list(args)


def _open_dev_null(log, caller):
    """
    Open /dev/null for appending.  On failure log the error (tagged with the
    calling routine's name) and return None, which makes the subprocess
    inherit stderr instead.
    """
    try:
        return open("/dev/null", "a")
    except EnvironmentError as e:
        log.error("[%s]: can't open /dev/null: %s" % (caller, e))
        return None


class start_segment(proxy_segment):
    """
    This starts an experiment on an emulab accessed remotely via ssh.  Most of
    the experiment construction has been done by the emulab_access object.
    This just does the wrangling of the emulab commands and collects the
    virtual node to physical node mapping (stored in self.node).  The routine
    throws service errors.
    """

    def __init__(self, log=None, keyfile=None, debug=False, boss=None,
            cert=None):
        """
        Initialize the underlying proxy_segment and the local state.

        log, keyfile and debug are passed through to proxy_segment.  boss and
        cert are accepted but not used by this class.
        """
        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
        # Minimal one-node ns script used to create a placeholder experiment.
        self.null = """
set ns [new Simulator]
source tb_compat.tcl

set a [$ns node]

$ns rtproto Session
$ns run
"""
        # Maps virtual node names to physical node names (see get_mapping).
        self.node = { }

    def get_state(self, user, host, pid, eid):
        """
        Return the state of the experiment as reported by emulab.

        Runs expinfo for pid/eid over ssh and parses its output.  Returns one
        of 'active', 'swapped', 'swapping' or 'none'.  In debug mode no
        command is run and 'swapped' is returned.

        Raises service_error (internal) if expinfo fails without reporting
        that the experiment is missing, or if no recognized state was parsed.
        """
        # Regular expressions to parse the expinfo response
        state_re = re.compile(r"State:\s+(\w+)")
        no_exp_re = re.compile(r"^No\s+such\s+experiment")
        swapping_re = re.compile(r"^No\s+information\s+available.")
        state = None    # Experiment state parsed from expinfo
        cmd = _expinfo_command(self, user, host, pid, eid)

        if self.debug:
            state = 'swapped'
            rv = 0
        else:
            self.log.debug("Checking state")
            dev_null = _open_dev_null(self.log, "get_state")
            try:
                # universal_newlines gives text lines, which the str regular
                # expressions above require.
                status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=dev_null, close_fds=True,
                        universal_newlines=True)
                for line in status.stdout:
                    m = state_re.match(line)
                    if m:
                        state = m.group(1)
                        continue
                    for reg, st in ((no_exp_re, "none"),
                            (swapping_re, "swapping")):
                        if reg.match(line):
                            state = st
                rv = status.wait()
                status.stdout.close()
            finally:
                # Do not leak one descriptor per status check.
                if dev_null is not None:
                    dev_null.close()

        # If the experiment is not present the subcommand returns a
        # non-zero return value.  If we successfully parsed a "none"
        # outcome, ignore the return code.
        failed = rv != 0 and state != 'none'
        if failed or state not in ('active', 'swapped', 'swapping', 'none'):
            raise service_error(service_error.internal,
                    "Cannot get status of segment:%s/%s" % (pid, eid))
        self.log.debug("State is %s" % state)
        return state

    def get_mapping(self, user, host, pid, eid):
        """
        Get the virtual to physical node mapping from the expinfo command and
        save it in the self.node member (virtual name -> physical name).

        Returns True on success (always, in debug mode, where no command is
        run).  Raises service_error (internal) if expinfo exits non-zero.
        """
        cmd = _expinfo_command(self, user, host, '-m', pid, eid)

        if self.debug:
            rv = 0
        else:
            self.log.debug("Getting mapping for %s %s" % (pid, eid))
            phys_start = re.compile(r'^Physical\s+Node\s+Mapping')
            phys_line = re.compile(r'(\S+)(\s+\S+)*\s+(\S+)')
            phys_end = re.compile(r'^$')
            dev_null = _open_dev_null(self.log, "get_mapping")
            try:
                status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=dev_null, close_fds=True,
                        universal_newlines=True)

                # Parse the info output.  Format:
                #
                # stuff
                # Physical Node Mapping:
                # ID              Type         OS              Physical
                # --------------- ------------ --------------- ------------
                # virtual         dummy        dummy           physical
                #
                foundit = False     # Seen the "Physical Node Mapping" header
                finished = False    # Seen the blank line ending the table
                skip = 0            # Table header lines still to skip
                for line in status.stdout:
                    if finished:
                        # Keep reading so the child never blocks on a full
                        # pipe while we sit in wait() below.
                        continue
                    if phys_start.match(line):
                        skip = 2
                        foundit = True
                    elif not foundit:
                        continue
                    elif skip > 0:
                        skip -= 1
                    elif phys_end.match(line):
                        finished = True
                    else:
                        m = phys_line.match(line.strip())
                        if m:
                            # First column is the virtual node, last column
                            # the physical node.
                            self.node[m.group(1)] = m.group(3)
                        else:
                            self.log.warn(
                                "Matching failed while parsing node mapping: "
                                "line %s" % line)
                rv = status.wait()
                status.stdout.close()
            finally:
                if dev_null is not None:
                    dev_null.close()

        if rv != 0:
            raise service_error(service_error.internal,
                    "Cannot get node mapping of segment:%s/%s" % (pid, eid))
        return True

    def make_null_experiment(self, user, host, pid, eid, tmpdir, gid=None):
        """
        Create a null copy of the experiment so that we capture any logs there
        if the modify fails.  Emulab software discards the logs from a failed
        startexp.

        Writes self.null to tmpdir/null.tcl, copies it to the remote host and
        runs startexp there (with "-g gid" when gid is given).  Returns True
        on success and False if the copy or startexp fails.  If startexp
        times out, success is decided by whether the experiment state is
        "swapped".  Raises service_error (internal) if null.tcl cannot be
        written locally.
        """
        if gid is not None:
            gparam = '-g %s' % gid
        else:
            gparam = ''

        null_path = "%s/null.tcl" % tmpdir
        try:
            f = open(null_path, "w")
            try:
                f.write("%s\n" % self.null)
            finally:
                # Close even when the write fails.
                f.close()
        except EnvironmentError as e:
            raise service_error(service_error.internal,
                    "Cannot stage null experiment file: %s" % e.strerror)

        if not self.scp_file(null_path, user, host):
            return False

        self.log.info("[start_segment]: Creating %s" % eid)
        startexp = ("/usr/testbed/bin/startexp -i -f -w -p %s " +
                "-e %s %s null.tcl") % (pid, eid, gparam)
        try:
            if not self.ssh_cmd(user, host, startexp, "startexp",
                    timeout=60 * 10):
                return False
        except self.ssh_cmd_timeout:
            # The command may have completed anyway; ask emulab.
            return self.get_state(user, host, pid, eid) == "swapped"
        return True

    def set_up_experiment_filespace(self, user, host, pid, eid, tmpdir):
        """
        Send all the software and configuration files into the experiment's
        file space.  To reduce the number of ssh connections, we script many
        changes and execute the script.

        Plain files in tmpdir go to /proj/pid/exp/eid/tmp and, if
        tmpdir/software exists, plain files in it go to
        /proj/pid/software/eid.  Subdirectories are not copied.  Returns True
        on success and False if the script cannot be created, copied or run,
        or if any file copy fails.
        """
        # Configuration directories on the remote machine
        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
        softdir = "/proj/%s/software/%s" % (pid, eid)
        # Local software dir
        lsoftdir = "%s/software" % tmpdir
        have_software = os.path.isdir(lsoftdir)

        # Open up a temporary file to contain a script for setting up the
        # filespace for the new experiment.
        self.log.info("[start_segment]: creating script file")
        try:
            sf, scriptname = tempfile.mkstemp()
        except EnvironmentError:
            return False
        scriptbase = os.path.basename(scriptname)

        # Script the filesystem changes: clear and recreate the experiment
        # tmp directory, clear the software directory and create it if we
        # have software to send.  The last line removes the script itself.
        script = [
            "/bin/rm -rf %s" % proj_dir,
            "/bin/rm -rf %s/*" % softdir,
            'mkdir -p %s' % proj_dir,
        ]
        if have_software:
            script.append('mkdir -p %s' % softdir)
        script.append("rm -f %s" % scriptbase)

        try:
            try:
                scriptfile = os.fdopen(sf, 'w')
            except EnvironmentError:
                os.close(sf)
                return False
            try:
                scriptfile.write("\n".join(script) + "\n")
            finally:
                scriptfile.close()

            # Move the script to the remote machine
            # XXX: could collide tempfile names on the remote host
            copied = self.scp_file(scriptname, user, host, scriptbase)
        finally:
            # The local copy is never needed again, whether or not the
            # transfer worked.
            os.remove(scriptname)

        if not copied:
            return False

        # Execute the script (and the script's last line deletes it)
        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
            return False

        transfers = [(tmpdir, proj_dir)]
        if have_software:
            transfers.append((lsoftdir, softdir))
        for local_dir, remote_dir in transfers:
            for name in os.listdir(local_dir):
                local_path = "%s/%s" % (local_dir, name)
                if os.path.isdir(local_path):
                    continue
                if not self.scp_file(local_path, user, host,
                        "%s/%s" % (remote_dir, name)):
                    return False
        return True

    def swap_in(self, user, host, pid, eid):
        """
        Swap experiment in.  This includes code to cope with the experiment
        swapping command timing out, but the experiment being swapped in
        successfully.

        Returns True if swapexp succeeds, or if it times out and the
        experiment state is then 'active'; False otherwise.
        """
        self.log.info("[start_segment]: Swapping %s in" % eid)
        try:
            swapped = self.ssh_cmd(user, host,
                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
                    "swapexp", timeout=25*60)
        except self.ssh_cmd_timeout:
            # If the command was terminated, but completed successfully,
            # report success.
            self.log.debug("[start_segment]: swapin timed out " +
                    "checking state")
            state = self.get_state(user, host, pid, eid)
            self.log.debug("[start_segment]: state is %s" % state)
            return state == 'active'
        if not swapped:
            return False
        return True

    def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0,
            gid=None):
        """
        Start a sub-experiment on a federant.

        Get the current state, and terminate the experiment if it exists.  The
        group membership of the experiment is difficult to determine or
        change, so start with a clean slate.  Create a null experiment, ship
        data and configs into its file space, modify it to the real
        experiment description (tclfile) and swap it in.  Finally collect
        the node mapping into self.node.

        The remote host is parent.ops + parent.domain.  The timeout parameter
        is accepted but not used here.  Returns True on success and False if
        any step fails; get_state and get_mapping may raise service_error.
        """
        # ops node in the federant
        host = "%s%s" % (parent.ops, parent.domain)
        state = self.get_state(user, host, pid, eid)

        if not self.scp_file(tclfile, user, host):
            return False

        if state != 'none':
            # Result deliberately ignored: we recreate the experiment below.
            self.ssh_cmd(user, host,
                    "/usr/testbed/bin/endexp -w %s %s" % (pid, eid))

        # Put a dummy in place to capture logs, and establish an experiment
        # directory.
        if not self.make_null_experiment(user, host, pid, eid, tmpdir, gid):
            return False

        if not self.set_up_experiment_filespace(user, host, pid, eid, tmpdir):
            return False

        # With the filespace in place, we can modify and swap in.
        self.log.info("[start_segment]: Modifying %s" % eid)
        try:
            # The tcl file was copied by basename, so refer to it that way.
            modexp = "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
                    (pid, eid, tclfile.rpartition('/')[2])
            if not self.ssh_cmd(user, host, modexp, "modexp",
                    timeout=60 * 10):
                return False
        except self.ssh_cmd_timeout:
            self.log.error("Modify command failed to complete in time")
            # There's really no way to see if this succeeded or failed, so
            # if it hangs, assume the worst.
            return False

        if not self.swap_in(user, host, pid, eid):
            self.log.error("swap in failed")
            return False

        # Everything has gone OK.
        self.get_mapping(user, host, pid, eid)
        return True


class stop_segment(proxy_segment):
    """
    Stops (and optionally terminates) an experiment on an emulab accessed
    remotely via ssh.
    """

    def __init__(self, log=None, keyfile=None, debug=False, boss=None,
            cert=None):
        """
        Initialize the underlying proxy_segment.  boss and cert are accepted
        but not used by this class.
        """
        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)

    def __call__(self, parent, user, pid, eid, gid=None, terminate=False):
        """
        Stop a sub experiment by calling swapexp on the federant.

        The experiment's software directory is removed first.  If terminate
        is true the experiment is also ended with endexp, and the result of
        that command is what is returned.  Returns False if any command
        times out.  The gid parameter is accepted but not used here.
        """
        host = "%s%s" % (parent.ops, parent.domain)
        self.log.info("[stop_segment]: Stopping %s" % eid)
        rv = False
        try:
            # Clean out tar files: we've gone over quota in the past
            self.ssh_cmd(user, host,
                    "rm -rf /proj/%s/software/%s" % (pid, eid))
            rv = self.ssh_cmd(user, host,
                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
            if terminate:
                rv = self.ssh_cmd(user, host,
                        "/usr/testbed/bin/endexp -w %s %s" % (pid, eid))
        except self.ssh_cmd_timeout:
            rv = False
        return rv