Changeset 8bc5754


Ignore:
Timestamp:
Jan 7, 2009 10:50:21 AM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
b0c185f
Parents:
f9665d1
Message:

Parallelize testbed swapouts, including failed creations and terminate calls.
I also cleaned up the thread_pool interface some to aviod cutting and pasting
rather cryptic code throughout those functions. The interface is still a
little odd, but much more palatable.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    rf9665d1 r8bc5754  
    4444        task.  Manages the mutual exclusion of the states.
    4545        """
    46         def __init__(self):
     46        def __init__(self, nthreads):
    4747            """
    4848            Start a pool.
     
    5151            self.started = 0
    5252            self.terminated = 0
     53            self.nthreads = nthreads
    5354
    5455        def acquire(self):
     
    9798            self.changed.notifyAll()
    9899            self.changed.release()
     100
     101        def wait_for_slot(self):
     102            """
     103            Wait until we have a free slot to start another pooled thread
     104            """
     105            self.acquire()
     106            while self.started - self.terminated >= self.nthreads:
     107                self.wait()
     108            self.release()
     109
     110        def wait_for_all_done(self):
     111            """
     112            Wait until all active threads finish (and at least one has started)
     113            """
     114            self.acquire()
     115            while self.started == 0 or self.started > self.terminated:
     116                self.wait()
     117            self.release()
    99118
    100119    class pooled_thread(Thread):
     
    17001719            raise e
    17011720
    1702         thread_pool_info = self.thread_pool()
     1721        thread_pool = self.thread_pool(self.nthreads)
    17031722        threads = [ ]
    17041723
    17051724        for tb in [ k for k in allocated.keys() if k != master]:
    1706             # Wait until we have a free slot to start the next testbed load
    1707             thread_pool_info.acquire()
    1708             while thread_pool_info.started - \
    1709                     thread_pool_info.terminated >= self.nthreads:
    1710                 thread_pool_info.wait()
    1711             thread_pool_info.release()
    1712 
    17131725            # Create and start a thread to start the segment, and save it to
    17141726            # get the return value later
     1727            thread_pool.wait_for_slot()
    17151728            t  = self.pooled_thread(target=self.start_segment,
    17161729                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    1717                     pdata=thread_pool_info, trace_file=self.trace_file)
     1730                    pdata=thread_pool, trace_file=self.trace_file)
    17181731            threads.append(t)
    17191732            t.start()
    17201733
    1721         # Wait until all finish (the first clause of the while is to make sure
    1722         # one starts)
    1723         thread_pool_info.acquire()
    1724         while thread_pool_info.started == 0 or \
    1725                 thread_pool_info.started > thread_pool_info.terminated:
    1726             thread_pool_info.wait()
    1727         thread_pool_info.release()
     1734        # Wait until all finish
     1735        thread_pool.wait_for_all_done()
    17281736
    17291737        # If none failed, start the master
     
    17381746        if failed:
    17391747            if not fail_soft:
     1748                thread_pool.clear()
    17401749                for tb in succeeded:
    1741                     self.stop_segment(tb, eid, tbparams)
     1750                    # Create and start a thread to stop the segment
     1751                    thread_pool.wait_for_slot()
     1752                    t  = self.pooled_thread(target=self.stop_segment,
     1753                            args=(tb, eid, tbparams), name=tb,
     1754                            pdata=thread_pool, trace_file=self.trace_file)
     1755                    t.start()
     1756                # Wait until all finish
     1757                thread_pool.wait_for_all_done()
     1758
    17421759                # release the allocations
    17431760                for tb in tbparams.keys():
     
    20312048
    20322049            # Stop everyone.
     2050            thread_pool = self.thread_pool(self.nthreads)
    20332051            for tb in tbparams.keys():
    2034                 self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
     2052                # Create and start a thread to stop the segment
     2053                thread_pool.wait_for_slot()
     2054                t  = self.pooled_thread(target=self.stop_segment,
     2055                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
     2056                        pdata=thread_pool, trace_file=self.trace_file)
     2057                t.start()
     2058            # Wait for completions
     2059            thread_pool.wait_for_all_done()
    20352060
    20362061            # release the allocations
Note: See TracChangeset for help on using the changeset viewer.