- Timestamp:
- Jan 7, 2009 10:50:21 AM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- b0c185f
- Parents:
- f9665d1
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
rf9665d1 r8bc5754 44 44 task. Manages the mutual exclusion of the states. 45 45 """ 46 def __init__(self ):46 def __init__(self, nthreads): 47 47 """ 48 48 Start a pool. … … 51 51 self.started = 0 52 52 self.terminated = 0 53 self.nthreads = nthreads 53 54 54 55 def acquire(self): … … 97 98 self.changed.notifyAll() 98 99 self.changed.release() 100 101 def wait_for_slot(self): 102 """ 103 Wait until we have a free slot to start another pooled thread 104 """ 105 self.acquire() 106 while self.started - self.terminated >= self.nthreads: 107 self.wait() 108 self.release() 109 110 def wait_for_all_done(self): 111 """ 112 Wait until all active threads finish (and at least one has started) 113 """ 114 self.acquire() 115 while self.started == 0 or self.started > self.terminated: 116 self.wait() 117 self.release() 99 118 100 119 class pooled_thread(Thread): … … 1700 1719 raise e 1701 1720 1702 thread_pool _info = self.thread_pool()1721 thread_pool = self.thread_pool(self.nthreads) 1703 1722 threads = [ ] 1704 1723 1705 1724 for tb in [ k for k in allocated.keys() if k != master]: 1706 # Wait until we have a free slot to start the next testbed load1707 thread_pool_info.acquire()1708 while thread_pool_info.started - \1709 thread_pool_info.terminated >= self.nthreads:1710 thread_pool_info.wait()1711 thread_pool_info.release()1712 1713 1725 # Create and start a thread to start the segment, and save it to 1714 1726 # get the return value later 1727 thread_pool.wait_for_slot() 1715 1728 t = self.pooled_thread(target=self.start_segment, 1716 1729 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1717 pdata=thread_pool _info, trace_file=self.trace_file)1730 pdata=thread_pool, trace_file=self.trace_file) 1718 1731 threads.append(t) 1719 1732 t.start() 1720 1733 1721 # Wait until all finish (the first clause of the while is to make sure 1722 # one starts) 1723 thread_pool_info.acquire() 1724 while thread_pool_info.started == 0 or \ 1725 thread_pool_info.started > thread_pool_info.terminated: 1726 thread_pool_info.wait() 1727 thread_pool_info.release() 1734 # Wait until all finish 1735 thread_pool.wait_for_all_done() 1728 1736 1729 1737 # If none failed, start the master … … 1738 1746 if failed: 1739 1747 if not fail_soft: 1748 thread_pool.clear() 1740 1749 for tb in succeeded: 1741 self.stop_segment(tb, eid, tbparams) 1750 # Create and start a thread to stop the segment 1751 thread_pool.wait_for_slot() 1752 t = self.pooled_thread(target=self.stop_segment, 1753 args=(tb, eid, tbparams), name=tb, 1754 pdata=thread_pool, trace_file=self.trace_file) 1755 t.start() 1756 # Wait until all finish 1757 thread_pool.wait_for_all_done() 1758 1742 1759 # release the allocations 1743 1760 for tb in tbparams.keys(): … … 2031 2048 2032 2049 # Stop everyone. 2050 thread_pool = self.thread_pool(self.nthreads) 2033 2051 for tb in tbparams.keys(): 2034 self.stop_segment(tb, tbparams[tb]['eid'], tbparams) 2052 # Create and start a thread to stop the segment 2053 thread_pool.wait_for_slot() 2054 t = self.pooled_thread(target=self.stop_segment, 2055 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2056 pdata=thread_pool, trace_file=self.trace_file) 2057 t.start() 2058 # Wait for completions 2059 thread_pool.wait_for_all_done() 2035 2060 2036 2061 # release the allocations
Note: See TracChangeset
for help on using the changeset viewer.