source: fedd/federation/thread_pool.py @ b745876

compt_changes
Last change on this file since b745876 was 42dedbd, checked in by Ted Faber <faber@…>, 14 years ago

Missing includes for debugging code

  • Property mode set to 100644
File size: 3.4 KB
RevLine 
[faea607]1#!/usr/local/bin/python
2
3import logging
4import time
[42dedbd]5import sys 
6import traceback
7
[faea607]8from threading import Lock, Thread, Condition
[e83f2f2]9from service_error import service_error
[faea607]10
class thread_pool:
    """
    A class to keep track of a set of threads all invoked for the same
    task.  Manages the mutual exclusion of the states.

    The pool itself only counts: worker threads bump ``started`` and
    ``terminated`` through start() and terminate(), and waiters block on
    the ``changed`` condition until the counters satisfy them.
    ``nthreads`` is the limit on simultaneously active threads that
    wait_for_slot() enforces.
    """

    # Clock used for timeout deadlines.  Prefer the monotonic clock when the
    # interpreter has one so a wall-clock adjustment cannot stretch or cut
    # short a timed wait; fall back to time.time() on older interpreters.
    _now = staticmethod(getattr(time, "monotonic", time.time))

    def __init__(self, nthreads):
        """
        Start a pool that allows at most nthreads active threads.
        """
        self.changed = Condition()
        self.started = 0
        self.terminated = 0
        self.nthreads = nthreads

    def acquire(self):
        """
        Get the pool's lock.
        """
        self.changed.acquire()

    def release(self):
        """
        Release the pool's lock.
        """
        self.changed.release()

    def wait(self, timeout=None):
        """
        Wait for a pool thread to start or stop.

        The caller must already hold the pool's lock (see acquire()).
        timeout is in seconds; None waits until notified.
        """
        self.changed.wait(timeout)

    def start(self):
        """
        Called by a pool thread to report starting.
        """
        with self.changed:
            self.started += 1
            self.changed.notify_all()

    def terminate(self):
        """
        Called by a pool thread to report finishing.
        """
        with self.changed:
            self.terminated += 1
            self.changed.notify_all()

    def clear(self):
        """
        Clear all pool data.
        """
        with self.changed:
            self.started = 0
            self.terminated = 0
            self.changed.notify_all()

    def _all_done(self):
        """
        True when at least one thread has started and every started thread
        has terminated.  Call with the pool's lock held.
        """
        return self.started != 0 and self.started <= self.terminated

    def wait_for_slot(self):
        """
        Wait until we have a free slot to start another pooled thread
        """
        with self.changed:
            while self.started - self.terminated >= self.nthreads:
                self.changed.wait()

    def wait_for_all_done(self, timeout=None):
        """
        Wait until all active threads finish (and at least one has
        started).  If a timeout (in seconds) is given, return after waiting
        at most that long for termination; a timeout of 0 polls without
        blocking, and None waits indefinitely.  If all threads are done
        (and one has started since the last clear()) return True,
        otherwise False.
        """
        # Distinguish "no timeout" from "zero timeout" explicitly; a plain
        # truthiness test would turn a 0 second timeout into an endless wait.
        if timeout is None:
            deadline = None
        else:
            deadline = self._now() + timeout

        with self.changed:
            while not self._all_done():
                if deadline is None:
                    self.changed.wait()
                    continue
                remaining = deadline - self._now()
                if remaining <= 0:
                    break
                self.changed.wait(remaining)
            # Evaluate the result while still holding the lock so it cannot
            # race with a concurrent start(), terminate() or clear().
            return self._all_done()
98
class pooled_thread(Thread):
    """
    One of a set of threads dedicated to a specific task.  Uses the
    thread_pool class above for coordination.

    After the thread has run, ``rv`` holds the target's return value and
    ``exception`` holds the exception that ended the target, if any.
    """
    def __init__(self, group=None, target=None, name=None, args=(),
            kwargs=None, pdata=None, trace_file=None):
        """
        Set up the thread.

        group, target, name, args and kwargs are passed on to
        Thread.__init__.  pdata is the pool object to notify when the
        thread starts and finishes (None for no pool).  trace_file is
        accepted for interface compatibility; it is not used here.
        """
        # A literal {} default would be one dict shared by every instance
        # created without kwargs, so build a fresh one per thread instead.
        if kwargs is None:
            kwargs = {}
        Thread.__init__(self, group, target, name, args, kwargs)
        self.rv = None          # Return value of the ops in this thread
        self.exception = None   # Exception that terminated this thread
        self.target = target    # Target function to run on start()
        self.args = args        # Args to pass to target
        self.kwargs = kwargs    # Additional kw args
        self.pdata = pdata      # thread_pool for this class
        # Logger for this thread
        self.log = logging.getLogger("fedd.experiment_control")

    def run(self):
        """
        Emulate Thread.run, except add pool data manipulation and error
        logging.

        Exceptions raised by the target are logged and stored in
        self.exception rather than propagated.
        """
        if self.pdata:
            self.pdata.start()

        try:
            if self.target:
                try:
                    self.rv = self.target(*self.args, **self.kwargs)
                except service_error as s:
                    self.exception = s
                    self.log.error("Thread exception: %s %s",
                            s.code_string(), s.desc)
                except:
                    # Deliberately catch everything: whatever ends the
                    # target is recorded for whoever inspects the thread.
                    self.exception = sys.exc_info()[1]
                    self.log.error(
                            "Unexpected thread exception: %s Trace %s",
                            self.exception, traceback.format_exc())
        finally:
            # Always report termination, even if the error handling above
            # itself fails; otherwise anything waiting on the pool for this
            # thread to finish would never be released.
            if self.pdata:
                self.pdata.terminate()
Note: See TracBrowser for help on using the repository browser.