- Timestamp:
- Sep 9, 2009 1:40:11 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- e2a7a413
- Parents:
- 3c6dbec
- File:
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r3c6dbec r32e7d93 120 120 self.release() 121 121 122 def wait_for_all_done(self): 123 """ 124 Wait until all active threads finish (and at least one has started) 125 """ 122 def wait_for_all_done(self, timeout=None): 123 """ 124 Wait until all active threads finish (and at least one has 125 started). If a timeout is given, return after waiting that long 126 for termination. If all threads are done (and one has started in 127 the since the last clear()) return True, otherwise False. 128 """ 129 if timeout: 130 deadline = time.time() + timeout 126 131 self.acquire() 127 132 while self.started == 0 or self.started > self.terminated: 128 self.wait() 133 self.wait(timeout) 134 if timeout: 135 if time.time() > deadline: 136 break 137 timeout = deadline - time.time() 129 138 self.release() 139 return not (self.started == 0 or self.started > self.terminated) 130 140 131 141 class pooled_thread(Thread): … … 994 1004 t.start() 995 1005 996 # Wait until all finish 997 thread_pool.wait_for_all_done() 1006 # Wait until all finish (keep pinging the log, though) 1007 mins = 0 1008 while not thread_pool.wait_for_all_done(60.0): 1009 mins += 1 1010 alloc_log.info("Waiting for sub threads (it has been %d mins)" \ 1011 % mins) 1012 1013 thread_pool.clear() 998 1014 999 1015 # If none failed, start the master … … 1012 1028 raise service_error(service_error.internal, 1013 1029 "No alloc id for testbed %s !?" 
% master) 1014 starter = self.start_segment(log=log, debug=self.debug, 1015 testbed=master, cert_file=self.cert_file, 1016 cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, 1017 caller=self.call_StartSegment, 1018 log_collector=log_collector) 1019 if not starter(uri, aid, topo[master], True, attrs): 1020 failed.append(master) 1030 t = self.pooled_thread( 1031 target=self.start_segment(log=log, debug=self.debug, 1032 testbed=master, cert_file=self.cert_file, 1033 cert_pwd=self.cert_pwd, 1034 trusted_certs=self.trusted_certs, 1035 caller=self.call_StartSegment, 1036 log_collector=log_collector), 1037 args =(uri, aid, topo[master], True, attrs), 1038 name=master, pdata=thread_pool, trace_file=self.trace_file) 1039 threads.append(t) 1040 t.start() 1041 # Wait until the master finishes (keep pinging the log, though) 1042 mins = 0 1043 while not thread_pool.wait_for_all_done(60.0): 1044 mins += 1 1045 alloc_log.info("Waiting for master (it has been %d mins)" \ 1046 % mins) 1047 # update failed to include the master, if it failed 1048 failed = [ t.getName() for t in threads if not t.rv ] 1021 1049 1022 1050 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1023 1051 # If one failed clean up, unless fail_soft is set 1024 if failed and False:1052 if failed: 1025 1053 if not fail_soft: 1026 1054 thread_pool.clear() … … 1028 1056 # Create and start a thread to stop the segment 1029 1057 thread_pool.wait_for_slot() 1058 uri = self.tbmap.get(tb, None) 1030 1059 t = self.pooled_thread(\ 1031 target=self. 
stop_segment(log=log,1060 target=self.terminate_segment(log=log, 1032 1061 testbed=tb, 1033 keyfile=self.ssh_privkey_file, 1034 debug=self.debug), 1035 args=(tb, eid, tbparams), name=tb, 1062 cert_file=self.cert_file, 1063 cert_pwd=self.cert_pwd, 1064 trusted_certs=self.trusted_certs, 1065 caller=self.call_TerminateSegment), 1066 args=(uri, tbparams[tb]['federant']['allocID']), 1067 name=tb, 1036 1068 pdata=thread_pool, trace_file=self.trace_file) 1037 1069 t.start()
Note: See TracChangeset for help on using the changeset viewer.