Changeset 6e33086 for fedd/federation/experiment_control.py
- Timestamp:
- Nov 28, 2011 7:34:11 PM (12 years ago)
- Branches:
- compt_changes, info-ops, master
- Children:
- ec3aa4d
- Parents:
- 45e880d
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r45e880d r6e33086 90 90 call_StartSegment = service_caller('StartSegment') 91 91 call_TerminateSegment = service_caller('TerminateSegment') 92 call_InfoSegment = service_caller('InfoSegment') 92 93 call_Ns2Topdl = service_caller('Ns2Topdl') 93 94 … … 158 159 or 'legacy' 159 160 self.auth_dir = config.get('experiment_control', 'auth_dir') 161 # XXX: document this! 162 self.info_cache_limit = \ 163 config.getint('experiment_control', 'info_cache', 600) 160 164 if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")] 161 165 else: self.direct_transit = [ ] … … 930 934 return False 931 935 936 class info_segment(start_segment): 937 def __init__(self, debug=False, log=None, testbed="", cert_file=None, 938 cert_pwd=None, trusted_certs=None, caller=None, 939 log_collector=None): 940 experiment_control_local.start_segment.__init__(self, debug, 941 log, testbed, cert_file, cert_pwd, trusted_certs, 942 caller, log_collector) 943 944 def __call__(self, uri, aid): 945 req = { 'allocID': { 'fedid' : aid } } 946 947 try: 948 self.log.debug("Calling InfoSegment at %s " % uri) 949 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 950 self.trusted_certs) 951 if r.has_key('InfoSegmentResponseBody'): 952 self.make_map(r['InfoSegmentResponseBody']) 953 if 'proof' in r: self.proof = r['proof'] 954 self.response = r 955 else: 956 raise service_error(service_error.internal, 957 "Bad response!?: %s" %r) 958 return True 959 except service_error, e: 960 self.log.error("Info segment failed on %s: %s" % \ 961 (self.testbed, e)) 962 return False 963 964 def annotate_topology(self, top, data): 965 def add_new(ann, attr): 966 for a in ann: 967 if a not in attr: attr.append(a) 968 969 # Annotate the topology with embedding info 970 for e in top.elements: 971 if isinstance(e, topdl.Computer): 972 for s in data: 973 ann = s.node.get(e.name, None) 974 if ann is not None: 975 add_new(ann[0], e.localname) 976 e.status = ann[1] 977 add_new(ann[2], e.service) 978 add_new(ann[3], e.operation) 979 break 980 981 982 932 983 def allocate_resources(self, allocated, masters, eid, expid, 933 984 tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, … … 1046 1097 self.state_lock.acquire() 1047 1098 self.state[eid].status = 'failed' 1099 self.state[eid].updated() 1048 1100 if self.state_filename: self.write_state() 1049 1101 self.state_lock.release() … … 1065 1117 if s.proof: 1066 1118 proofs[s.testbed] = s.proof 1067 1068 # Annotate the topology with embedding info 1069 for e in top.elements: 1070 if isinstance(e, topdl.Computer): 1071 for s in starters: 1072 ann = s.node.get(e.name, None) 1073 if ann is not None: 1074 e.localname.extend(ann[0]) 1075 e.status = ann[1] 1076 e.service.extend(ann[2]) 1077 e.operation.extend(ann[3]) 1078 break 1079 1119 self.annotate_topology(top, starters) 1080 1120 log.info("[start_segment]: Experiment %s active" % eid) 1081 1121 … … 1092 1132 self.state[eid] = self.state[expid] 1093 1133 self.state[eid].top = top 1134 self.state[eid].updated() 1094 1135 # Append startup proofs 1095 1136 for f in self.state[eid].get_all_allocations(): … … 1645 1686 exp = self.state[key] 1646 1687 exp.status = "starting" 1688 exp.updated() 1647 1689 expid = exp.fedid 1648 1690 eid = exp.localname … … 1980 2022 proofs = [copy.deepcopy(p) for k in tbparams.keys()\ 1981 2023 for p in tbparams[k].proof] 2024 exp.updated() 1982 2025 if self.state_filename: 1983 2026 self.write_state() … … 2192 2235 return (None, None) 2193 2236 2237 def update_info(self, key, force=False): 2238 top = None 2239 self.state_lock.acquire() 2240 if key in self.state: 2241 if force or self.state[key].older_than(self.info_cache_limit): 2242 top = self.state[key].top 2243 if top is not None: top = top.clone() 2244 d1, info_params, cert, d2 = \ 2245 self.get_segment_info(self.state[key], need_lock=False) 2246 self.state_lock.release() 2247 2248 if top is None: return 2249 2250 try: 2251 tmpdir = tempfile.mkdtemp(prefix="info-") 2252 except EnvironmentError: 2253 raise service_error(service_error.internal, 2254 "Cannot create tmp dir") 2255 cert_file = self.make_temp_certfile(cert, tmpdir) 2256 2257 data = [] 2258 try: 2259 for k, (uri, aid) in info_params.items(): 2260 info=self.info_segment(log=self.log, testbed=uri, 2261 cert_file=cert_file, cert_pwd=None, 2262 trusted_certs=self.trusted_certs, 2263 caller=self.call_InfoSegment) 2264 info(uri, aid) 2265 data.append(info) 2266 # Clean up the tmpdir no matter what 2267 finally: 2268 if tmpdir: self.remove_dirs(tmpdir) 2269 2270 self.annotate_topology(top, data) 2271 self.state_lock.acquire() 2272 if key in self.state: 2273 self.state[key].top = top 2274 self.state[key].updated() 2275 if self.state_filename: self.write_state() 2276 self.state_lock.release() 2277 2194 2278 2195 2279 def get_info(self, req, fid): … … 2205 2289 exp = req.get('experiment', None) 2206 2290 legacy = req.get('legacy', False) 2291 fresh = req.get('fresh', False) 2207 2292 if exp: 2208 2293 if exp.has_key('fedid'): … … 2219 2304 proof = self.check_experiment_access(fid, key) 2220 2305 2306 self.update_info(key, fresh) 2307 2221 2308 self.state_lock.acquire() 2222 2309 if self.state.has_key(key): 2223 2310 rv = self.state[key].get_info() 2224 # Copy the topo if we need to generate legacy representations2311 # Copy the topo if we need legacy annotations 2225 2312 if legacy: 2226 2313 top = self.state[key].top … … 2229 2316 2230 2317 # If the legacy visualization and topology representations are 2231 # requested, calculate them and add them to t ehreturn.2318 # requested, calculate them and add them to the return. 2232 2319 if legacy and rv is not None: 2233 2320 if top is not None: … … 2300 2387 "Experiment has no status!?") 2301 2388 2302 2303 def get_termination_info(self, fed_exp): 2389 def get_segment_info(self, fed_exp, need_lock=True): 2304 2390 ids = [] 2305 2391 term_params = { } 2306 self.state_lock.acquire()2392 if need_lock: self.state_lock.acquire() 2307 2393 ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ] 2308 2394 expcert = fed_exp.identity … … 2315 2401 aid = fed.allocID 2316 2402 term_params[aid] = (uri, aid) 2403 if need_lock: self.state_lock.release() 2404 return ids, term_params, expcert, repo 2405 2406 2407 def get_termination_info(self, fed_exp): 2408 self.state_lock.acquire() 2409 ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False) 2317 2410 # Change the experiment state 2318 2411 fed_exp.status = 'terminating' 2412 fed_exp.updated() 2319 2413 if self.state_filename: self.write_state() 2320 2414 self.state_lock.release() … … 2414 2508 if fed_exp: 2415 2509 status = self.check_termination_status(fed_exp, force) 2510 # get_termination_info updates the experiment state 2416 2511 ids, term_params, expcert, repo = self.get_termination_info(fed_exp) 2417 2512 self.deallocate_resources(term_params, expcert, status, force,
Note: See TracChangeset
for help on using the changeset viewer.