Changeset 5ae3857 for fedd/federation/experiment_control.py
- Timestamp:
- Sep 6, 2009 2:15:52 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 2b7d768
- Parents:
- 1da6a23
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
# fedd/federation/experiment_control.py -- changed regions of changeset
# r1da6a23 -> r5ae3857, re-flowed from the changeset viewer (the viewer's
# line-number columns are dropped).  The two definitions below are reached
# through ``self`` elsewhere in this excerpt, so they belong to the body of a
# class whose header lies outside the excerpt.
#
# Context shown by the viewer around new-file lines 197-201 and 351-364.
# These fragments belong to definitions that are not fully visible here and
# are reproduced as-is:
#
#     call_ReleaseAccess = service_caller('ReleaseAccess')
#     call_StartSegment = service_caller('StartSegment')
#     call_TerminateSegment = service_caller('TerminateSegment')   (added)
#     call_Ns2Split = service_caller('Ns2Split')
#     ...
#     'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
#     'Terminate': soap_handler('Terminate',
#         self.new_terminate_experiment),    (was: self.terminate_experiment)
#     }
#     ...
#     'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
#     'Terminate': xmlrpc_handler('Terminate',
#         self.new_terminate_experiment),    (was: self.terminate_experiment)
#     }
#     ...
#
# Tail of the definition that precedes new_terminate_segment (new-file lines
# 2167-2169):
#
#     self.trusted_certs)
#     print r
#     return True


class new_terminate_segment:
    """
    Callable that sends a segment-termination request to one service URI.

    An instance stores the credentials and the service caller; calling the
    instance with a URI and an allocation ID issues the request.
    """
    def __init__(self, debug=False, log=None, cert_file=None,
            cert_pwd=None, trusted_certs=None, caller=None):
        """
        debug -- stored on the instance; not consulted by __call__
        log -- optional logger for a debug trace of each call
        cert_file, cert_pwd, trusted_certs -- passed through to caller
        caller -- callable invoked as
            caller(uri, req, cert_file, cert_pwd, trusted_certs)
        """
        self.log = log
        self.debug = debug
        self.cert_file = cert_file
        self.cert_pwd = cert_pwd
        # Keep the value supplied by the creator.  It used to be overwritten
        # with None, so the caller never saw the trusted certificates.
        self.trusted_certs = trusted_certs
        self.caller = caller

    def __call__(self, uri, aid):
        """
        Request termination of allocation aid at uri.  Returns True; any
        exception raised by the caller propagates.
        """
        if self.log is not None:
            self.log.debug("in terminate_segment: %s" % aid)
        req = {
                'allocID': aid,
            }
        self.caller(uri, req, self.cert_file, self.cert_pwd,
                self.trusted_certs)
        return True


# Tail of the method that precedes new_terminate_experiment (new-file lines
# 3258-3259), reproduced as-is:
#
#     self.state_lock.release()
#     raise service_error(service_error.req, "No saved state")


def new_terminate_experiment(self, req, fid):
    """
    Swap this experiment out on the federants and delete the shared
    information

    req -- request dict.  Its 'TerminateRequestBody' entry must hold an
        'experiment' identifier (a dict with a 'fedid' or a 'localname'
        key) and may hold a 'force' flag.
    fid -- identity of the requester, handed to check_experiment_access

    Returns a dict with the 'experiment' identifier from the request and
    the collected 'deallocationLog' text.

    Raises service_error: req for a malformed request or an unknown
    experiment, partial when the experiment is still starting or
    terminating and force is not set, internal when the saved state has no
    status.  Errors from releasing access are re-raised unless the
    experiment had failed or force is set.
    """
    req = req.get('TerminateRequestBody', None)
    if not req:
        raise service_error(service_error.req,
                "Bad request format (no TerminateRequestBody)")
    force = req.get('force', False)
    exp = req.get('experiment', None)
    if not exp:
        raise service_error(service_error.req, "No request?")
    if 'fedid' in exp:
        key = exp['fedid']
    elif 'localname' in exp:
        key = exp['localname']
    else:
        raise service_error(service_error.req, "Unknown lookup type")

    self.check_experiment_access(fid, key)

    dealloc_list = [ ]

    # Create a logger that logs to the dealloc_list as well as to the main
    # log file.
    dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
    h = logging.StreamHandler(self.list_log(dealloc_list))
    # XXX: there should be a global one of these rather than repeating the
    # code.
    h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
                '%d %b %y %H:%M:%S'))
    dealloc_log.addHandler(h)

    try:
        tbparams = { }
        ids = [ ]

        # Hold the lock while building a consistent temporary tbparams to
        # deallocate from.  It is dropped for the deallocations themselves
        # and taken again to remove the experiment state.  try/finally
        # guarantees the release on every exit path, including a failing
        # write_state().
        self.state_lock.acquire()
        try:
            fed_exp = self.state.get(key, None)
            if not fed_exp:
                raise service_error(service_error.req, "No saved state")

            # First make sure that the experiment creation is complete.
            status = fed_exp.get('experimentStatus', None)
            if not status:
                # No status??? trouble
                raise service_error(service_error.internal,
                        "Experiment has no status!?")
            if status in ('starting', 'terminating'):
                if not force:
                    raise service_error(service_error.partial,
                            'Experiment still being created or destroyed')
                self.log.warning('Experiment in %s state ' % status + \
                        'being terminated by force.')

            # experimentID is a list of dicts that are self-describing
            # identifiers.  This finds all the fedids and localnames - the
            # keys of self.state - and puts them into ids.
            for ident in fed_exp.get('experimentID', []):
                if 'fedid' in ident: ids.append(ident['fedid'])
                if 'localname' in ident: ids.append(ident['localname'])

            # Collect the allocation/segment ids
            for fed in fed_exp.get('federant', []):
                self.log.debug("looking at %s" % fed)
                try:
                    tb = fed['emulab']['project']['testbed']['localname']
                    aid = fed['allocID']
                except KeyError:
                    # Nothing to deallocate without both a testbed name
                    # and an allocation ID.
                    self.log.debug("Key error in federant: %s" % fed)
                    continue
                tbparams[tb] = aid
            fed_exp['experimentStatus'] = 'terminating'
            if self.state_filename: self.write_state()
        finally:
            self.state_lock.release()

        # Stop everyone.  NB, wait_for_all waits until a thread starts and
        # then completes, so we can't wait if nothing starts.  So, no
        # tbparams, no start.
        if len(tbparams) > 0:
            thread_pool = self.thread_pool(self.nthreads)
            for tb in tbparams.keys():
                # Create and start a thread to stop the segment
                thread_pool.wait_for_slot()
                # NOTE(review): uri is None when tb is not in self.tbmap;
                # the request is still attempted, as before.
                uri = self.tbmap.get(tb, None)
                t = self.pooled_thread(\
                        target=self.new_terminate_segment(log=dealloc_log,
                            cert_file=self.cert_file,
                            cert_pwd=self.cert_pwd,
                            trusted_certs=self.trusted_certs,
                            caller=self.call_TerminateSegment),
                        args=(uri, tbparams[tb]), name=tb,
                        pdata=thread_pool, trace_file=self.trace_file)
                t.start()
            # Wait for completions
            thread_pool.wait_for_all_done()

        # release the allocations (failed experiments have done this
        # already, and starting experiments may be in odd states, so we
        # ignore errors releasing those allocations).  The try is per
        # testbed so that an ignored error does not skip the remaining
        # releases.
        for tb in tbparams.keys():
            try:
                self.release_access(tb, tbparams[tb])
            except service_error:
                if status != 'failed' and not force:
                    raise

        # Remove the terminated experiment
        self.state_lock.acquire()
        try:
            for ident in ids:
                if ident in self.state: del self.state[ident]

            if self.state_filename: self.write_state()
        finally:
            self.state_lock.release()

        return {
                'experiment': exp,
                'deallocationLog': "".join(dealloc_list),
            }
    finally:
        # The named logger outlives this call; detach the per-request
        # handler so handlers do not pile up on it.
        dealloc_log.removeHandler(h)
Note: See TracChangeset
for help on using the changeset viewer.