Changeset cc8d8e9 for fedd/federation/access.py
- Timestamp:
- Aug 28, 2009 6:07:42 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 6c57fe9
- Parents:
- 4c8a0b7
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
# fedd/federation/access.py -- changeset r4c8a0b7 -> rcc8d8e9
#
# Reconstructed from the changeset view, which had collapsed the diff onto a
# few lines and interleaved the old/new line numbers with the code.
#
# Diff context (inside a method that is not visible in this view): the new
# StartSegment operation is registered next to the existing handlers in both
# service tables.
#
#         'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
#         'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
#         'StartSegment': soap_handler("StartSegment", self.StartSegment),   (added)
#         }
#     self.xmlrpc_services = {\
#         ...
#         'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
#             self.ReleaseAccess),
#         'StartSegment': xmlrpc_handler('StartSegment',                     (added)
#             self.StartSegment),                                           (added)
#         }
#
# NOTE(review): the view lost all indentation.  StartSegment takes self and is
# registered as self.StartSegment, so the definitions below presumably live
# inside the service class of access.py -- re-indent when applying.


class emulab_segment:
    """
    Common ssh/scp plumbing for operations on a remote testbed.

    Holds the private key file, the paths of the ssh and scp binaries and a
    debug flag.  When debug is set, remote commands are only logged and
    reported as successful.
    """
    class ssh_cmd_timeout(RuntimeError): pass

    def __init__(self, log=None, keyfile=None, debug=False):
        """
        log is the logger to use (a default one is created if omitted),
        keyfile is the ssh private key passed to ssh/scp with -i, and debug
        turns remote actions into log-only no-ops.
        """
        self.log = log or logging.getLogger(
                'fedd.experiment_control.emulab_segment')
        self.ssh_privkey_file = keyfile
        self.debug = debug
        self.ssh_exec = "/usr/bin/ssh"
        self.scp_exec = "/usr/bin/scp"
        # self.ssh_cmd_timeout already resolves to the nested exception
        # class through ordinary attribute lookup, so no per-instance copy
        # is made (the old copy looked the class up by bare name, which
        # fails when this class is nested inside another class).

    def _open_dev_null(self, caller):
        """
        Open /dev/null for writing.  Returns the file object, or None (after
        logging under the caller's tag) if it cannot be opened, in which
        case the child simply inherits this process's output streams.
        """
        try:
            return open("/dev/null", "w")
        except IOError:
            self.log.debug("[%s]: failed to open /dev/null for redirect" \
                    % caller)
            return None

    def scp_file(self, file, user, host, dest=""):
        """
        scp a file to the remote host.  If debug is set the action is only
        logged.

        Returns True if scp exited with status 0 (always True in debug
        mode).
        """
        scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
                '-o', 'StrictHostKeyChecking yes', '-i',
                self.ssh_privkey_file, file,
                "%s@%s:%s" % (user, host, dest)]

        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
        if self.debug:
            return True

        dnull = self._open_dev_null("scp_file")
        try:
            rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True)
        finally:
            # Do not leak one descriptor per copied file.
            if dnull is not None:
                dnull.close()
        return rv == 0

    def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
        """
        Run a remote command on host as user.  If debug is set, the action
        is only logged.  Commands are run without stdin, to avoid stray
        SIGTTINs.

        If timeout (seconds) is given and non-zero, the command is polled
        once a second; if it is still running when the timeout expires it
        is killed and ssh_cmd_timeout is raised.  Otherwise the call blocks
        until the command exits.  In debug mode a timeout of exactly 0
        raises ssh_cmd_timeout.  wname is accepted for
        compatibility with callers and is not used.

        Returns True if the command exited with status 0.
        """
        # NOTE(review): cmd is interpolated into a string run by the local
        # shell; callers must only pass trusted text here.
        sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
                "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
                (self.ssh_exec, self.ssh_privkey_file,
                        user, host, cmd)

        self.log.debug("[ssh_cmd]: %s" % sh_str)
        if self.debug:
            if timeout == 0:
                self.log.debug("debug timeout raised on %s " % sh_str)
                raise self.ssh_cmd_timeout()
            return True

        dnull = self._open_dev_null("ssh_cmd")
        try:
            sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
                    close_fds=True)
            if not timeout:
                return sub.wait() == 0

            waited = 0
            rv = sub.poll()
            while rv is None and waited < timeout:
                time.sleep(1)
                rv = sub.poll()
                waited += 1

            # Only a child that is still running has timed out; one that
            # finished on the final poll is reported normally.
            if rv is None:
                self.log.debug("Process exceeded runtime: %s" % sh_str)
                # NOTE(review): with shell=True this is the shell's pid;
                # confirm that the ssh child dies with it.
                os.kill(sub.pid, signal.SIGKILL)
                sub.wait()      # reap the killed child
                raise self.ssh_cmd_timeout()
            return rv == 0
        finally:
            if dnull is not None:
                dnull.close()


class start_segment(emulab_segment):
    """
    Callable that creates/modifies a sub-experiment on a federant and swaps
    it in.  Construction arguments are those of emulab_segment (the
    inherited constructor is used as is).
    """

    def create_config_tree(self, src_dir, dest_dir, script):
        """
        Append commands to script that will create the directory hierarchy
        on the remote federant.

        src_dir is the local directory whose subdirectory structure is
        mirrored under dest_dir; script is a writable file-like object.
        Only directories produce commands; plain files are ignored here.
        """
        if os.path.isdir(src_dir):
            script.write("mkdir -p %s\n" % dest_dir)
            script.write("chmod 770 %s\n" % dest_dir)

            for f in sorted(os.listdir(src_dir)):
                # Entries from listdir are relative to src_dir, so the
                # directory test must be made on the joined path.
                src = "%s/%s" % (src_dir, f)
                if os.path.isdir(src):
                    self.create_config_tree(src,
                            "%s/%s" % (dest_dir, f), script)
        else:
            self.log.debug("[create_config_tree]: Not a directory: %s" \
                    % src_dir)

    def ship_configs(self, host, user, src_dir, dest_dir):
        """
        Copy federant-specific configuration files to the federant.

        Recursively scp's every file under src_dir into the matching
        directory under dest_dir.  Returns False as soon as one copy fails,
        True otherwise.
        """
        for f in sorted(os.listdir(src_dir)):
            src = "%s/%s" % (src_dir, f)
            if os.path.isdir(src):
                if not self.ship_configs(host, user, src,
                        "%s/%s" % (dest_dir, f)):
                    return False
            else:
                if not self.scp_file(src, user, host, dest_dir):
                    return False
        return True

    def get_state(self, user, host, tb, pid, eid):
        """
        Return the state of experiment pid/eid on the federant as one of
        'active', 'swapped', 'swapping' or 'none', by running expinfo over
        ssh and parsing its output.  In debug mode no command is run and
        'swapped' is returned.

        Raises service_error (internal) if the command fails without
        reporting a missing experiment, or if no known state was parsed.
        """
        # command to test experiment state
        expinfo_exec = "/usr/testbed/bin/expinfo"
        # Regular expressions to parse the expinfo response
        state_re = re.compile(r"State:\s+(\w+)")
        no_exp_re = re.compile(r"^No\s+such\s+experiment")
        swapping_re = re.compile(r"^No\s+information\s+available.")
        state = None    # Experiment state parsed from expinfo
        # The expinfo ssh command.  Note the identity restriction to use
        # only the identity provided in the pubkey given.
        cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
                'StrictHostKeyChecking yes', '-i',
                self.ssh_privkey_file, "%s@%s" % (user, host),
                expinfo_exec, pid, eid]

        if self.debug:
            state = 'swapped'
            rv = 0
        else:
            dev_null = None
            try:
                dev_null = open("/dev/null", "a")
            except IOError:
                # Spelled with sys.exc_info() so that the block parses on
                # both old Python 2 releases and Python 3.
                import sys
                self.log.error("[get_state]: can't open /dev/null: %s" % \
                        sys.exc_info()[1])
            try:
                # universal_newlines makes stdout yield text lines, which
                # is what the (text) patterns above expect.
                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
                        close_fds=True, universal_newlines=True)
                for line in status.stdout:
                    m = state_re.match(line)
                    if m:
                        state = m.group(1)
                    else:
                        for reg, st in ((no_exp_re, "none"),
                                (swapping_re, "swapping")):
                            if reg.match(line):
                                state = st
                rv = status.wait()
            finally:
                if dev_null is not None:
                    dev_null.close()

        # If the experiment is not present the subcommand returns a
        # non-zero return value.  If we successfully parsed a "none"
        # outcome, ignore the return code.
        if (rv != 0 and state != 'none') or \
                state not in ('active', 'swapped', 'swapping', 'none'):
            raise service_error(service_error.internal,
                    "Cannot get status of segment %s:%s/%s" % \
                            (tb, pid, eid))
        return state

    def _write_setup_script(self, tmpdir, tb, proj_dir, tarfiles_dir,
            rpms_dir):
        """
        Write a local temporary shell script that resets and recreates the
        experiment's remote directories.  Returns the script's path, or
        None if the temporary file could not be created.
        """
        try:
            sf, scriptname = tempfile.mkstemp()
            scriptfile = os.fdopen(sf, 'w')
        except (IOError, OSError):
            # mkstemp reports failure with OSError
            return None

        try:
            # Script the filesystem changes
            scriptfile.write("/bin/rm -rf %s\n" % proj_dir)
            # Clear and create the tarfiles and rpm directories
            for d in (tarfiles_dir, rpms_dir):
                scriptfile.write("/bin/rm -rf %s/*\n" % d)
                scriptfile.write("mkdir -p %s\n" % d)
            scriptfile.write('mkdir -p %s\n' % proj_dir)
            self.create_config_tree("%s/%s" % (tmpdir, tb),
                    proj_dir, scriptfile)
            if os.path.isdir("%s/tarfiles" % tmpdir):
                self.create_config_tree("%s/tarfiles" % tmpdir,
                        tarfiles_dir, scriptfile)
            if os.path.isdir("%s/rpms" % tmpdir):
                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
                        scriptfile)
            # The script's last action is to delete itself
            scriptfile.write("rm -f %s\n" % os.path.basename(scriptname))
        finally:
            scriptfile.close()
        return scriptname

    def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
        """
        Start a sub-experiment on a federant.

        Get the current state, modify or create as appropriate, ship data
        and configs and start the experiment.  There are small ordering
        differences based on the initial state of the sub-experiment.

        tb names the testbed entry in tbparams (which supplies 'host',
        'domain', 'user' and 'project'), eid is the experiment name and
        tmpdir the local directory holding the files to ship.  timeout is
        accepted but not used here; the remote commands use fixed
        ten-minute limits.  Returns True on success, False on any failure.
        """
        # ops node in the federant
        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
        user = tbparams[tb]['user']     # federant user
        pid = tbparams[tb]['project']   # federant project
        # XXX
        base_confs = ( "hosts",)
        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
        # Configuration directories on the remote machine
        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)

        state = self.get_state(user, host, tb, pid, eid)

        self.log.debug("[start_segment]: %s: %s" % (tb, state))
        self.log.info("[start_segment]:transferring experiment to %s" % tb)

        if not self.scp_file("%s/%s/%s" % \
                (tmpdir, tb, tclfile), user, host):
            return False

        if state == 'none':
            # Create a null copy of the experiment so that we capture any
            # logs there if the modify fails.  Emulab software discards the
            # logs from a failed startexp
            if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
                return False
            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
            try:
                if not self.ssh_cmd(user, host,
                        ("/usr/testbed/bin/startexp -i -f -w -p %s " +
                            "-e %s null.tcl") % (pid, eid), "startexp",
                        timeout=60 * 10):
                    return False
            except self.ssh_cmd_timeout:
                # The command may have completed even though we gave up
                # waiting; only a swapped experiment counts as success.
                state = self.get_state(user, host, tb, pid, eid)
                if state != "swapped":
                    return False

        # Open up a temporary file to contain a script for setting up the
        # filespace for the new experiment.
        self.log.info("[start_segment]: creating script file")
        scriptname = self._write_setup_script(tmpdir, tb, proj_dir,
                tarfiles_dir, rpms_dir)
        if scriptname is None:
            return False
        scriptbase = os.path.basename(scriptname)

        # Move the script to the remote machine
        # XXX: could collide tempfile names on the remote host
        try:
            shipped = self.scp_file(scriptname, user, host, scriptbase)
        finally:
            # The local copy is no longer needed whether or not the copy
            # succeeded.
            os.remove(scriptname)
        if not shipped:
            return False

        # Execute the script (and the script's last line deletes it)
        if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
            return False

        for f in base_confs:
            if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
                    "%s/%s" % (proj_dir, f)):
                return False
        if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
                proj_dir):
            return False
        if os.path.isdir("%s/tarfiles" % tmpdir):
            if not self.ship_configs(host, user,
                    "%s/tarfiles" % tmpdir, tarfiles_dir):
                return False
        if os.path.isdir("%s/rpms" % tmpdir):
            # rpms go to the rpms directory the setup script created
            if not self.ship_configs(host, user,
                    "%s/rpms" % tmpdir, rpms_dir):
                return False
        # Stage the new configuration (active experiments will stay swapped
        # in now)
        self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
        try:
            if not self.ssh_cmd(user, host,
                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
                            (pid, eid, tclfile),
                    "modexp", timeout= 60 * 10):
                return False
        except self.ssh_cmd_timeout:
            self.log.error("Modify command failed to complete in time")
            # There's really no way to see if this succeeded or failed, so
            # if it hangs, assume the worst.
            return False
        # Active experiments are still swapped, this swaps the others in.
        if state != 'active':
            self.log.info("[start_segment]: Swapping %s in on %s" % \
                    (eid, tb))
            try:
                if not self.ssh_cmd(user, host,
                        "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
                        "swapexp", timeout=10*60):
                    return False
            except self.ssh_cmd_timeout:
                # If the command was terminated, but completed
                # successfully, report success.
                self.log.debug("[start_segment]: swapin timed out " +\
                        "checking state")
                state = self.get_state(user, host, tb, pid, eid)
                self.log.debug("[start_segment]: state is %s" % state)
                return state == 'active'
        # Everything has gone OK.
        return True


class stop_segment(emulab_segment):
    """
    Callable that swaps a sub-experiment out on a federant.  Construction
    arguments are those of emulab_segment (the inherited constructor is
    used as is).
    """

    def __call__(self, tb, eid, tbparams):
        """
        Stop a sub experiment by calling swapexp on the federant

        Returns the result of the swapexp command (True on exit status 0),
        or False if a command timed out.  The cleanup commands that precede
        it are best effort: their results are ignored.
        """
        user = tbparams[tb]['user']
        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
        pid = tbparams[tb]['project']

        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
        rv = False
        try:
            # Clean out tar files: we've gone over quota in the past
            self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
            self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
                    (pid, eid))
            rv = self.ssh_cmd(user, host,
                    "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
        except self.ssh_cmd_timeout:
            rv = False
        return rv


def StartSegment(self, req, fid):
    """
    Handle a StartSegment request.

    Unwraps the StartSegmentRequestBody, checks whether fid holds the
    attribute named by the request's allocID fedid, prints the outcome and
    echoes the allocID back.  Raises service_error (req) if the request
    body is missing.

    NOTE(review): the outcome of the access check is only printed; the
    reply is the same either way.  This looks like a placeholder -- confirm
    before relying on it for access control.
    """
    try:
        req = req['StartSegmentRequestBody']
    except KeyError:
        raise service_error(service_error.req, "Badly formed request")
    auth_attr = req['allocID']['fedid']
    if self.auth.check_attribute(fid, auth_attr):
        print("OK")
    else:
        print("Fail")
    return { 'allocID': req['allocID'] }
Note: See TracChangeset
for help on using the changeset viewer.