- Timestamp:
- Sep 9, 2009 10:20:07 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 3c6dbec
- Parents:
- 40dd8c1
- Location:
- fedd/federation
- Files:
-
- 2 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
r40dd8c1 r11860f52 9 9 10 10 from threading import * 11 import subprocess12 import signal13 import time14 11 15 12 from util import * … … 21 18 from remote_service import xmlrpc_handler, soap_handler, service_caller 22 19 23 import topdl24 import list_log25 20 import httplib 26 21 import tempfile 27 22 from urlparse import urlparse 23 24 import topdl 25 import list_log 26 import proxy_emulab_segment 27 import local_emulab_segment 28 28 29 29 … … 69 69 self.ssh_privkey_file = config.get("access","ssh_privkey_file") 70 70 self.create_debug = config.getboolean("access", "create_debug") 71 self.access_type = config.get("access", "type") 72 73 self.access_type = self.access_type.lower() 74 if self.access_type == 'remote_emulab': 75 self.start_segment = proxy_emulab_segment.start_segment 76 self.stop_segment = proxy_emulab_segment.stop_segment 77 elif self.access_type == 'local_emulab': 78 self.start_segment = local_emulab_segment.start_segment 79 self.stop_segment = local_emulab_segment.stop_segment 80 else: 81 self.start_segment = None 82 self.stop_segment = None 71 83 72 84 self.attrs = { } … … 804 816 "Access proxying denied") 805 817 806 807 808 class proxy_emulab_segment:809 class ssh_cmd_timeout(RuntimeError): pass810 811 def __init__(self, log=None, keyfile=None, debug=False):812 self.log = log or logging.getLogger(\813 'fedd.access.proxy_emulab_segment')814 self.ssh_privkey_file = keyfile815 self.debug = debug816 self.ssh_exec="/usr/bin/ssh"817 self.scp_exec = "/usr/bin/scp"818 self.ssh_cmd_timeout = access.proxy_emulab_segment.ssh_cmd_timeout819 820 def scp_file(self, file, user, host, dest=""):821 """822 scp a file to the remote host. If debug is set the action is only823 logged.824 """825 826 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',827 '-o', 'StrictHostKeyChecking yes', '-i',828 self.ssh_privkey_file, file,829 "%s@%s:%s" % (user, host, dest)]830 rv = 0831 832 try:833 dnull = open("/dev/null", "w")834 except IOError:835 self.log.debug("[ssh_file]: failed to open " + \836 "/dev/null for redirect")837 dnull = Null838 839 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))840 if not self.debug:841 rv = subprocess.call(scp_cmd, stdout=dnull,842 stderr=dnull, close_fds=True, close_fds=True)843 844 return rv == 0845 846 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):847 """848 Run a remote command on host as user. If debug is set, the action849 is only logged. Commands are run without stdin, to avoid stray850 SIGTTINs.851 """852 sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \853 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \854 (self.ssh_exec, self.ssh_privkey_file,855 user, host, cmd)856 857 try:858 dnull = open("/dev/null", "w")859 except IOError:860 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \861 "for redirect")862 dnull = Null863 864 self.log.debug("[ssh_cmd]: %s" % sh_str)865 if not self.debug:866 if dnull:867 sub = subprocess.Popen(sh_str, shell=True, stdout=dnull,868 stderr=dnull, close_fds=True)869 else:870 sub = subprocess.Popen(sh_str, shell=True, close_fds=True)871 if timeout:872 i = 0873 rv = sub.poll()874 while i < timeout:875 if rv is not None: break876 else:877 time.sleep(1)878 rv = sub.poll()879 i += 1880 else:881 self.log.debug("Process exceeded runtime: %s" % sh_str)882 os.kill(sub.pid, signal.SIGKILL)883 raise self.ssh_cmd_timeout();884 return rv == 0885 else:886 return sub.wait() == 0887 else:888 if timeout == 0:889 self.log.debug("debug timeout raised on %s " % sh_str)890 raise self.ssh_cmd_timeout()891 else:892 return True893 894 class start_segment(proxy_emulab_segment):895 def __init__(self, log=None, keyfile=None, debug=False):896 access.proxy_emulab_segment.__init__(self, log=log,897 keyfile=keyfile, debug=debug)898 self.null = """899 set ns [new Simulator]900 source tb_compat.tcl901 902 set a [$ns node]903 904 $ns rtproto Session905 $ns run906 """907 908 def get_state(self, user, host, pid, eid):909 # command to test experiment state910 expinfo_exec = "/usr/testbed/bin/expinfo"911 # Regular expressions to parse the expinfo response912 state_re = re.compile("State:\s+(\w+)")913 no_exp_re = re.compile("^No\s+such\s+experiment")914 swapping_re = re.compile("^No\s+information\s+available.")915 state = None # Experiment state parsed from expinfo916 # The expinfo ssh command. Note the identity restriction to use917 # only the identity provided in the pubkey given.918 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',919 'StrictHostKeyChecking yes', '-i',920 self.ssh_privkey_file, "%s@%s" % (user, host),921 expinfo_exec, pid, eid]922 923 dev_null = None924 try:925 dev_null = open("/dev/null", "a")926 except IOError, e:927 self.log.error("[get_state]: can't open /dev/null: %s" %e)928 929 if self.debug:930 state = 'swapped'931 rv = 0932 else:933 self.log.debug("Checking state")934 status = subprocess.Popen(cmd, stdout=subprocess.PIPE,935 stderr=dev_null, close_fds=True)936 for line in status.stdout:937 m = state_re.match(line)938 if m: state = m.group(1)939 else:940 for reg, st in ((no_exp_re, "none"),941 (swapping_re, "swapping")):942 m = reg.match(line)943 if m: state = st944 rv = status.wait()945 946 # If the experiment is not present the subcommand returns a947 # non-zero return value. If we successfully parsed a "none"948 # outcome, ignore the return code.949 if rv != 0 and state != 'none':950 raise service_error(service_error.internal,951 "Cannot get status of segment:%s/%s" % (pid, eid))952 elif state not in ('active', 'swapped', 'swapping', 'none'):953 raise service_error(service_error.internal,954 "Cannot get status of segment:%s/%s" % (pid, eid))955 else:956 self.log.debug("State is %s" % state)957 return state958 959 960 def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):961 """962 Start a sub-experiment on a federant.963 964 Get the current state, modify or create as appropriate, ship data965 and configs and start the experiment. There are small ordering966 differences based on the initial state of the sub-experiment.967 """968 # ops node in the federant969 host = "%s%s" % (parent.ops, parent.domain)970 # Configuration directories on the remote machine971 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)972 softdir = "/proj/%s/software/%s" % (pid, eid)973 # Local software dir974 lsoftdir = "%s/software" % tmpdir975 976 state = self.get_state(user, host, pid, eid)977 978 if not self.scp_file(tclfile, user, host):979 return False980 981 if state == 'none':982 # Create a null copy of the experiment so that we capture any983 # logs there if the modify fails. Emulab software discards the984 # logs from a failed startexp985 try:986 f = open("%s/null.tcl" % tmpdir, "w")987 print >>f, self.null988 f.close()989 except IOError, e:990 raise service_error(service_error.internal,991 "Cannot stage tarfile/rpm: %s" % e.strerror)992 993 if not self.scp_file("%s/null.tcl" % tmpdir, user, host):994 return False995 self.log.info("[start_segment]: Creating %s" % eid)996 timedout = False997 try:998 if not self.ssh_cmd(user, host,999 ("/usr/testbed/bin/startexp -i -f -w -p %s " +1000 "-e %s null.tcl") % (pid, eid), "startexp",1001 timeout=60 * 10):1002 return False1003 except self.ssh_cmd_timeout:1004 timedout = True1005 1006 if timedout:1007 state = self.get_state(user, host, pid, eid)1008 if state != "swapped":1009 return False1010 1011 # Open up a temporary file to contain a script for setting up the1012 # filespace for the new experiment.1013 self.log.info("[start_segment]: creating script file")1014 try:1015 sf, scriptname = tempfile.mkstemp()1016 scriptfile = os.fdopen(sf, 'w')1017 except IOError:1018 return False1019 1020 scriptbase = os.path.basename(scriptname)1021 1022 # Script the filesystem changes1023 print >>scriptfile, "/bin/rm -rf %s" % proj_dir1024 # Clear and create the software directory1025 print >>scriptfile, "/bin/rm -rf %s/*" % softdir1026 print >>scriptfile, 'mkdir -p %s' % proj_dir1027 if os.path.isdir(lsoftdir):1028 print >>scriptfile, 'mkdir -p %s' % softdir1029 print >>scriptfile, "rm -f %s" % scriptbase1030 scriptfile.close()1031 1032 # Move the script to the remote machine1033 # XXX: could collide tempfile names on the remote host1034 if self.scp_file(scriptname, user, host, scriptbase):1035 os.remove(scriptname)1036 else:1037 return False1038 1039 # Execute the script (and the script's last line deletes it)1040 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):1041 return False1042 1043 for f in os.listdir(tmpdir):1044 if not os.path.isdir("%s/%s" % (tmpdir, f)):1045 if not self.scp_file("%s/%s" % (tmpdir, f), user, host,1046 "%s/%s" % (proj_dir, f)):1047 return False1048 if os.path.isdir(lsoftdir):1049 for f in os.listdir(lsoftdir):1050 if not os.path.isdir("%s/%s" % (lsoftdir, f)):1051 if not self.scp_file("%s/%s" % (lsoftdir, f),1052 user, host, "%s/%s" % (softdir, f)):1053 return False1054 # Stage the new configuration (active experiments will stay swapped1055 # in now)1056 self.log.info("[start_segment]: Modifying %s" % eid)1057 try:1058 if not self.ssh_cmd(user, host,1059 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \1060 (pid, eid, tclfile.rpartition('/')[2]),1061 "modexp", timeout= 60 * 10):1062 return False1063 except self.ssh_cmd_timeout:1064 self.log.error("Modify command failed to complete in time")1065 # There's really no way to see if this succeeded or failed, so1066 # if it hangs, assume the worst.1067 return False1068 # Active experiments are still swapped, this swaps the others in.1069 if state != 'active':1070 self.log.info("[start_segment]: Swapping %s" % eid)1071 timedout = False1072 try:1073 if not self.ssh_cmd(user, host,1074 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),1075 "swapexp", timeout=10*60):1076 return False1077 except self.ssh_cmd_timeout:1078 timedout = True1079 1080 # If the command was terminated, but completed successfully,1081 # report success.1082 if timedout:1083 self.log.debug("[start_segment]: swapin timed out " +\1084 "checking state")1085 state = self.get_state(user, host, pid, eid)1086 self.log.debug("[start_segment]: state is %s" % state)1087 return state == 'active'1088 # Everything has gone OK.1089 return True1090 1091 class stop_segment(proxy_emulab_segment):1092 def __init__(self, log=None, keyfile=None, debug=False):1093 access.proxy_emulab_segment.__init__(self,1094 log=log, keyfile=keyfile, debug=debug)1095 1096 def __call__(self, parent, user, pid, eid):1097 """1098 Stop a sub experiment by calling swapexp on the federant1099 """1100 host = "%s%s" % (parent.ops, parent.domain)1101 self.log.info("[stop_segment]: Stopping %s" % eid)1102 rv = False1103 try:1104 # Clean out tar files: we've gone over quota in the past1105 self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \1106 (pid, eid))1107 rv = self.ssh_cmd(user, host,1108 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))1109 except self.ssh_cmd_timeout:1110 rv = False1111 return rv1112 1113 818 def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 1114 819 tmpdir, master):
Note: See TracChangeset
for help on using the changeset viewer.