Changeset 6e33086 for fedd/federation
- Timestamp:
- Nov 28, 2011 7:34:11 PM (13 years ago)
- Branches:
- compt_changes, info-ops, master
- Children:
- ec3aa4d
- Parents:
- 45e880d
- Location:
- fedd/federation
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/emulab_access.py
r45e880d r6e33086 101 101 self.start_segment = proxy_emulab_segment.start_segment 102 102 self.stop_segment = proxy_emulab_segment.stop_segment 103 self.info_segment = proxy_emulab_segment.info_segment 103 104 elif self.access_type == 'local_emulab': 104 105 self.start_segment = local_emulab_segment.start_segment 105 106 self.stop_segment = local_emulab_segment.stop_segment 107 self.info_segment = local_emulab_segment.info_segment 106 108 else: 107 109 self.start_segment = None 108 110 self.stop_segment = None 111 self.info_segment = None 109 112 110 113 self.restricted = [ ] … … 181 184 'TerminateSegment': soap_handler("TerminateSegment", 182 185 self.TerminateSegment), 186 'InfoSegment': soap_handler("InfoSegment", self.InfoSegment), 183 187 } 184 188 self.xmlrpc_services = {\ … … 190 194 'TerminateSegment': xmlrpc_handler('TerminateSegment', 191 195 self.TerminateSegment), 196 'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment), 192 197 } 193 198 … … 1011 1016 return (ename, proj, user, pubkey_base, secretkey_base, alloc_log) 1012 1017 1013 def finalize_experiment(self, starter, topo, aid, alloc_id, proof): 1014 """ 1015 Store key bits of experiment state in the global repository, including 1016 the response that may need to be replayed, and return the response. 1017 """ 1018 i = 0 1019 t = topo.clone() 1018 def decorate_topology(self, info, t): 1019 """ 1020 Copy the physical mapping and status onto the topology. Used by 1021 StartSegment and InfoSegment 1022 """ 1023 def add_new(ann, attr): 1024 for a in ann: 1025 if a not in attr: attr.append(a) 1026 1020 1027 # Copy the assigned names into the return topology 1021 1028 for e in t.elements: 1022 1029 if isinstance(e, topdl.Computer): 1023 1030 if not self.create_debug: 1024 if e.name in starter.node:1025 e.localname.append("%s%s" % \1026 (starter.node[e.name][0], self.domain))1027 e.status = starter.node[e.name][1]1031 if e.name in info.node: 1032 add_new(("%s%s" % (info.node[e.name][0], self.domain),), 1033 e.localname) 1034 e.status = info.node[e.name][1] 1028 1035 else: 1029 1036 # Simple debugging assignment 1030 e.localname.append("node%d%s" % (i, self.domain))1037 add_new(("node%d%s" % (i, self.domain),), e.localname) 1031 1038 e.status = 'active' 1032 e.operation.extend(('testop1', 'testop2'))1039 add_new(('testop1', 'testop2'), e.operation) 1033 1040 i += 1 1034 1041 1042 1043 def finalize_experiment(self, starter, topo, aid, alloc_id, proof): 1044 """ 1045 Store key bits of experiment state in the global repository, including 1046 the response that may need to be replayed, and return the response. 1047 """ 1048 i = 0 1049 t = topo.clone() 1050 self.decorate_topology(starter, t) 1035 1051 # Grab the log (this is some anal locking, but better safe than 1036 1052 # sorry) … … 1246 1262 info = self.info_segment(keyfile=self.ssh_privkey_file, 1247 1263 debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert) 1248 info(self, user, proj, ename, gid, nonce) 1264 info(self, user, proj, ename) 1265 self.decorate_topology(info, topo) 1249 1266 return { 1250 1267 'allocID': req['allocID'], 1251 'experimentdescription' { 'topdldescription' : topo.to_dict() }, 1268 'segmentdescription': 1269 { 'topdldescription' : topo.to_dict() }, 1252 1270 'proof': proof.to_dict(), 1253 1271 } 1254 -
fedd/federation/emulab_segment.py
r45e880d r6e33086 31 31 self.log = getattr(self, 'log', None) 32 32 self.debug = getattr(self, 'debug', False) 33 self.node = { } 33 34 34 35 def emulab_call(self, method, params): … … 210 211 """ 211 212 213 ev_ok = ('ISUP', 'ALWAYSUP' ) 214 212 215 if self.debug: 213 216 if self.log: … … 226 229 if code ==0: 227 230 for k, v in nodes.items(): 228 print v229 231 if v.get('erole', False) and 'pnode' in v: 230 232 st = v.get('status', 'up') 231 if st == 'up': st = 'active' 233 ev = v.get('eventstatus', 'ISUP') 234 235 if st == 'up' and ev in ev_ok: st = 'active' 232 236 else: st = 'failed' 233 237 self.node[k] = (v['pnode'], st) -
fedd/federation/experiment_control.py
r45e880d r6e33086 90 90 call_StartSegment = service_caller('StartSegment') 91 91 call_TerminateSegment = service_caller('TerminateSegment') 92 call_InfoSegment = service_caller('InfoSegment') 92 93 call_Ns2Topdl = service_caller('Ns2Topdl') 93 94 … … 158 159 or 'legacy' 159 160 self.auth_dir = config.get('experiment_control', 'auth_dir') 161 # XXX: document this! 162 self.info_cache_limit = \ 163 config.getint('experiment_control', 'info_cache', 600) 160 164 if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")] 161 165 else: self.direct_transit = [ ] … … 930 934 return False 931 935 936 class info_segment(start_segment): 937 def __init__(self, debug=False, log=None, testbed="", cert_file=None, 938 cert_pwd=None, trusted_certs=None, caller=None, 939 log_collector=None): 940 experiment_control_local.start_segment.__init__(self, debug, 941 log, testbed, cert_file, cert_pwd, trusted_certs, 942 caller, log_collector) 943 944 def __call__(self, uri, aid): 945 req = { 'allocID': { 'fedid' : aid } } 946 947 try: 948 self.log.debug("Calling InfoSegment at %s " % uri) 949 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 950 self.trusted_certs) 951 if r.has_key('InfoSegmentResponseBody'): 952 self.make_map(r['InfoSegmentResponseBody']) 953 if 'proof' in r: self.proof = r['proof'] 954 self.response = r 955 else: 956 raise service_error(service_error.internal, 957 "Bad response!?: %s" %r) 958 return True 959 except service_error, e: 960 self.log.error("Info segment failed on %s: %s" % \ 961 (self.testbed, e)) 962 return False 963 964 def annotate_topology(self, top, data): 965 def add_new(ann, attr): 966 for a in ann: 967 if a not in attr: attr.append(a) 968 969 # Annotate the topology with embedding info 970 for e in top.elements: 971 if isinstance(e, topdl.Computer): 972 for s in data: 973 ann = s.node.get(e.name, None) 974 if ann is not None: 975 add_new(ann[0], e.localname) 976 e.status = ann[1] 977 add_new(ann[2], e.service) 978 add_new(ann[3], e.operation) 979 break 980 981 982 932 983 def allocate_resources(self, allocated, masters, eid, expid, 933 984 tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, … … 1046 1097 self.state_lock.acquire() 1047 1098 self.state[eid].status = 'failed' 1099 self.state[eid].updated() 1048 1100 if self.state_filename: self.write_state() 1049 1101 self.state_lock.release() … … 1065 1117 if s.proof: 1066 1118 proofs[s.testbed] = s.proof 1067 1068 # Annotate the topology with embedding info 1069 for e in top.elements: 1070 if isinstance(e, topdl.Computer): 1071 for s in starters: 1072 ann = s.node.get(e.name, None) 1073 if ann is not None: 1074 e.localname.extend(ann[0]) 1075 e.status = ann[1] 1076 e.service.extend(ann[2]) 1077 e.operation.extend(ann[3]) 1078 break 1079 1119 self.annotate_topology(top, starters) 1080 1120 log.info("[start_segment]: Experiment %s active" % eid) 1081 1121 … … 1092 1132 self.state[eid] = self.state[expid] 1093 1133 self.state[eid].top = top 1134 self.state[eid].updated() 1094 1135 # Append startup proofs 1095 1136 for f in self.state[eid].get_all_allocations(): … … 1645 1686 exp = self.state[key] 1646 1687 exp.status = "starting" 1688 exp.updated() 1647 1689 expid = exp.fedid 1648 1690 eid = exp.localname … … 1980 2022 proofs = [copy.deepcopy(p) for k in tbparams.keys()\ 1981 2023 for p in tbparams[k].proof] 2024 exp.updated() 1982 2025 if self.state_filename: 1983 2026 self.write_state() … … 2192 2235 return (None, None) 2193 2236 2237 def update_info(self, key, force=False): 2238 top = None 2239 self.state_lock.acquire() 2240 if key in self.state: 2241 if force or self.state[key].older_than(self.info_cache_limit): 2242 top = self.state[key].top 2243 if top is not None: top = top.clone() 2244 d1, info_params, cert, d2 = \ 2245 self.get_segment_info(self.state[key], need_lock=False) 2246 self.state_lock.release() 2247 2248 if top is None: return 2249 2250 try: 2251 tmpdir = tempfile.mkdtemp(prefix="info-") 2252 except EnvironmentError: 2253 raise service_error(service_error.internal, 2254 "Cannot create tmp dir") 2255 cert_file = self.make_temp_certfile(cert, tmpdir) 2256 2257 data = [] 2258 try: 2259 for k, (uri, aid) in info_params.items(): 2260 info=self.info_segment(log=self.log, testbed=uri, 2261 cert_file=cert_file, cert_pwd=None, 2262 trusted_certs=self.trusted_certs, 2263 caller=self.call_InfoSegment) 2264 info(uri, aid) 2265 data.append(info) 2266 # Clean up the tmpdir no matter what 2267 finally: 2268 if tmpdir: self.remove_dirs(tmpdir) 2269 2270 self.annotate_topology(top, data) 2271 self.state_lock.acquire() 2272 if key in self.state: 2273 self.state[key].top = top 2274 self.state[key].updated() 2275 if self.state_filename: self.write_state() 2276 self.state_lock.release() 2277 2194 2278 2195 2279 def get_info(self, req, fid): … … 2205 2289 exp = req.get('experiment', None) 2206 2290 legacy = req.get('legacy', False) 2291 fresh = req.get('fresh', False) 2207 2292 if exp: 2208 2293 if exp.has_key('fedid'): … … 2219 2304 proof = self.check_experiment_access(fid, key) 2220 2305 2306 self.update_info(key, fresh) 2307 2221 2308 self.state_lock.acquire() 2222 2309 if self.state.has_key(key): 2223 2310 rv = self.state[key].get_info() 2224 # Copy the topo if we need to generate legacy representations2311 # Copy the topo if we need legacy annotations 2225 2312 if legacy: 2226 2313 top = self.state[key].top … … 2229 2316 2230 2317 # If the legacy visualization and topology representations are 2231 # requested, calculate them and add them to t ehreturn.2318 # requested, calculate them and add them to the return. 2232 2319 if legacy and rv is not None: 2233 2320 if top is not None: … … 2300 2387 "Experiment has no status!?") 2301 2388 2302 2303 def get_termination_info(self, fed_exp): 2389 def get_segment_info(self, fed_exp, need_lock=True): 2304 2390 ids = [] 2305 2391 term_params = { } 2306 self.state_lock.acquire()2392 if need_lock: self.state_lock.acquire() 2307 2393 ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ] 2308 2394 expcert = fed_exp.identity … … 2315 2401 aid = fed.allocID 2316 2402 term_params[aid] = (uri, aid) 2403 if need_lock: self.state_lock.release() 2404 return ids, term_params, expcert, repo 2405 2406 2407 def get_termination_info(self, fed_exp): 2408 self.state_lock.acquire() 2409 ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False) 2317 2410 # Change the experiment state 2318 2411 fed_exp.status = 'terminating' 2412 fed_exp.updated() 2319 2413 if self.state_filename: self.write_state() 2320 2414 self.state_lock.release() … … 2414 2508 if fed_exp: 2415 2509 status = self.check_termination_status(fed_exp, force) 2510 # get_termination_info updates the experiment state 2416 2511 ids, term_params, expcert, repo = self.get_termination_info(fed_exp) 2417 2512 self.deallocate_resources(term_params, expcert, status, force, -
fedd/federation/experiment_info.py
r45e880d r6e33086 2 2 3 3 import copy 4 from datetime import datetime, timedelta 5 from numbers import Number 4 6 5 7 class allocation_info: … … 38 40 self.log = [] 39 41 self.alloc = { } 42 self.last_update = datetime.now() 40 43 41 44 def add_allocation(self, a): … … 47 50 def get_all_allocations(self): 48 51 return self.alloc.values() 52 53 def updated(self): 54 self.last_update = datetime.now() 55 56 def older_than(self, secs=None, dt=None): 57 """ 58 If the last update of this info was more than secs seconds ago, or 59 before dt (a datetime), return True. If both secs and dt or neither is 60 given return False. If the last update time is completelt unknown 61 (which should never happen) return True. 62 """ 63 if self.last_update is None: 64 return True 65 elif dt is None and isinstance(secs, Number): 66 return self.last_update + timedelta(seconds=secs) < datetime.now() 67 elif secs is None and isinstance(dt, datetime): 68 return self.last_update < dt 69 else: 70 return False 71 49 72 50 73 def get_info(self): -
fedd/federation/local_emulab_segment.py
r45e880d r6e33086 21 21 local_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) 22 22 emulab_segment.__init__(self, boss=boss, cert=cert) 23 self.node = { }24 23 25 24 def set_up_experiment_filespace(self, pid, eid, tmpdir): … … 123 122 return rv 124 123 124 class info_segment(local_segment, emulab_segment): 125 def __init__(self, log=None, keyfile=None, debug=False, boss=None, 126 cert=None): 127 local_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) 128 emulab_segment.__init__(self, boss=boss, cert=cert) 129 130 def __call__(self, parent, user, pid, eid): 131 self.log.info("[info_segment]: Getting info from %s" % eid) 132 self.get_mapping(pid,eid) 133 return True
Note: See TracChangeset
for help on using the changeset viewer.