source: fedd/federation/thread_pool.py @ e83f2f2

axis_examplecompt_changesinfo-ops
Last change on this file since e83f2f2 was e83f2f2, checked in by Ted Faber <faber@…>, 13 years ago

Move proofs around. Lots of changes, including fault handling.

  • Property mode set to 100644
File size: 3.3 KB
RevLine 
[faea607]1#!/usr/local/bin/python
2
3import logging
4import time
5from threading import Lock, Thread, Condition
[e83f2f2]6from service_error import service_error
[faea607]7
8class thread_pool:
9    """
10    A class to keep track of a set of threads all invoked for the same
11    task.  Manages the mutual exclusion of the states.
12    """
13    def __init__(self, nthreads):
14        """
15        Start a pool.
16        """
17        self.changed = Condition()
18        self.started = 0
19        self.terminated = 0
20        self.nthreads = nthreads
21
22    def acquire(self):
23        """
24        Get the pool's lock.
25        """
26        self.changed.acquire()
27
28    def release(self):
29        """
30        Release the pool's lock.
31        """
32        self.changed.release()
33
34    def wait(self, timeout = None):
35        """
36        Wait for a pool thread to start or stop.
37        """
38        self.changed.wait(timeout)
39
40    def start(self):
41        """
42        Called by a pool thread to report starting.
43        """
44        self.changed.acquire()
45        self.started += 1
46        self.changed.notifyAll()
47        self.changed.release()
48
49    def terminate(self):
50        """
51        Called by a pool thread to report finishing.
52        """
53        self.changed.acquire()
54        self.terminated += 1
55        self.changed.notifyAll()
56        self.changed.release()
57
58    def clear(self):
59        """
60        Clear all pool data.
61        """
62        self.changed.acquire()
63        self.started = 0
64        self.terminated =0
65        self.changed.notifyAll()
66        self.changed.release()
67
68    def wait_for_slot(self):
69        """
70        Wait until we have a free slot to start another pooled thread
71        """
72        self.acquire()
73        while self.started - self.terminated >= self.nthreads:
74            self.wait()
75        self.release()
76
77    def wait_for_all_done(self, timeout=None):
78        """
79        Wait until all active threads finish (and at least one has
80        started).  If a timeout is given, return after waiting that long
81        for termination.  If all threads are done (and one has started in
82        the since the last clear()) return True, otherwise False.
83        """
84        if timeout:
85            deadline = time.time() + timeout
86        self.acquire()
87        while self.started == 0 or self.started > self.terminated:
88            self.wait(timeout)
89            if timeout:
90                if time.time() > deadline:
91                    break
92                timeout = deadline - time.time()
93        self.release()
94        return not (self.started == 0 or self.started > self.terminated)
95
96class pooled_thread(Thread):
97    """
98    One of a set of threads dedicated to a specific task.  Uses the
99    thread_pool class above for coordination.
100    """
101    def __init__(self, group=None, target=None, name=None, args=(), 
102            kwargs={}, pdata=None, trace_file=None):
103        Thread.__init__(self, group, target, name, args, kwargs)
104        self.rv = None          # Return value of the ops in this thread
105        self.exception = None   # Exception that terminated this thread
106        self.target=target      # Target function to run on start()
107        self.args = args        # Args to pass to target
108        self.kwargs = kwargs    # Additional kw args
109        self.pdata = pdata      # thread_pool for this class
110        # Logger for this thread
111        self.log = logging.getLogger("fedd.experiment_control")
112   
113    def run(self):
114        """
115        Emulate Thread.run, except add pool data manipulation and error
116        logging.
117        """
118        if self.pdata:
119            self.pdata.start()
120
121        if self.target:
122            try:
123                self.rv = self.target(*self.args, **self.kwargs)
124            except service_error, s:
125                self.exception = s
126                self.log.error("Thread exception: %s %s" % \
127                        (s.code_string(), s.desc))
128            except:
129                self.exception = sys.exc_info()[1]
130                self.log.error(("Unexpected thread exception: %s" +\
131                        "Trace %s") % (self.exception,\
132                            traceback.format_exc()))
133        if self.pdata:
134            self.pdata.terminate()
Note: See TracBrowser for help on using the repository browser.