Changeset 46e4682 for fedd/federation
- Timestamp:
- Jul 28, 2009 10:57:38 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- d15522f
- Parents:
- 3c20a31
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r3c20a31 r46e4682 2044 2044 # Create a logger that logs to the experiment's state object as well as 2045 2045 # to the main log file. 2046 2047 2046 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2048 2047 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) … … 2053 2052 alloc_log.addHandler(h) 2054 2053 2055 2056 2057 2058 2059 2054 # Start a thread to do the resource allocation 2060 2055 t = Thread(target=self.allocate_resources, … … 2233 2228 for f in rv.get('federant', []): 2234 2229 if f.has_key('allocID'): del f['allocID'] 2235 2236 2230 return rv 2237 2231 … … 2318 2312 self.check_experiment_access(fid, key) 2319 2313 2314 dealloc_list = [ ] 2315 2316 2317 # Create a logger that logs to the dealloc_list as well as to the main 2318 # log file. 2319 dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key) 2320 h = logging.StreamHandler(self.list_log(dealloc_list)) 2321 # XXX: there should be a global one of these rather than repeating the 2322 # code. 2323 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2324 '%d %b %y %H:%M:%S')) 2325 dealloc_log.addHandler(h) 2326 2320 2327 self.state_lock.acquire() 2321 2328 fed_exp = self.state.get(key, None) … … 2329 2336 # First make sure that the experiment creation is complete. 2330 2337 status = fed_exp.get('experimentStatus', None) 2338 2331 2339 if status: 2332 if status == 'starting':2340 if status in ('starting', 'terminating'): 2333 2341 if not force: 2334 2342 self.state_lock.release() 2335 2343 raise service_error(service_error.partial, 2336 'Experiment still being created ')2344 'Experiment still being created or destroyed') 2337 2345 else: 2338 self.log.warning('Experiment in starting state '+ \2339 'being terminated by admin.')2346 self.log.warning('Experiment in %s state ' % status + \ 2347 'being terminated by force.') 2340 2348 else: 2341 2349 # No status??? trouble … … 2381 2389 'aid': aid,\ 2382 2390 } 2391 print "%s %s" % (tb, tbparams[tb]) 2392 fed_exp['experimentStatus'] = 'terminating' 2393 if self.state_filename: self.write_state() 2383 2394 self.state_lock.release() 2384 2395 2385 # Stop everyone. 2386 thread_pool = self.thread_pool(self.nthreads) 2387 for tb in tbparams.keys(): 2388 # Create and start a thread to stop the segment 2389 thread_pool.wait_for_slot() 2390 t = self.pooled_thread(\ 2391 target=self.stop_segment(log=self.log, 2392 keyfile=self.ssh_privkey_file, debug=self.debug), 2393 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2394 pdata=thread_pool, trace_file=self.trace_file) 2395 t.start() 2396 # Wait for completions 2397 thread_pool.wait_for_all_done() 2396 # Stop everyone. NB, wait_for_all waits until a thread starts and 2397 # then completes, so we can't wait if nothing starts. So, no 2398 # tbparams, no start. 2399 if len(tbparams) > 0: 2400 thread_pool = self.thread_pool(self.nthreads) 2401 for tb in tbparams.keys(): 2402 # Create and start a thread to stop the segment 2403 thread_pool.wait_for_slot() 2404 t = self.pooled_thread(\ 2405 target=self.stop_segment(log=dealloc_log, 2406 keyfile=self.ssh_privkey_file, debug=self.debug), 2407 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2408 pdata=thread_pool, trace_file=self.trace_file) 2409 t.start() 2410 # Wait for completions 2411 thread_pool.wait_for_all_done() 2398 2412 2399 2413 # release the allocations (failed experiments have done this … … 2415 2429 self.state_lock.release() 2416 2430 2417 return { 'experiment': exp } 2431 return { 2432 'experiment': exp , 2433 'deallocationLog': "".join(dealloc_list), 2434 } 2418 2435 else: 2419 2436 # Don't forget to release the lock
Note: See TracChangeset
for help on using the changeset viewer.