Changeset faea607
- Timestamp:
- Dec 3, 2010 3:01:34 PM (14 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master
- Children:
- 1d73342
- Parents:
- 6fd2b29
- Location:
- fedd/federation
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r6fd2b29 rfaea607 32 32 from experiment_partition import experiment_partition 33 33 from authorizer import abac_authorizer 34 from thread_pool import thread_pool, pooled_thread 34 35 35 36 import topdl … … 81 82 class ssh_cmd_timeout(RuntimeError): pass 82 83 83 class thread_pool:84 """85 A class to keep track of a set of threads all invoked for the same86 task. Manages the mutual exclusion of the states.87 """88 def __init__(self, nthreads):89 """90 Start a pool.91 """92 self.changed = Condition()93 self.started = 094 self.terminated = 095 self.nthreads = nthreads96 97 def acquire(self):98 """99 Get the pool's lock.100 """101 self.changed.acquire()102 103 def release(self):104 """105 Release the pool's lock.106 """107 self.changed.release()108 109 def wait(self, timeout = None):110 """111 Wait for a pool thread to start or stop.112 """113 self.changed.wait(timeout)114 115 def start(self):116 """117 Called by a pool thread to report starting.118 """119 self.changed.acquire()120 self.started += 1121 self.changed.notifyAll()122 self.changed.release()123 124 def terminate(self):125 """126 Called by a pool thread to report finishing.127 """128 self.changed.acquire()129 self.terminated += 1130 self.changed.notifyAll()131 self.changed.release()132 133 def clear(self):134 """135 Clear all pool data.136 """137 self.changed.acquire()138 self.started = 0139 self.terminated =0140 self.changed.notifyAll()141 self.changed.release()142 143 def wait_for_slot(self):144 """145 Wait until we have a free slot to start another pooled thread146 """147 self.acquire()148 while self.started - self.terminated >= self.nthreads:149 self.wait()150 self.release()151 152 def wait_for_all_done(self, timeout=None):153 """154 Wait until all active threads finish (and at least one has155 started). If a timeout is given, return after waiting that long156 for termination. If all threads are done (and one has started in157 the since the last clear()) return True, otherwise False.158 """159 if timeout:160 deadline = time.time() + timeout161 self.acquire()162 while self.started == 0 or self.started > self.terminated:163 self.wait(timeout)164 if timeout:165 if time.time() > deadline:166 break167 timeout = deadline - time.time()168 self.release()169 return not (self.started == 0 or self.started > self.terminated)170 171 class pooled_thread(Thread):172 """173 One of a set of threads dedicated to a specific task. Uses the174 thread_pool class above for coordination.175 """176 def __init__(self, group=None, target=None, name=None, args=(),177 kwargs={}, pdata=None, trace_file=None):178 Thread.__init__(self, group, target, name, args, kwargs)179 self.rv = None # Return value of the ops in this thread180 self.exception = None # Exception that terminated this thread181 self.target=target # Target function to run on start()182 self.args = args # Args to pass to target183 self.kwargs = kwargs # Additional kw args184 self.pdata = pdata # thread_pool for this class185 # Logger for this thread186 self.log = logging.getLogger("fedd.experiment_control")187 188 def run(self):189 """190 Emulate Thread.run, except add pool data manipulation and error191 logging.192 """193 if self.pdata:194 self.pdata.start()195 196 if self.target:197 try:198 self.rv = self.target(*self.args, **self.kwargs)199 except service_error, s:200 self.exception = s201 self.log.error("Thread exception: %s %s" % \202 (s.code_string(), s.desc))203 except:204 self.exception = sys.exc_info()[1]205 self.log.error(("Unexpected thread exception: %s" +\206 "Trace %s") % (self.exception,\207 traceback.format_exc()))208 if self.pdata:209 self.pdata.terminate()210 211 84 call_RequestAccess = service_caller('RequestAccess') 212 85 call_ReleaseAccess = service_caller('ReleaseAccess') … … 234 107 return rv 235 108 236 self.thread_with_rv = experiment_control_local.pooled_thread237 self.thread_pool = experiment_control_local.thread_pool238 109 self.list_log = list_log.list_log 239 110 … … 1141 1012 log = alloc_log or self.log 1142 1013 1143 t hread_pool = self.thread_pool(self.nthreads)1014 tp = thread_pool(self.nthreads) 1144 1015 threads = [ ] 1145 1016 starters = [ ] … … 1156 1027 # to get the return value later 1157 1028 tb_attrs = copy.copy(attrs) 1158 t hread_pool.wait_for_slot()1029 tp.wait_for_slot() 1159 1030 uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None)) 1160 1031 base, suffix = split_testbed(tb) … … 1181 1052 log_collector=log_collector) 1182 1053 starters.append(s) 1183 t = self.pooled_thread(\1054 t = pooled_thread(\ 1184 1055 target=s, name=tb, 1185 1056 args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]), 1186 pdata=t hread_pool, trace_file=self.trace_file)1057 pdata=tp, trace_file=self.trace_file) 1187 1058 threads.append(t) 1188 1059 t.start() … … 1191 1062 mins = 0 1192 1063 revoked = False 1193 while not t hread_pool.wait_for_all_done(60.0):1064 while not tp.wait_for_all_done(60.0): 1194 1065 mins += 1 1195 1066 alloc_log.info("Waiting for sub threads (it has been %d mins)" \ … … 1214 1085 if failed: 1215 1086 if not fail_soft: 1216 t hread_pool.clear()1087 tp.clear() 1217 1088 for tb in succeeded: 1218 1089 # Create and start a thread to stop the segment 1219 t hread_pool.wait_for_slot()1090 tp.wait_for_slot() 1220 1091 uri = tbparams[tb]['uri'] 1221 t = self.pooled_thread(\1092 t = pooled_thread(\ 1222 1093 target=self.terminate_segment(log=log, 1223 1094 testbed=tb, … … 1228 1099 args=(uri, tbparams[tb]['federant']['allocID']), 1229 1100 name=tb, 1230 pdata=t hread_pool, trace_file=self.trace_file)1101 pdata=tp, trace_file=self.trace_file) 1231 1102 t.start() 1232 1103 # Wait until all finish (if any are being stopped) 1233 1104 if succeeded: 1234 t hread_pool.wait_for_all_done()1105 tp.wait_for_all_done() 1235 1106 1236 1107 # release the allocations … … 2508 2379 # no tbparams, no start. 2509 2380 if len(tbparams) > 0: 2510 t hread_pool = self.thread_pool(self.nthreads)2381 tp = thread_pool(self.nthreads) 2511 2382 for k in tbparams.keys(): 2512 2383 # Create and start a thread to stop the segment 2513 t hread_pool.wait_for_slot()2384 tp.wait_for_slot() 2514 2385 uri, aid = tbparams[k] 2515 t = self.pooled_thread(\2386 t = pooled_thread(\ 2516 2387 target=self.terminate_segment(log=dealloc_log, 2517 2388 testbed=uri, … … 2521 2392 caller=self.call_TerminateSegment), 2522 2393 args=(uri, aid), name=k, 2523 pdata=t hread_pool, trace_file=self.trace_file)2394 pdata=tp, trace_file=self.trace_file) 2524 2395 t.start() 2525 2396 # Wait for completions 2526 t hread_pool.wait_for_all_done()2397 tp.wait_for_all_done() 2527 2398 2528 2399 # release the allocations (failed experiments have done this
Note: See TracChangeset
for help on using the changeset viewer.