Changeset 11860f52


Ignore:
Timestamp:
Sep 9, 2009 10:20:07 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
3c6dbec
Parents:
40dd8c1
Message:

Emulab access element now supports local and remote operation.

Location:
fedd/federation
Files:
2 added
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    r40dd8c1 r11860f52  
    99
    1010from threading import *
    11 import subprocess
    12 import signal
    13 import time
    1411
    1512from util import *
     
    2118from remote_service import xmlrpc_handler, soap_handler, service_caller
    2219
    23 import topdl
    24 import list_log
    2520import httplib
    2621import tempfile
    2722from urlparse import urlparse
     23
     24import topdl
     25import list_log
     26import proxy_emulab_segment
     27import local_emulab_segment
    2828
    2929
     
    6969        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
    7070        self.create_debug = config.getboolean("access", "create_debug")
     71        self.access_type = config.get("access", "type")
     72
     73        self.access_type = self.access_type.lower()
     74        if self.access_type == 'remote_emulab':
     75            self.start_segment = proxy_emulab_segment.start_segment
     76            self.stop_segment = proxy_emulab_segment.stop_segment
     77        elif self.access_type == 'local_emulab':
     78            self.start_segment = local_emulab_segment.start_segment
     79            self.stop_segment = local_emulab_segment.stop_segment
     80        else:
     81            self.start_segment = None
     82            self.stop_segment = None
    7183
    7284        self.attrs = { }
     
    804816                        "Access proxying denied")
    805817
    806 
    807 
    808     class proxy_emulab_segment:
    809         class ssh_cmd_timeout(RuntimeError): pass
    810 
    811         def __init__(self, log=None, keyfile=None, debug=False):
    812             self.log = log or logging.getLogger(\
    813                     'fedd.access.proxy_emulab_segment')
    814             self.ssh_privkey_file = keyfile
    815             self.debug = debug
    816             self.ssh_exec="/usr/bin/ssh"
    817             self.scp_exec = "/usr/bin/scp"
    818             self.ssh_cmd_timeout = access.proxy_emulab_segment.ssh_cmd_timeout
    819 
    820         def scp_file(self, file, user, host, dest=""):
    821             """
    822             scp a file to the remote host.  If debug is set the action is only
    823             logged.
    824             """
    825 
    826             scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
    827                     '-o', 'StrictHostKeyChecking yes', '-i',
    828                     self.ssh_privkey_file, file,
    829                     "%s@%s:%s" % (user, host, dest)]
    830             rv = 0
    831 
    832             try:
    833                 dnull = open("/dev/null", "w")
    834             except IOError:
    835                 self.log.debug("[ssh_file]: failed to open " + \
    836                         "/dev/null for redirect")
    837                 dnull = Null
    838 
    839             self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
    840             if not self.debug:
    841                 rv = subprocess.call(scp_cmd, stdout=dnull,
    842                         stderr=dnull, close_fds=True, close_fds=True)
    843 
    844             return rv == 0
    845 
    846         def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
    847             """
    848             Run a remote command on host as user.  If debug is set, the action
    849             is only logged.  Commands are run without stdin, to avoid stray
    850             SIGTTINs.
    851             """
    852             sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
    853                     "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
    854                     (self.ssh_exec, self.ssh_privkey_file,
    855                             user, host, cmd)
    856 
    857             try:
    858                 dnull = open("/dev/null", "w")
    859             except IOError:
    860                 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
    861                         "for redirect")
    862                 dnull = Null
    863 
    864             self.log.debug("[ssh_cmd]: %s" % sh_str)
    865             if not self.debug:
    866                 if dnull:
    867                     sub = subprocess.Popen(sh_str, shell=True, stdout=dnull,
    868                             stderr=dnull, close_fds=True)
    869                 else:
    870                     sub = subprocess.Popen(sh_str, shell=True, close_fds=True)
    871                 if timeout:
    872                     i = 0
    873                     rv = sub.poll()
    874                     while i < timeout:
    875                         if rv is not None: break
    876                         else:
    877                             time.sleep(1)
    878                             rv = sub.poll()
    879                             i += 1
    880                     else:
    881                         self.log.debug("Process exceeded runtime: %s" % sh_str)
    882                         os.kill(sub.pid, signal.SIGKILL)
    883                         raise self.ssh_cmd_timeout();
    884                     return rv == 0
    885                 else:
    886                     return sub.wait() == 0
    887             else:
    888                 if timeout == 0:
    889                     self.log.debug("debug timeout raised on %s " % sh_str)
    890                     raise self.ssh_cmd_timeout()
    891                 else:
    892                     return True
    893 
    894     class start_segment(proxy_emulab_segment):
    895         def __init__(self, log=None, keyfile=None, debug=False):
    896             access.proxy_emulab_segment.__init__(self, log=log,
    897                     keyfile=keyfile, debug=debug)
    898             self.null = """
    899 set ns [new Simulator]
    900 source tb_compat.tcl
    901 
    902 set a [$ns node]
    903 
    904 $ns rtproto Session
    905 $ns run
    906 """
    907 
    908         def get_state(self, user, host, pid, eid):
    909             # command to test experiment state
    910             expinfo_exec = "/usr/testbed/bin/expinfo" 
    911             # Regular expressions to parse the expinfo response
    912             state_re = re.compile("State:\s+(\w+)")
    913             no_exp_re = re.compile("^No\s+such\s+experiment")
    914             swapping_re = re.compile("^No\s+information\s+available.")
    915             state = None    # Experiment state parsed from expinfo
    916             # The expinfo ssh command.  Note the identity restriction to use
    917             # only the identity provided in the pubkey given.
    918             cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
    919                     'StrictHostKeyChecking yes', '-i',
    920                     self.ssh_privkey_file, "%s@%s" % (user, host),
    921                     expinfo_exec, pid, eid]
    922 
    923             dev_null = None
    924             try:
    925                 dev_null = open("/dev/null", "a")
    926             except IOError, e:
    927                 self.log.error("[get_state]: can't open /dev/null: %s" %e)
    928 
    929             if self.debug:
    930                 state = 'swapped'
    931                 rv = 0
    932             else:
    933                 self.log.debug("Checking state")
    934                 status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
    935                         stderr=dev_null, close_fds=True)
    936                 for line in status.stdout:
    937                     m = state_re.match(line)
    938                     if m: state = m.group(1)
    939                     else:
    940                         for reg, st in ((no_exp_re, "none"),
    941                                 (swapping_re, "swapping")):
    942                             m = reg.match(line)
    943                             if m: state = st
    944                 rv = status.wait()
    945 
    946             # If the experiment is not present the subcommand returns a
    947             # non-zero return value.  If we successfully parsed a "none"
    948             # outcome, ignore the return code.
    949             if rv != 0 and state != 'none':
    950                 raise service_error(service_error.internal,
    951                         "Cannot get status of segment:%s/%s" % (pid, eid))
    952             elif state not in ('active', 'swapped', 'swapping', 'none'):
    953                 raise service_error(service_error.internal,
    954                         "Cannot get status of segment:%s/%s" % (pid, eid))
    955             else:
    956                 self.log.debug("State is %s" % state)
    957                 return state
    958 
    959 
    960         def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
    961             """
    962             Start a sub-experiment on a federant.
    963 
    964             Get the current state, modify or create as appropriate, ship data
    965             and configs and start the experiment.  There are small ordering
    966             differences based on the initial state of the sub-experiment.
    967             """
    968             # ops node in the federant
    969             host = "%s%s" % (parent.ops, parent.domain)
    970             # Configuration directories on the remote machine
    971             proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    972             softdir = "/proj/%s/software/%s" % (pid, eid)
    973             # Local software dir
    974             lsoftdir = "%s/software" % tmpdir
    975 
    976             state = self.get_state(user, host, pid, eid)
    977 
    978             if not self.scp_file(tclfile, user, host):
    979                 return False
    980            
    981             if state == 'none':
    982                 # Create a null copy of the experiment so that we capture any
    983                 # logs there if the modify fails.  Emulab software discards the
    984                 # logs from a failed startexp
    985                 try:
    986                     f = open("%s/null.tcl" % tmpdir, "w")
    987                     print >>f, self.null
    988                     f.close()
    989                 except IOError, e:
    990                     raise service_error(service_error.internal,
    991                             "Cannot stage tarfile/rpm: %s" % e.strerror)
    992 
    993                 if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
    994                     return False
    995                 self.log.info("[start_segment]: Creating %s" % eid)
    996                 timedout = False
    997                 try:
    998                     if not self.ssh_cmd(user, host,
    999                             ("/usr/testbed/bin/startexp -i -f -w -p %s " +
    1000                             "-e %s null.tcl") % (pid, eid), "startexp",
    1001                             timeout=60 * 10):
    1002                         return False
    1003                 except self.ssh_cmd_timeout:
    1004                     timedout = True
    1005 
    1006                 if timedout:
    1007                     state = self.get_state(user, host, pid, eid)
    1008                     if state != "swapped":
    1009                         return False
    1010            
    1011             # Open up a temporary file to contain a script for setting up the
    1012             # filespace for the new experiment.
    1013             self.log.info("[start_segment]: creating script file")
    1014             try:
    1015                 sf, scriptname = tempfile.mkstemp()
    1016                 scriptfile = os.fdopen(sf, 'w')
    1017             except IOError:
    1018                 return False
    1019 
    1020             scriptbase = os.path.basename(scriptname)
    1021 
    1022             # Script the filesystem changes
    1023             print >>scriptfile, "/bin/rm -rf %s" % proj_dir
    1024             # Clear and create the software directory
    1025             print >>scriptfile, "/bin/rm -rf %s/*" % softdir
    1026             print >>scriptfile, 'mkdir -p %s' % proj_dir
    1027             if os.path.isdir(lsoftdir):
    1028                 print >>scriptfile, 'mkdir -p %s' % softdir
    1029             print >>scriptfile, "rm -f %s" % scriptbase
    1030             scriptfile.close()
    1031 
    1032             # Move the script to the remote machine
    1033             # XXX: could collide tempfile names on the remote host
    1034             if self.scp_file(scriptname, user, host, scriptbase):
    1035                 os.remove(scriptname)
    1036             else:
    1037                 return False
    1038 
    1039             # Execute the script (and the script's last line deletes it)
    1040             if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
    1041                 return False
    1042 
    1043             for f in os.listdir(tmpdir):
    1044                 if not os.path.isdir("%s/%s" % (tmpdir, f)):
    1045                     if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
    1046                             "%s/%s" % (proj_dir, f)):
    1047                         return False
    1048             if os.path.isdir(lsoftdir):
    1049                 for f in os.listdir(lsoftdir):
    1050                     if not os.path.isdir("%s/%s" % (lsoftdir, f)):
    1051                         if not self.scp_file("%s/%s" % (lsoftdir, f),
    1052                                 user, host, "%s/%s" % (softdir, f)):
    1053                             return False
    1054             # Stage the new configuration (active experiments will stay swapped
    1055             # in now)
    1056             self.log.info("[start_segment]: Modifying %s" % eid)
    1057             try:
    1058                 if not self.ssh_cmd(user, host,
    1059                         "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
    1060                                 (pid, eid, tclfile.rpartition('/')[2]),
    1061                         "modexp", timeout= 60 * 10):
    1062                     return False
    1063             except self.ssh_cmd_timeout:
    1064                 self.log.error("Modify command failed to complete in time")
    1065                 # There's really no way to see if this succeeded or failed, so
    1066                 # if it hangs, assume the worst.
    1067                 return False
    1068             # Active experiments are still swapped, this swaps the others in.
    1069             if state != 'active':
    1070                 self.log.info("[start_segment]: Swapping %s" % eid)
    1071                 timedout = False
    1072                 try:
    1073                     if not self.ssh_cmd(user, host,
    1074                             "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    1075                             "swapexp", timeout=10*60):
    1076                         return False
    1077                 except self.ssh_cmd_timeout:
    1078                     timedout = True
    1079                
    1080                 # If the command was terminated, but completed successfully,
    1081                 # report success.
    1082                 if timedout:
    1083                     self.log.debug("[start_segment]: swapin timed out " +\
    1084                             "checking state")
    1085                     state = self.get_state(user, host, pid, eid)
    1086                     self.log.debug("[start_segment]: state is %s" % state)
    1087                     return state == 'active'
    1088             # Everything has gone OK.
    1089             return True
    1090 
    1091     class stop_segment(proxy_emulab_segment):
    1092         def __init__(self, log=None, keyfile=None, debug=False):
    1093             access.proxy_emulab_segment.__init__(self,
    1094                     log=log, keyfile=keyfile, debug=debug)
    1095 
    1096         def __call__(self, parent, user, pid, eid):
    1097             """
    1098             Stop a sub experiment by calling swapexp on the federant
    1099             """
    1100             host = "%s%s" % (parent.ops, parent.domain)
    1101             self.log.info("[stop_segment]: Stopping %s" % eid)
    1102             rv = False
    1103             try:
    1104                 # Clean out tar files: we've gone over quota in the past
    1105                 self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
    1106                         (pid, eid))
    1107                 rv = self.ssh_cmd(user, host,
    1108                         "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
    1109             except self.ssh_cmd_timeout:
    1110                 rv = False
    1111             return rv
    1112 
    1113818    def generate_portal_configs(self, topo, pubkey_base, secretkey_base,
    1114819            tmpdir, master):
Note: See TracChangeset for help on using the changeset viewer.