source: fedd/federation/thread_pool.py @ cd60510

axis_examplecompt_changesinfo-ops
Last change on this file since cd60510 was faea607, checked in by Ted Faber <faber@…>, 14 years ago

Move thread pools into their own package. Starting on #10

  • Property mode set to 100644
File size: 3.3 KB
Line 
1#!/usr/local/bin/python
2
import logging
import sys
import time
import traceback
from threading import Lock, Thread, Condition
6
class thread_pool:
    """
    A class to keep track of a set of threads all invoked for the same
    task.  Manages the mutual exclusion of the states.

    The pool does not create threads itself; it only counts them.  Worker
    threads call start() when they begin and terminate() when they finish,
    and a coordinating thread uses wait_for_slot() / wait_for_all_done() to
    throttle and to join.  All counters are protected by a single Condition.
    """
    def __init__(self, nthreads):
        """
        Start a pool that allows at most nthreads concurrently running
        workers (as enforced by wait_for_slot()).
        """
        # Condition guarding the counters; notified on every change.
        self.changed = Condition()
        # Number of start() calls since creation or the last clear().
        self.started = 0
        # Number of terminate() calls since creation or the last clear().
        self.terminated = 0
        # Maximum number of simultaneously active workers.
        self.nthreads = nthreads

    def acquire(self):
        """
        Get the pool's lock.
        """
        self.changed.acquire()

    def release(self):
        """
        Release the pool's lock.
        """
        self.changed.release()

    def wait(self, timeout=None):
        """
        Wait for a pool thread to start or stop.  The caller must hold the
        pool's lock (see acquire()).  If timeout is given, wait at most
        that many seconds.
        """
        self.changed.wait(timeout)

    def start(self):
        """
        Called by a pool thread to report starting.
        """
        with self.changed:
            self.started += 1
            self.changed.notify_all()

    def terminate(self):
        """
        Called by a pool thread to report finishing.
        """
        with self.changed:
            self.terminated += 1
            self.changed.notify_all()

    def clear(self):
        """
        Clear all pool data.
        """
        with self.changed:
            self.started = 0
            self.terminated = 0
            self.changed.notify_all()

    def _all_done(self):
        """
        True if at least one thread has started and every started thread
        has terminated.  The caller must hold the pool's lock.
        """
        return self.started != 0 and self.started <= self.terminated

    def wait_for_slot(self):
        """
        Wait until we have a free slot to start another pooled thread
        """
        with self.changed:
            while self.started - self.terminated >= self.nthreads:
                self.changed.wait()

    def wait_for_all_done(self, timeout=None):
        """
        Wait until all active threads finish (and at least one has
        started).  If a timeout is given, return after waiting that long
        for termination.  If all threads are done (and one has started
        since the last clear()) return True, otherwise False.

        A timeout of None waits indefinitely; a timeout of 0 (or less)
        checks the current state and returns without blocking.
        """
        # Compare against None rather than truthiness so that a timeout of
        # 0 is honored as "do not block" instead of "wait forever".
        deadline = None
        if timeout is not None:
            deadline = time.time() + timeout

        with self.changed:
            while not self._all_done():
                if deadline is None:
                    self.changed.wait()
                    continue
                # Recompute the remaining time on every pass: a wakeup may
                # be caused by an unrelated start()/terminate().
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                self.changed.wait(remaining)
            # Evaluate the result while still holding the lock so it is
            # consistent with the state that ended the loop.
            return self._all_done()
94
class pooled_thread(Thread):
    """
    One of a set of threads dedicated to a specific task.  Uses the
    thread_pool class above for coordination.

    After the thread has run, rv holds the target's return value and
    exception holds the exception that ended the target (None if it
    returned normally).
    """
    def __init__(self, group=None, target=None, name=None, args=(),
            kwargs=None, pdata=None, trace_file=None):
        """
        Same arguments as threading.Thread, plus:

        pdata -- pool object whose start() and terminate() methods are
                 called around the target (may be None)
        trace_file -- accepted for interface compatibility; not used by
                 this class
        """
        # Build a fresh dict per instance; a literal {} default would be
        # one object shared by every thread created without kwargs.
        if kwargs is None:
            kwargs = {}
        Thread.__init__(self, group, target, name, args, kwargs)
        self.rv = None          # Return value of the ops in this thread
        self.exception = None   # Exception that terminated this thread
        self.target = target    # Target function to run on start()
        self.args = args        # Args to pass to target
        self.kwargs = kwargs    # Additional kw args
        self.pdata = pdata      # thread_pool for this class
        # Logger for this thread
        self.log = logging.getLogger("fedd.experiment_control")

    def run(self):
        """
        Emulate Thread.run, except add pool data manipulation and error
        logging.

        Any exception raised by the target is recorded in self.exception
        and logged rather than propagated.  The pool's terminate() is
        always called once start() has been reported, so waiters on the
        pool are not left hanging.
        """
        if self.pdata:
            self.pdata.start()

        try:
            if self.target:
                try:
                    self.rv = self.target(*self.args, **self.kwargs)
                except:
                    # Deliberately catch everything: this is the thread's
                    # top-level boundary and the failure is reported via
                    # self.exception and the log.
                    exc = sys.exc_info()[1]
                    self.exception = exc
                    # Exceptions that provide code_string() and desc (the
                    # attributes the service-error log format reads) get
                    # the short form; detected by duck typing because the
                    # exception class is not imported in this module.
                    code_string = getattr(exc, 'code_string', None)
                    if callable(code_string) and hasattr(exc, 'desc'):
                        self.log.error("Thread exception: %s %s",
                                code_string(), exc.desc)
                    else:
                        self.log.error(
                                "Unexpected thread exception: %s Trace %s",
                                exc, traceback.format_exc())
        finally:
            # Report completion even if logging above fails.
            if self.pdata:
                self.pdata.terminate()
Note: See TracBrowser for help on using the repository browser.