Changeset 22a1a77 for fedd/federation
- Timestamp:
- Nov 29, 2011 6:19:24 PM (13 years ago)
- Branches:
- compt_changes, info-ops, master
- Children:
- b709861
- Parents:
- 57facae
- Location:
- fedd/federation
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r57facae r22a1a77 37 37 from thread_pool import thread_pool, pooled_thread 38 38 from experiment_info import experiment_info, allocation_info, federated_service 39 from operation_status import operation_status 39 40 40 41 import topdl … … 65 66 call_TerminateSegment = service_caller('TerminateSegment') 66 67 call_InfoSegment = service_caller('InfoSegment') 68 call_OperationSegment = service_caller('OperationSegment') 67 69 call_Ns2Topdl = service_caller('Ns2Topdl') 68 70 … … 223 225 'Info': soap_handler('Info', self.get_info), 224 226 'MultiInfo': soap_handler('MultiInfo', self.get_multi_info), 227 'Operation': soap_handler('Operation', self.do_operation), 225 228 'Terminate': soap_handler('Terminate', 226 229 self.terminate_experiment), … … 238 241 'Terminate': xmlrpc_handler('Terminate', 239 242 self.terminate_experiment), 243 'Operation': xmlrpc_handler('Operation', self.do_operation), 240 244 'GetValue': xmlrpc_handler('GetValue', self.GetValue), 241 245 'SetValue': xmlrpc_handler('SetValue', self.SetValue), … … 933 937 except service_error, e: 934 938 self.log.error("Info segment failed on %s: %s" % \ 939 (self.testbed, e)) 940 return False 941 942 class operation_segment: 943 def __init__(self, debug=False, log=None, testbed="", cert_file=None, 944 cert_pwd=None, trusted_certs=None, caller=None, 945 log_collector=None): 946 self.log = log 947 self.debug = debug 948 self.cert_file = cert_file 949 self.cert_pwd = cert_pwd 950 self.trusted_certs = None 951 self.caller = caller 952 self.testbed = testbed 953 self.status = None 954 955 def __call__(self, uri, aid, op, targets, params): 956 req = { 957 'allocID': { 'fedid' : aid }, 958 'operation': op, 959 'target': targets, 960 } 961 if params: req['parameter'] = params 962 963 964 try: 965 self.log.debug("Calling OperationSegment at %s " % uri) 966 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 967 self.trusted_certs) 968 if 'OperationSegmentResponseBody' in r: 969 r = r['OperationSegmentResponseBody'] 970 if 'status' in r: 971 self.status = r['status'] 972 else: 973 raise service_error(service_error.internal, 974 "Bad response!?: %s" %r) 975 return True 976 except service_error, e: 977 self.log.error("Operation segment failed on %s: %s" % \ 935 978 (self.testbed, e)) 936 979 return False … … 2314 2357 raise service_error(service_error.req, "No such experiment") 2315 2358 2359 def operate_on_segments(self, op_params, cert, op, testbeds, params, 2360 results): 2361 """ 2362 Call OperateSegment on multiple testbeds and gather the results. 2363 op_params contains the parameters needed to contact that testbed, cert 2364 is a certificate containing the fedid to use, op is the operation, 2365 testbeds is a dict mapping testbed name to targets in that testbed, 2366 params are the parameters to include a,d results is a growing list of 2367 the results of the calls. 2368 """ 2369 try: 2370 tmpdir = tempfile.mkdtemp(prefix="info-") 2371 except EnvironmentError: 2372 raise service_error(service_error.internal, 2373 "Cannot create tmp dir") 2374 cert_file = self.make_temp_certfile(cert, tmpdir) 2375 2376 try: 2377 for tb, targets in testbeds.items(): 2378 if tb in op_params: 2379 uri, aid = op_params[tb] 2380 operate=self.operation_segment(log=self.log, testbed=uri, 2381 cert_file=cert_file, cert_pwd=None, 2382 trusted_certs=self.trusted_certs, 2383 caller=self.call_OperationSegment) 2384 if operate(uri, aid, op, targets, params): 2385 if operate.status is not None: 2386 results.extend(operate.status) 2387 continue 2388 # Something went wrong in a weird way. Add statuses 2389 # that reflect that to results 2390 for t in targets: 2391 results.append(operation_status(t, 2392 operation_status.federant, 2393 'Unexpected error ion %s' % tb)) 2394 # Clean up the tmpdir no matter what 2395 finally: 2396 if tmpdir: self.remove_dirs(tmpdir) 2397 2398 def do_operation(self, req, fid): 2399 """ 2400 Find the testbeds holding each target and ask them to carry out the 2401 operation. Return the statuses. 2402 """ 2403 # Map an element to the testbed containing it 2404 def element_to_tb(e): 2405 if isinstance(e, topdl.Computer): return e.get_attribute("testbed") 2406 elif isinstance(e, topdl.Testbed): return e.name 2407 else: return None 2408 # If d is an operation_status object, make it a dict 2409 def make_dict(d): 2410 if isinstance(d, dict): return d 2411 elif isinstance(d, operation_status): return d.to_dict() 2412 else: return { } 2413 2414 req = req.get('OperationRequestBody', None) 2415 if not req: 2416 raise service_error(service_error.req, 2417 "Bad request format (no OperationRequestBody)") 2418 exp = req.get('experiment', None) 2419 op = req.get('operation', None) 2420 target = set(req.get('target', [])) 2421 params = req.get('parameter', None) 2422 2423 if exp: 2424 if 'fedid' in exp: 2425 key = exp['fedid'] 2426 keytype = "fedid" 2427 elif 'localname' in exp: 2428 key = exp['localname'] 2429 keytype = "localname" 2430 else: 2431 raise service_error(service_error.req, "Unknown lookup type") 2432 else: 2433 raise service_error(service_error.req, "No request?") 2434 2435 if op is None or not target 2436 raise service_error(service_error.req, "No request?") 2437 2438 proof = self.check_experiment_access(fid, key) 2439 self.state_lock.acquire() 2440 if key in self.state: 2441 d1, op_params, cert, d2 = \ 2442 self.get_segment_info(self.state[key], need_lock=False) 2443 top = self.state[key].top 2444 if top is not None: 2445 top = top.clone() 2446 self.state_lock.release() 2447 2448 if top is None: 2449 raise service_error(service_error.partial, "No topology yet", 2450 proof=proof) 2451 2452 testbeds = { } 2453 results = [] 2454 for e in top.elements: 2455 if e.name in targets: 2456 tb = element_to_tb(e) 2457 if tb is not None: 2458 if tb in testbeds: testbeds[tb].append(e.name) 2459 else: testbeds[tb] = [ e.name ] 2460 else: 2461 results.append(operation_status(e.name, 2462 code=operation_status.no_target, 2463 description='Cannot map target to testbed')) 2464 2465 self.operate_on_segments(op_params, cert, op, testbeds, params, 2466 results) 2467 2468 return { 2469 'experiment': exp, 2470 'status': [make_dict(r) for r in results] 2471 'proof': proof.to_dict() 2472 } 2473 2474 2316 2475 def get_multi_info(self, req, fid): 2317 2476 """
Note: See TracChangeset
for help on using the changeset viewer.