Changeset cc8d8e9
- Timestamp:
- Aug 28, 2009 6:07:42 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 6c57fe9
- Parents:
- 4c8a0b7
- Location:
- fedd/federation
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
r4c8a0b7 rcc8d8e9 105 105 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 106 106 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 107 'StartSegment': soap_handler("StartSegment", self.StartSegment), 107 108 } 108 109 self.xmlrpc_services = {\ … … 111 112 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', 112 113 self.ReleaseAccess), 114 'StartSegment': xmlrpc_handler('StartSegment', 115 self.StartSegment), 113 116 } 114 117 … … 772 775 773 776 777 778 class emulab_segment: 779 class ssh_cmd_timeout(RuntimeError): pass 780 781 def __init__(self, log=None, keyfile=None, debug=False): 782 self.log = log or logging.getLogger(\ 783 'fedd.experiment_control.emulab_segment') 784 self.ssh_privkey_file = keyfile 785 self.debug = debug 786 self.ssh_exec="/usr/bin/ssh" 787 self.scp_exec = "/usr/bin/scp" 788 self.ssh_cmd_timeout = emulab_segment.ssh_cmd_timeout 789 790 def scp_file(self, file, user, host, dest=""): 791 """ 792 scp a file to the remote host. If debug is set the action is only 793 logged. 794 """ 795 796 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 797 '-o', 'StrictHostKeyChecking yes', '-i', 798 self.ssh_privkey_file, file, 799 "%s@%s:%s" % (user, host, dest)] 800 rv = 0 801 802 try: 803 dnull = open("/dev/null", "w") 804 except IOError: 805 self.log.debug("[ssh_file]: failed to open " + \ 806 "/dev/null for redirect") 807 dnull = Null 808 809 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 810 if not self.debug: 811 rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True, 812 close_fds=True) 813 814 return rv == 0 815 816 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): 817 """ 818 Run a remote command on host as user. If debug is set, the action 819 is only logged. Commands are run without stdin, to avoid stray 820 SIGTTINs. 821 """ 822 sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \ 823 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ 824 (self.ssh_exec, self.ssh_privkey_file, 825 user, host, cmd) 826 827 try: 828 dnull = open("/dev/null", "w") 829 except IOError: 830 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \ 831 "for redirect") 832 dnull = Null 833 834 self.log.debug("[ssh_cmd]: %s" % sh_str) 835 if not self.debug: 836 if dnull: 837 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull, 838 close_fds=True) 839 else: 840 sub = Popen(sh_str, shell=True, 841 close_fds=True) 842 if timeout: 843 i = 0 844 rv = sub.poll() 845 while i < timeout: 846 if rv is not None: break 847 else: 848 time.sleep(1) 849 rv = sub.poll() 850 i += 1 851 else: 852 self.log.debug("Process exceeded runtime: %s" % sh_str) 853 os.kill(sub.pid, signal.SIGKILL) 854 raise self.ssh_cmd_timeout(); 855 return rv == 0 856 else: 857 return sub.wait() == 0 858 else: 859 if timeout == 0: 860 self.log.debug("debug timeout raised on %s " % sh_str) 861 raise self.ssh_cmd_timeout() 862 else: 863 return True 864 865 class start_segment(emulab_segment): 866 def __init__(self, log=None, keyfile=None, debug=False): 867 experiment_control_local.emulab_segment.__init__(self, 868 log=log, keyfile=keyfile, debug=debug) 869 870 def create_config_tree(self, src_dir, dest_dir, script): 871 """ 872 Append commands to script that will create the directory hierarchy 873 on the remote federant. 874 """ 875 876 if os.path.isdir(src_dir): 877 print >>script, "mkdir -p %s" % dest_dir 878 print >>script, "chmod 770 %s" % dest_dir 879 880 for f in os.listdir(src_dir): 881 if os.path.isdir(f): 882 self.create_config_tree("%s/%s" % (src_dir, f), 883 "%s/%s" % (dest_dir, f), script) 884 else: 885 self.log.debug("[create_config_tree]: Not a directory: %s" \ 886 % src_dir) 887 888 def ship_configs(self, host, user, src_dir, dest_dir): 889 """ 890 Copy federant-specific configuration files to the federant. 891 """ 892 for f in os.listdir(src_dir): 893 if os.path.isdir(f): 894 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 895 "%s/%s" % (dest_dir, f)): 896 return False 897 else: 898 if not self.scp_file("%s/%s" % (src_dir, f), 899 user, host, dest_dir): 900 return False 901 return True 902 903 def get_state(self, user, host, tb, pid, eid): 904 # command to test experiment state 905 expinfo_exec = "/usr/testbed/bin/expinfo" 906 # Regular expressions to parse the expinfo response 907 state_re = re.compile("State:\s+(\w+)") 908 no_exp_re = re.compile("^No\s+such\s+experiment") 909 swapping_re = re.compile("^No\s+information\s+available.") 910 state = None # Experiment state parsed from expinfo 911 # The expinfo ssh command. Note the identity restriction to use 912 # only the identity provided in the pubkey given. 913 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 914 'StrictHostKeyChecking yes', '-i', 915 self.ssh_privkey_file, "%s@%s" % (user, host), 916 expinfo_exec, pid, eid] 917 918 dev_null = None 919 try: 920 dev_null = open("/dev/null", "a") 921 except IOError, e: 922 self.log.error("[get_state]: can't open /dev/null: %s" %e) 923 924 if self.debug: 925 state = 'swapped' 926 rv = 0 927 else: 928 status = Popen(cmd, stdout=PIPE, stderr=dev_null, 929 close_fds=True) 930 for line in status.stdout: 931 m = state_re.match(line) 932 if m: state = m.group(1) 933 else: 934 for reg, st in ((no_exp_re, "none"), 935 (swapping_re, "swapping")): 936 m = reg.match(line) 937 if m: state = st 938 rv = status.wait() 939 940 # If the experiment is not present the subcommand returns a 941 # non-zero return value. If we successfully parsed a "none" 942 # outcome, ignore the return code. 943 if rv != 0 and state != 'none': 944 raise service_error(service_error.internal, 945 "Cannot get status of segment %s:%s/%s" % \ 946 (tb, pid, eid)) 947 elif state not in ('active', 'swapped', 'swapping', 'none'): 948 raise service_error(service_error.internal, 949 "Cannot get status of segment %s:%s/%s" % \ 950 (tb, pid, eid)) 951 else: return state 952 953 954 def __call__(self, tb, eid, tbparams, tmpdir, timeout=0): 955 """ 956 Start a sub-experiment on a federant. 957 958 Get the current state, modify or create as appropriate, ship data 959 and configs and start the experiment. There are small ordering 960 differences based on the initial state of the sub-experiment. 961 """ 962 # ops node in the federant 963 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 964 user = tbparams[tb]['user'] # federant user 965 pid = tbparams[tb]['project'] # federant project 966 # XXX 967 base_confs = ( "hosts",) 968 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 969 # Configuration directories on the remote machine 970 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 971 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 972 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 973 974 state = self.get_state(user, host, tb, pid, eid) 975 976 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 977 self.log.info("[start_segment]:transferring experiment to %s" % tb) 978 979 if not self.scp_file("%s/%s/%s" % \ 980 (tmpdir, tb, tclfile), user, host): 981 return False 982 983 if state == 'none': 984 # Create a null copy of the experiment so that we capture any 985 # logs there if the modify fails. Emulab software discards the 986 # logs from a failed startexp 987 if not self.scp_file("%s/null.tcl" % tmpdir, user, host): 988 return False 989 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 990 timedout = False 991 try: 992 if not self.ssh_cmd(user, host, 993 ("/usr/testbed/bin/startexp -i -f -w -p %s " + 994 "-e %s null.tcl") % (pid, eid), "startexp", 995 timeout=60 * 10): 996 return False 997 except self.ssh_cmd_timeout: 998 timedout = True 999 1000 if timedout: 1001 state = self.get_state(user, host, tb, pid, eid) 1002 if state != "swapped": 1003 return False 1004 1005 1006 # Open up a temporary file to contain a script for setting up the 1007 # filespace for the new experiment. 1008 self.log.info("[start_segment]: creating script file") 1009 try: 1010 sf, scriptname = tempfile.mkstemp() 1011 scriptfile = os.fdopen(sf, 'w') 1012 except IOError: 1013 return False 1014 1015 scriptbase = os.path.basename(scriptname) 1016 1017 # Script the filesystem changes 1018 print >>scriptfile, "/bin/rm -rf %s" % proj_dir 1019 # Clear and create the tarfiles and rpm directories 1020 for d in (tarfiles_dir, rpms_dir): 1021 print >>scriptfile, "/bin/rm -rf %s/*" % d 1022 print >>scriptfile, "mkdir -p %s" % d 1023 print >>scriptfile, 'mkdir -p %s' % proj_dir 1024 self.create_config_tree("%s/%s" % (tmpdir, tb), 1025 proj_dir, scriptfile) 1026 if os.path.isdir("%s/tarfiles" % tmpdir): 1027 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir, 1028 scriptfile) 1029 if os.path.isdir("%s/rpms" % tmpdir): 1030 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 1031 scriptfile) 1032 print >>scriptfile, "rm -f %s" % scriptbase 1033 scriptfile.close() 1034 1035 # Move the script to the remote machine 1036 # XXX: could collide tempfile names on the remote host 1037 if self.scp_file(scriptname, user, host, scriptbase): 1038 os.remove(scriptname) 1039 else: 1040 return False 1041 1042 # Execute the script (and the script's last line deletes it) 1043 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): 1044 return False 1045 1046 for f in base_confs: 1047 if not self.scp_file("%s/%s" % (tmpdir, f), user, host, 1048 "%s/%s" % (proj_dir, f)): 1049 return False 1050 if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), 1051 proj_dir): 1052 return False 1053 if os.path.isdir("%s/tarfiles" % tmpdir): 1054 if not self.ship_configs(host, user, 1055 "%s/tarfiles" % tmpdir, tarfiles_dir): 1056 return False 1057 if os.path.isdir("%s/rpms" % tmpdir): 1058 if not self.ship_configs(host, user, 1059 "%s/rpms" % tmpdir, tarfiles_dir): 1060 return False 1061 # Stage the new configuration (active experiments will stay swapped 1062 # in now) 1063 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 1064 try: 1065 if not self.ssh_cmd(user, host, 1066 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 1067 (pid, eid, tclfile), 1068 "modexp", timeout= 60 * 10): 1069 return False 1070 except self.ssh_cmd_timeout: 1071 self.log.error("Modify command failed to complete in time") 1072 # There's really no way to see if this succeeded or failed, so 1073 # if it hangs, assume the worst. 1074 return False 1075 # Active experiments are still swapped, this swaps the others in. 1076 if state != 'active': 1077 self.log.info("[start_segment]: Swapping %s in on %s" % \ 1078 (eid, tb)) 1079 timedout = False 1080 try: 1081 if not self.ssh_cmd(user, host, 1082 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 1083 "swapexp", timeout=10*60): 1084 return False 1085 except self.ssh_cmd_timeout: 1086 timedout = True 1087 1088 # If the command was terminated, but completed successfully, 1089 # report success. 1090 if timedout: 1091 self.log.debug("[start_segment]: swapin timed out " +\ 1092 "checking state") 1093 state = self.get_state(user, host, tb, pid, eid) 1094 self.log.debug("[start_segment]: state is %s" % state) 1095 return state == 'active' 1096 # Everything has gone OK. 1097 return True 1098 1099 class stop_segment(emulab_segment): 1100 def __init__(self, log=None, keyfile=None, debug=False): 1101 experiment_control_local.emulab_segment.__init__(self, 1102 log=log, keyfile=keyfile, debug=debug) 1103 1104 def __call__(self, tb, eid, tbparams): 1105 """ 1106 Stop a sub experiment by calling swapexp on the federant 1107 """ 1108 user = tbparams[tb]['user'] 1109 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 1110 pid = tbparams[tb]['project'] 1111 1112 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 1113 rv = False 1114 try: 1115 # Clean out tar files: we've gone over quota in the past 1116 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid)) 1117 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \ 1118 (pid, eid)) 1119 rv = self.ssh_cmd(user, host, 1120 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) 1121 except self.ssh_cmd_timeout: 1122 rv = False 1123 return rv 1124 1125 def StartSegment(self, req, fid): 1126 try: 1127 req = req['StartSegmentRequestBody'] 1128 except KeyError: 1129 raise service_error(server_error.req, "Badly formed request") 1130 auth_attr = req['allocID']['fedid'] 1131 if self.auth.check_attribute(fid, auth_attr): 1132 print "OK" 1133 else: 1134 print "Fail" 1135 return { 'allocID': req['allocID'] } -
fedd/federation/deter_impl.py
r4c8a0b7 rcc8d8e9 41 41 raise RuntimeError( 42 42 "Error reading accessDB %s: %s" % (access_db, e)) 43 except ValueError :43 except ValueError, e: 44 44 raise RuntimeError("%s" % e) 45 45 … … 62 62 self.get_handler = self.experiment.get_handler 63 63 else: 64 self.get_handler = self.experiment.get_handler64 self.get_handler = None 65 65 66 66 if config.has_section("splitter"): -
fedd/federation/experiment_control.py
r4c8a0b7 rcc8d8e9 196 196 call_RequestAccess = service_caller('RequestAccess') 197 197 call_ReleaseAccess = service_caller('ReleaseAccess') 198 call_StartSegment = service_caller('StartSegment') 198 199 call_Ns2Split = service_caller('Ns2Split') 199 200 … … 230 231 self.trusted_certs = config.get("experiment_control", "trusted_certs") \ 231 232 or config.get("globals", "trusted_certs") 233 234 # XXX: 235 self.repodir = '/usr/local/etc/fedd/repo' 232 236 233 237 self.exp_stem = "fed-stem" … … 403 407 State format is a simple pickling of the state dictionary. 404 408 """ 409 410 def get_experiment_id(state): 411 """ 412 Pull the fedid experimentID out of the saved state. This is kind 413 of a gross walk through the dict. 414 """ 415 416 if state.has_key('experimentID'): 417 for e in state['experimentID']: 418 if e.has_key('fedid'): 419 return e['fedid'] 420 else: 421 return None 422 else: 423 return None 424 425 def get_alloc_ids(state): 426 """ 427 Pull the fedids of the identifiers of each allocation from the 428 state. Again, a dict dive that's best isolated. 429 """ 430 431 return [ f['allocID']['fedid'] 432 for f in state.get('federant',[]) \ 433 if f.has_key('allocID') and \ 434 f['allocID'].has_key('fedid')] 435 436 405 437 try: 406 438 f = open(self.state_filename, "r") … … 415 447 "Unpickling failed: %s") % e) 416 448 417 for k in self.state.keys():449 for s in self.state.values(): 418 450 try: 419 # This list should only have one element in it, but phrasing it 420 # as a for loop doesn't cost much, really. We have to find the 421 # fedid elements anyway. 422 for eid in [ f['fedid'] \ 423 for f in self.state[k]['experimentID']\424 if f.has_key('fedid') ]: 425 self.auth.set_attribute( self.state[k]['owner'], eid)451 452 eid = get_experiment_id(s) 453 if eid : 454 # Give the owner rights to the experiment 455 self.auth.set_attribute(s['owner'], eid) 456 # And holders of the eid as well 457 self.auth.set_attribute(eid, eid) 426 458 # allow overrides to control experiments as well 427 459 for o in self.overrides: 428 460 self.auth.set_attribute(o, eid) 461 # Set permissions to allow reading of the software repo, if 462 # any, as well. 463 for a in get_alloc_ids(s): 464 self.auth.set_attribute(a, 'repo/%s' % eid) 465 else: 466 raise KeyError("No experiment id") 429 467 except KeyError, e: 430 468 self.log.warning("[read_state]: State ownership or identity " +\ … … 970 1008 nodes = [ n['vname'] for n in topo['node'] ] 971 1009 topo_lans = topo['lan'] 972 except KeyError :973 raise service_error(service_error.internal, "Bad topology")1010 except KeyError, e: 1011 raise service_error(service_error.internal, "Bad topology: %s" %e) 974 1012 975 1013 lans = { } … … 2103 2141 2104 2142 return rv 2143 2144 class new_start_segment: 2145 def __init__(self, debug=False, log=None, cert_file=None, 2146 cert_pwd=None, trusted_certs=None, caller=None): 2147 self.log = log 2148 self.debug = debug 2149 self.cert_file = cert_file 2150 self.cert_pwd = cert_pwd 2151 self.trusted_certs = None 2152 self.caller = caller 2153 2154 def __call__(self, uri, aid, topo): 2155 req = { 2156 'allocID': { 'fedid' : aid }, 2157 'segmentdescription': { 2158 'topdldescription': topo.to_dict(), 2159 }, 2160 } 2161 2162 print "calling %s" % uri 2163 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 2164 self.trusted_certs) 2165 print r 2166 return True 2167 2168 2105 2169 2170 2171 def new_allocate_resources(self, allocated, master, eid, expid, expcert, 2172 tbparams, topo, tmpdir, alloc_log=None): 2173 started = { } # Testbeds where a sub-experiment started 2174 # successfully 2175 2176 # XXX 2177 fail_soft = False 2178 2179 log = alloc_log or self.log 2180 2181 thread_pool = self.thread_pool(self.nthreads) 2182 threads = [ ] 2183 2184 for tb in [ k for k in allocated.keys() if k != master]: 2185 # Create and start a thread to start the segment, and save it to 2186 # get the return value later 2187 thread_pool.wait_for_slot() 2188 uri = self.tbmap.get(tb, None) 2189 if not uri: 2190 raise service_error(service_error.internal, 2191 "Unknown testbed %s !?" % tb) 2192 2193 if tbparams[tb].has_key('allocID') and \ 2194 tbparams[tb]['allocID'].has_key('fedid'): 2195 aid = tbparams[tb]['allocID']['fedid'] 2196 else: 2197 raise service_error(service_error.internal, 2198 "No alloc id for testbed %s !?" % tb) 2199 2200 t = self.pooled_thread(\ 2201 target=self.new_start_segment(log=log, debug=self.debug, 2202 cert_file=self.cert_file, cert_pwd=self.cert_pwd, 2203 trusted_certs=self.trusted_certs, 2204 caller=self.call_StartSegment), 2205 args=(uri, aid, topo[tb]), name=tb, 2206 pdata=thread_pool, trace_file=self.trace_file) 2207 threads.append(t) 2208 t.start() 2209 2210 # Wait until all finish 2211 thread_pool.wait_for_all_done() 2212 2213 # If none failed, start the master 2214 failed = [ t.getName() for t in threads if not t.rv ] 2215 2216 if len(failed) == 0: 2217 uri = self.tbmap.get(master, None) 2218 if not uri: 2219 raise service_error(service_error.internal, 2220 "Unknown testbed %s !?" % master) 2221 2222 if tbparams[master].has_key('allocID') and \ 2223 tbparams[master]['allocID'].has_key('fedid'): 2224 aid = tbparams[master]['allocID']['fedid'] 2225 else: 2226 raise service_error(service_error.internal, 2227 "No alloc id for testbed %s !?" % master) 2228 starter = self.new_start_segment(log=log, debug=self.debug, 2229 cert_file=self.cert_file, cert_pwd=self.cert_pwd, 2230 trusted_certs=self.trusted_certs, 2231 caller=self.call_StartSegment) 2232 if not starter(uri, aid, topo[master]): 2233 failed.append(master) 2234 2235 succeeded = [tb for tb in allocated.keys() if tb not in failed] 2236 # If one failed clean up, unless fail_soft is set 2237 if failed and False: 2238 if not fail_soft: 2239 thread_pool.clear() 2240 for tb in succeeded: 2241 # Create and start a thread to stop the segment 2242 thread_pool.wait_for_slot() 2243 t = self.pooled_thread(\ 2244 target=self.stop_segment(log=log, 2245 keyfile=self.ssh_privkey_file, 2246 debug=self.debug), 2247 args=(tb, eid, tbparams), name=tb, 2248 pdata=thread_pool, trace_file=self.trace_file) 2249 t.start() 2250 # Wait until all finish 2251 thread_pool.wait_for_all_done() 2252 2253 # release the allocations 2254 for tb in tbparams.keys(): 2255 self.release_access(tb, tbparams[tb]['allocID']) 2256 # Remove the placeholder 2257 self.state_lock.acquire() 2258 self.state[eid]['experimentStatus'] = 'failed' 2259 if self.state_filename: self.write_state() 2260 self.state_lock.release() 2261 2262 log.error("Swap in failed on %s" % ",".join(failed)) 2263 return 2264 else: 2265 log.info("[start_segment]: Experiment %s active" % eid) 2266 2267 log.debug("[start_experiment]: removing %s" % tmpdir) 2268 2269 # Walk up tmpdir, deleting as we go 2270 for path, dirs, files in os.walk(tmpdir, topdown=False): 2271 for f in files: 2272 os.remove(os.path.join(path, f)) 2273 for d in dirs: 2274 os.rmdir(os.path.join(path, d)) 2275 os.rmdir(tmpdir) 2276 2277 # Insert the experiment into our state and update the disk copy 2278 self.state_lock.acquire() 2279 self.state[expid]['experimentStatus'] = 'active' 2280 self.state[eid] = self.state[expid] 2281 if self.state_filename: self.write_state() 2282 self.state_lock.release() 2283 return 2284 2106 2285 2107 2286 def new_create_experiment(self, req, fid): … … 2274 2453 2275 2454 allocated = { } # Testbeds we can access 2276 # XXX here's where we're working 2277 def out_topo(filename, t): 2278 try: 2279 f = open("/tmp/%s" % filename, "w") 2280 print >> f, "%s" % \ 2281 topdl.topology_to_xml(t, top="experiment") 2282 f.close() 2283 except IOError, e: 2284 raise service_error(service_error.internal, "Can't open file") 2285 2286 try: 2287 2288 top = topdl.topology_from_xml(file=split_data, top="experiment") 2289 subs = sorted(top.substrates, 2290 cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)), 2291 reverse=True) 2292 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) 2293 for s in subs: 2294 a = ips.allocate(len(s.interfaces)+2) 2295 if a : 2296 base, num = a 2297 if num < len(s.interfaces) +2 : 2298 raise service_error(service_error.internal, 2299 "Allocator returned wrong number of IPs??") 2300 else: 2301 raise service_error(service_error.req, 2302 "Cannot allocate IP addresses") 2303 2304 base += 1 2305 for i in s.interfaces: 2306 i.attribute.append( 2307 topdl.Attribute('ip4_address', 2308 "%s" % ip_addr(base))) 2309 base += 1 2310 2311 testbeds = set([ a.value for e in top.elements \ 2312 for a in e.attribute \ 2313 if a.attribute == 'testbed'] ) 2314 topo ={ } 2315 for tb in testbeds: 2316 self.get_access(tb, None, user, tbparams, master, 2317 export_project, access_user) 2318 topo[tb] = top.clone() 2319 to_delete = [ ] 2320 for e in topo[tb].elements: 2321 etb = e.get_attribute('testbed') 2322 if etb and etb != tb: 2323 for i in e.interface: 2324 for s in i.subs: 2325 try: 2326 s.interfaces.remove(i) 2327 except ValueError: 2328 raise service_error(service_error.internal, 2329 "Can't remove interface??") 2330 to_delete.append(e) 2331 for e in to_delete: 2332 topo[tb].elements.remove(e) 2333 topo[tb].make_indices() 2334 2335 2336 2337 for s in top.substrates: 2338 tests = { } 2339 for i in s.interfaces: 2340 e = i.element 2341 tb = e.get_attribute('testbed') 2342 if tb and not tests.has_key(tb): 2343 for i in e.interface: 2344 if s in i.subs: 2345 tests[tb]= \ 2346 i.get_attribute('ip4_address') 2347 if len(tests) < 2: 2348 continue 2349 2350 # More than one testbed is on this substrate. Insert 2351 # some gateways into the subtopologies. 2352 2353 for st in tests.keys(): 2354 for dt in [ t for t in tests.keys() if t != st]: 2355 myname = "%stunnel" % dt 2356 desthost = "%stunnel" % st 2357 sproject = tbparams[st].get('project', 'project') 2358 dproject = tbparams[dt].get('project', 'project') 2359 sdomain = ".%s.%s%s" % (eid, sproject, 2360 tbparams[st].get('domain', ".example.com")) 2361 ddomain = ".%s.%s%s" % (eid, dproject, 2362 tbparams[dt].get('domain', ".example.com")) 2363 boss = tbparams[master].get('boss', "boss") 2364 fs = tbparams[master].get('fs', "fs") 2365 event_server = "%s%s" % \ 2366 (tbparams[st].get('eventserver', "event_server"), 2367 tbparams[dt].get('domain', "example.com")) 2368 remote_event_server = "%s%s" % \ 2369 (tbparams[dt].get('eventserver', "event_server"), 2370 tbparams[dt].get('domain', "example.com")) 2371 seer_control = "%s%s" % \ 2372 (tbparams[st].get('control', "control"), sdomain) 2373 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 2374 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 2375 conf_file = "%s%s.gw.conf" % (myname, sdomain) 2376 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 2377 # translate to lower case so the `hostname` hack for specifying 2378 # configuration files works. 2379 conf_file = conf_file.lower(); 2380 remote_conf_file = remote_conf_file.lower(); 2381 active = ("%s" % (st == master)) 2382 portal = topdl.Computer(**{ 2383 'name': "%stunnel" % dt, 2384 'attribute' : [{ 2385 'attribute': n, 2386 'value': v, 2387 } for n, v in (\ 2388 ('gateway', 'true'), 2389 ('boss', boss), 2390 ('fs', fs), 2391 ('event_server', event_server), 2392 ('remote_event_server', remote_event_server), 2393 ('seer_control', seer_control), 2394 ('local_key_dir', local_key_dir), 2395 ('remote_conf_dir', remote_conf_dir), 2396 ('conf_file', conf_file), 2397 ('remote_conf_file', remote_conf_file), 2398 ('remote_script_dir', "/usr/local/federation/bin"), 2399 ('local_script_dir', "/usr/local/federation/bin"), 2400 )], 2401 'interface': [{ 2402 'substrate': s.name, 2403 'attribute': [ { 2404 'attribute': 'ip4_addreess', 2405 'value': tests[dt], 2406 }, ], 2407 }, ], 2408 }) 2409 topo[st].elements.append(portal) 2410 # Connect the gateway nodes into the topologies and clear out 2411 # substrates that are not in the topologies 2412 for tb in testbeds: 2413 topo[tb].incorporate_elements() 2414 topo[tb].substrates = \ 2415 [s for s in topo[tb].substrates \ 2416 if len(s.interfaces) >0] 2417 2418 softdir ="%s/software" % tmpdir 2419 softmap = { } 2420 os.mkdir(softdir) 2421 pkgs = set([fedkit, gatewaykit]) 2422 pkgs.update([x.location for e in top.elements \ 2423 for x in e.software]) 2424 for pkg in pkgs: 2425 loc = pkg 2426 2427 scheme, host, path = urlparse(loc)[0:3] 2428 dest = os.path.basename(path) 2429 if not scheme: 2430 if not loc.startswith('/'): 2431 loc = "/%s" % loc 2432 loc = "file://%s" %loc 2433 try: 2434 u = urlopen(loc) 2435 except Exception, e: 2436 raise service_error(service_error.req, 2437 "Cannot open %s: %s" % (loc, e)) 2438 try: 2439 f = open("%s/%s" % (softdir, dest) , "w") 2440 data = u.read(4096) 2441 while data: 2442 f.write(data) 2443 data = u.read(4096) 2444 f.close() 2445 u.close() 2446 except Exception, e: 2447 raise service_error(service_error.internal, 2448 "Could not copy %s: %s" % (loc, e)) 2449 path = re.sub("/tmp", "", softdir) 2450 # XXX 2451 softmap[pkg] = \ 2452 "https://users.isi.deterlab.net:23232/%s/%s" %\ 2453 ( path, dest) 2454 2455 # Convert the software locations in the segments into the local 2456 # copies on this host 2457 for soft in [ s for tb in topo.values() \ 2458 for e in tb.elements \ 2459 for s in e.software ]: 2460 if softmap.has_key(soft.location): 2461 soft.location = softmap[soft.location] 2462 for tb in testbeds: 2463 out_topo("%s.xml" %tb, topo[tb]) 2464 2465 vtopo = topdl.topology_to_vtopo(top) 2466 vis = self.genviz(vtopo) 2467 2468 except Exception, e: 2469 traceback.print_exc() 2470 raise service_error(service_error.internal, "%s" % e) 2471 2472 2473 2474 # Build the testbed topologies: 2475 2476 2477 if True: 2478 raise service_error(service_error.internal, "Developing") 2479 2480 # XXX old code 2481 # Objects to parse the splitter output (defined above) 2482 parse_current_testbed = self.current_testbed(eid, tmpdir, 2483 self.fedkit, self.gatewaykit) 2484 parse_allbeds = self.allbeds(self.get_access) 2485 parse_gateways = self.gateways(eid, master, tmpdir, 2486 gw_pubkey_base, gw_secretkey_base, self.copy_file, 2487 self.fedkit) 2488 parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", 2489 "^#\s+End\s+Vtopo") 2490 parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", 2491 "^#\s+End\s+hostnames", tmpdir + "/hosts") 2492 parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", 2493 "^#\s+End\s+tarfiles") 2494 parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", 2495 "^#\s+End\s+rpms") 2496 2497 # Working on the split data 2498 for line in split_data: 2499 line = line.rstrip() 2500 if parse_current_testbed(line, master, allocated, tbparams): 2501 continue 2502 elif parse_allbeds(line, user, tbparams, master, export_project, 2503 access_user): 2504 continue 2505 elif parse_gateways(line, allocated, tbparams): 2506 continue 2507 elif parse_vtopo(line): 2508 continue 2509 elif parse_hostnames(line): 2510 continue 2511 elif parse_tarfiles(line): 2512 continue 2513 elif parse_rpms(line): 2514 continue 2515 else: 2516 raise service_error(service_error.internal, 2517 "Bad tcl parse? %s" % line) 2518 # Virtual topology and visualization 2519 vtopo = self.gentopo(parse_vtopo.str) 2520 if not vtopo: 2521 raise service_error(service_error.internal, 2522 "Failed to generate virtual topology") 2523 2524 vis = self.genviz(vtopo) 2525 if not vis: 2526 raise service_error(service_error.internal, 2527 "Failed to generate visualization") 2528 2529 2455 # Allocate IP addresses: The allocator is a buddy system memory 2456 # allocator. Allocate from the largest substrate to the 2457 # smallest to make the packing more likely to work - i.e. 2458 # avoiding internal fragmentation. 2459 top = topdl.topology_from_xml(file=split_data, top="experiment") 2460 subs = sorted(top.substrates, 2461 cmp=lambda x,y: cmp(len(x.interfaces), 2462 len(y.interfaces)), 2463 reverse=True) 2464 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) 2465 for s in subs: 2466 a = ips.allocate(len(s.interfaces)+2) 2467 if a : 2468 base, num = a 2469 if num < len(s.interfaces) +2 : 2470 raise service_error(service_error.internal, 2471 "Allocator returned wrong number of IPs??") 2472 else: 2473 raise service_error(service_error.req, 2474 "Cannot allocate IP addresses") 2475 2476 base += 1 2477 for i in s.interfaces: 2478 i.attribute.append( 2479 topdl.Attribute('ip4_address', 2480 "%s" % ip_addr(base))) 2481 base += 1 2482 2483 # Find the testbeds to look up 2484 testbeds = set([ a.value for e in top.elements \ 2485 for a in e.attribute \ 2486 if a.attribute == 'testbed'] ) 2487 2488 # Make per testbed topologies. Copy the main topo and remove 2489 # interfaces and nodes that don't live in the testbed. 2490 topo ={ } 2491 for tb in testbeds: 2492 self.get_access(tb, None, user, tbparams, master, 2493 export_project, access_user) 2494 allocated[tb] = 1 2495 topo[tb] = top.clone() 2496 to_delete = [ ] 2497 for e in topo[tb].elements: 2498 etb = e.get_attribute('testbed') 2499 if etb and etb != tb: 2500 for i in e.interface: 2501 for s in i.subs: 2502 try: 2503 s.interfaces.remove(i) 2504 except ValueError: 2505 raise service_error(service_error.internal, 2506 "Can't remove interface??") 2507 to_delete.append(e) 2508 for e in to_delete: 2509 topo[tb].elements.remove(e) 2510 topo[tb].make_indices() 2511 2512 2513 2514 # Now, for each substrate in the main topology, find those that 2515 # have nodes on more than one testbed. Insert portal nodes 2516 # into the copies of those substrates on the sub topologies. 2517 for s in top.substrates: 2518 tests = { } 2519 for i in s.interfaces: 2520 e = i.element 2521 tb = e.get_attribute('testbed') 2522 if tb and not tests.has_key(tb): 2523 for i in e.interface: 2524 if s in i.subs: 2525 tests[tb]= \ 2526 i.get_attribute('ip4_address') 2527 if len(tests) < 2: 2528 continue 2529 2530 # More than one testbed is on this substrate. Insert 2531 # some portals into the subtopologies. 2532 2533 for st in tests.keys(): 2534 for dt in [ t for t in tests.keys() if t != st]: 2535 myname = "%stunnel" % dt 2536 desthost = "%stunnel" % st 2537 sproject = tbparams[st].get('project', 'project') 2538 dproject = tbparams[dt].get('project', 'project') 2539 sdomain = ".%s.%s%s" % (eid, sproject, 2540 tbparams[st].get('domain', ".example.com")) 2541 ddomain = ".%s.%s%s" % (eid, dproject, 2542 tbparams[dt].get('domain', ".example.com")) 2543 boss = tbparams[master].get('boss', "boss") 2544 fs = tbparams[master].get('fs', "fs") 2545 event_server = "%s%s" % \ 2546 (tbparams[st].get('eventserver', "event_server"), 2547 tbparams[dt].get('domain', "example.com")) 2548 remote_event_server = "%s%s" % \ 2549 (tbparams[dt].get('eventserver', "event_server"), 2550 tbparams[dt].get('domain', "example.com")) 2551 seer_control = "%s%s" % \ 2552 (tbparams[st].get('control', "control"), sdomain) 2553 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 2554 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 2555 conf_file = "%s%s.gw.conf" % (myname, sdomain) 2556 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 2557 # translate to lower case so the `hostname` hack for specifying 2558 # configuration files works. 2559 conf_file = conf_file.lower(); 2560 remote_conf_file = remote_conf_file.lower(); 2561 active = ("%s" % (st == master)) 2562 portal = topdl.Computer(**{ 2563 'name': "%stunnel" % dt, 2564 'attribute' : [{ 2565 'attribute': n, 2566 'value': v, 2567 } for n, v in (\ 2568 ('gateway', 'true'), 2569 ('boss', boss), 2570 ('fs', fs), 2571 ('event_server', event_server), 2572 ('remote_event_server', remote_event_server), 2573 ('seer_control', seer_control), 2574 ('local_key_dir', local_key_dir), 2575 ('remote_conf_dir', remote_conf_dir), 2576 ('conf_file', conf_file), 2577 ('remote_conf_file', remote_conf_file), 2578 ('remote_script_dir', "/usr/local/federation/bin"), 2579 ('local_script_dir', "/usr/local/federation/bin"), 2580 )], 2581 'interface': [{ 2582 'substrate': s.name, 2583 'attribute': [ { 2584 'attribute': 'ip4_addreess', 2585 'value': tests[dt], 2586 }, ], 2587 }, ], 2588 }) 2589 topo[st].elements.append(portal) 2590 # Connect the gateway nodes into the topologies and clear out 2591 # substrates that are not in the topologies 2592 for tb in testbeds: 2593 topo[tb].incorporate_elements() 2594 topo[tb].substrates = \ 2595 [s for s in topo[tb].substrates \ 2596 if len(s.interfaces) >0] 2597 2598 # Copy the rpms and tarfiles to a distribution directory from 2599 # which the federants can retrieve them 2600 linkpath = "%s/software" % expid 2601 softdir ="%s/%s" % ( self.repodir, linkpath) 2602 softmap = { } 2603 pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \ 2604 for p, t in l ]) 2605 pkgs.update([x.location for e in top.elements \ 2606 for x in e.software]) 2607 try: 2608 os.makedirs(softdir) 2609 except IOError, e: 2610 raise service_error( 2611 "Cannot create software directory: %s" % e) 2612 for pkg in pkgs: 2613 loc = pkg 2614 2615 scheme, host, path = urlparse(loc)[0:3] 2616 dest = os.path.basename(path) 2617 if not scheme: 2618 if not loc.startswith('/'): 2619 loc = "/%s" % loc 2620 loc = "file://%s" %loc 2621 try: 2622 u = urlopen(loc) 2623 except Exception, e: 2624 raise service_error(service_error.req, 2625 "Cannot open %s: %s" % (loc, e)) 2626 try: 2627 f = open("%s/%s" % (softdir, dest) , "w") 2628 data = u.read(4096) 2629 while data: 2630 f.write(data) 2631 data = u.read(4096) 2632 f.close() 2633 u.close() 2634 except Exception, e: 2635 raise service_error(service_error.internal, 2636 "Could not copy %s: %s" % (loc, e)) 2637 path = re.sub("/tmp", "", linkpath) 2638 # XXX 2639 softmap[pkg] = \ 2640 "https://users.isi.deterlab.net:23232/%s/%s" %\ 2641 ( path, dest) 2642 2643 # Allow the individual testbeds to access the software. 2644 for tb in tbparams.keys(): 2645 self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 2646 "%s/%s" % ( path, dest)) 2647 2648 # Convert the software locations in the segments into the local 2649 # copies on this host 2650 for soft in [ s for tb in topo.values() \ 2651 for e in tb.elements \ 2652 for s in e.software ]: 2653 if softmap.has_key(soft.location): 2654 soft.location = softmap[soft.location] 2655 2656 vtopo = topdl.topology_to_vtopo(top) 2657 vis = self.genviz(vtopo) 2658 2530 2659 # save federant information 2531 2660 for k in allocated.keys(): … … 2543 2672 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 2544 2673 if tbparams[tb].has_key('federant') ] 2545 if self.state_filename: self.write_state() 2674 if self.state_filename: 2675 self.write_state() 2546 2676 self.state_lock.release() 2547 2548 # Copy tarfiles and rpms needed at remote sites into a staging area2549 try:2550 if self.fedkit:2551 for t in self.fedkit:2552 parse_tarfiles.list.append(t[1])2553 if self.gatewaykit:2554 for t in self.gatewaykit:2555 parse_tarfiles.list.append(t[1])2556 for t in parse_tarfiles.list:2557 if not os.path.exists("%s/tarfiles" % tmpdir):2558 os.mkdir("%s/tarfiles" % tmpdir)2559 self.copy_file(t, "%s/tarfiles/%s" % \2560 (tmpdir, os.path.basename(t)))2561 for r in parse_rpms.list:2562 if not os.path.exists("%s/rpms" % tmpdir):2563 os.mkdir("%s/rpms" % tmpdir)2564 self.copy_file(r, "%s/rpms/%s" % \2565 (tmpdir, os.path.basename(r)))2566 # A null experiment file in case we need to create a remote2567 # experiment from scratch2568 f = open("%s/null.tcl" % tmpdir, "w")2569 print >>f, """2570 set ns [new Simulator]2571 source tb_compat.tcl2572 2573 set a [$ns node]2574 2575 $ns rtproto Session2576 $ns run2577 """2578 f.close()2579 2580 except IOError, e:2581 raise service_error(service_error.internal,2582 "Cannot stage tarfile/rpm: %s" % e.strerror)2583 2584 2677 except service_error, e: 2585 2678 # If something goes wrong in the parse (usually an access error) … … 2587 2680 # exceptions. Failing at this point returns a fault to the remote 2588 2681 # caller. 2682 2589 2683 self.state_lock.acquire() 2590 2684 del self.state[eid] … … 2616 2710 2617 2711 # Start a thread to do the resource allocation 2618 t = Thread(target=self. allocate_resources,2712 t = Thread(target=self.new_allocate_resources, 2619 2713 args=(allocated, master, eid, expid, expcert, tbparams, 2620 t mpdir, alloc_log),2714 topo, tmpdir, alloc_log), 2621 2715 name=eid) 2622 2716 t.start() -
fedd/federation/topdl.py
r4c8a0b7 rcc8d8e9 48 48 class Capacity(base): 49 49 def __init__(self, rate, kind): 50 self.rate = rate50 self.rate = float(rate) 51 51 self.kind = kind 52 52 … … 55 55 56 56 def to_dict(self): 57 return { 'rate': self.rate, 'kind': self.kind }57 return { 'rate': float(self.rate), 'kind': self.kind } 58 58 59 59 class Latency(base): 60 60 def __init__(self, time, kind): 61 self.time = time61 self.time = float(time) 62 62 self.kind = kind 63 63 … … 66 66 67 67 def to_dict(self): 68 return { 'time': self.time, 'kind': self.kind }68 return { 'time': float(self.time), 'kind': self.kind } 69 69 70 70 class Substrate(base): … … 117 117 class Storage(base): 118 118 def __init__(self, amount, persistence, attribute=[]): 119 if isinstance(amount, basestring): 120 self.amount = float(amount) 121 else: 122 self.amount = amount 119 self.amount = float(amount) 123 120 self.presistence = persistence 124 121 self.attribute = [ self.init_class(Attribute, a) \ … … 411 408 f = open(filename, "r") 412 409 xp.ParseFile(f) 410 f.close() 413 411 elif file: 414 412 xp.ParseFile(file) … … 455 453 456 454 for eidx, e in enumerate(t.elements): 457 n = { }458 455 if e.name: name = e.name[0] 459 456 else: name = "unnamed_node%d" % eidx 460 457 461 458 ips = [ ] 462 for i in e.interfaces:459 for idx, i in enumerate(e.interface): 463 460 ip = i.get_attribute('ip4_address') 464 461 ips.append(ip) … … 490 487 }) 491 488 492 nodes.append(n) 493 return { 'vtopo': { 'node': node, 'lan': lan } } 489 return { 'node': nodes, 'lan': lans } 490 491 def topology_to_ns2(t): 492 out = """ 493 set ns [new Simulator] 494 source tb_compat.tcl 495 496 """ 497 for e in t.elements: 498 rpms = "" 499 tarfiles = "" 500 if isinstance(e, Computer): 501 name = e.name[0] 502 out += "set %s [$ns node]\n" % name 503 if e.os and len(e.os) == 1: 504 osid = e.os[0].get_attribute('osid') 505 if osid: 506 out += "tb-set-node-os $%s %s\n" % (name, osid) 507 for s in e.software: 508 if s.install: 509 tarfiles += "%s %s " % (s.install, s.location) 510 else: 511 rpms += "%s " % s.location 512 if rpms: 513 out += "tb-set-node-rpms $%s %s\n" % (name, rpms) 514 if tarfiles: 515 out += "tb-set-node-tarfiles $%s %s\n" % (name, tarfiles) 516 startcmd = e.get_attribute('startup') 517 if startcmd: 518 out+= 'tb-set-node-startcmd $%s "%s"\n' % (name, startcmd) 519 out+= "\n" 520 521 for idx, s in enumerate(t.substrates): 522 loss = s.get_attribute('loss') 523 if s.latency: 524 delay = s.latency.time 525 else: 526 delay = 0 527 name = s.name or "sub%d" % idx 528 529 if len(s.interfaces) > 2: 530 # Lan 531 members = [ i.element.name[0] for i in s.interfaces] 532 out += 'set %s [$ns make-lan "%s" %f %fms ]\n' % \ 533 (name, " ".join(members), s.capacity.rate, delay) 534 if loss: 535 "tb-set-lan-loss $%s %f\n" % (name, float(loss)) 536 537 for i in s.interfaces: 538 e = i.element 539 ip = e.get_attribute("ip4_address") 540 if ip: 541 out += "tb-set-ip-lan $%s $%s %s\n" % (e.name, name, ip) 542 if i.capacity and i.capacity.rate != s.capacity.rate: 543 out += "tb-set-node-lan-bandwidth $%s $%s %f\n" % \ 544 (e.name[0], name, i.capacity.rate) 545 if i.latency and i.latency.time != delay: 546 out += "tb-set-node-lan-delay $%s $%s %fms\n" % \ 547 (e.name[0], name, i.latency.time) 548 iloss = i.get_attribute('loss') 549 if loss and iloss != loss : 550 out += "tb-set-node-lan-loss $%s $%s %f" % \ 551 (e.name[0], name, float(loss)) 552 out+= "\n" 553 elif len(s.interfaces) == 2: 554 f = s.interfaces[0] 555 t = s.interfaces[1] 556 557 out += "set %s [$ns duplex-link $%s $%s %f %fms DropTail]\n" %\ 558 (name, f.element.name[0], t.element.name[0], 559 s.capacity.rate, delay) 560 if loss: 561 out += "tb-set-link-loss $%s %f\n" % (name, float(loss)) 562 563 for i in s.interfaces: 564 lloss = i.get_attribute("loss") 565 cap_override = i.capacity and \ 566 i.capacity.rate != s.capacity.rate 567 delay_override = i.latency and \ 568 i.latency.time != delay 569 loss_override = lloss and lloss != loss 570 if cap_override or delay_override or loss_override: 571 if i.capacity: cap = i.capacity.rate 572 else: cap = s.capacity.rate 573 574 if i.latency: delay = i.latency.time 575 576 if lloss: loss = lloss 577 else: loss = loss or 0.0 578 579 out += "tb-set-link-simplex-params $%s $%s %fms %f %f\n" % \ 580 (name, i.element.name[0], delay, cap, loss) 581 out+= "\n" 582 out+=""" 583 $ns run 584 """ 585 return out -
fedd/federation/util.py
r4c8a0b7 rcc8d8e9 115 115 else: 116 116 raise ValueError("Badly formatted line in accessdb: %s line %d" %\ 117 ( nf, lineno))117 (fn, lineno)) 118 118 f.close() 119 119 return rv
Note: See TracChangeset
for help on using the changeset viewer.