#!/usr/local/bin/python

import logging
import sys
import time
import traceback
from threading import Lock, Thread, Condition

from service_error import service_error


class thread_pool:
    """
    A class to keep track of a set of threads all invoked for the same task.
    Manages the mutual exclusion of the states.

    The pool does not create threads itself; it only counts how many pooled
    threads have reported starting and finishing, and lets other threads
    block until those counts change.
    """

    def __init__(self, nthreads):
        """
        Start a pool.

        nthreads is the maximum number of pooled threads that
        wait_for_slot() allows to be active (started but not yet
        terminated) at once.
        """
        # Guards both counters and signals every change to them.
        self.changed = Condition()
        self.started = 0        # Threads that have called start()
        self.terminated = 0     # Threads that have called terminate()
        self.nthreads = nthreads

    def acquire(self):
        """
        Get the pool's lock.
        """
        self.changed.acquire()

    def release(self):
        """
        Release the pool's lock.
        """
        self.changed.release()

    def wait(self, timeout=None):
        """
        Wait for a pool thread to start or stop.

        The caller must already hold the pool's lock (see acquire()).
        Returns after a notification or after timeout seconds, whichever
        comes first; a timeout of None waits indefinitely.
        """
        self.changed.wait(timeout)

    def start(self):
        """
        Called by a pool thread to report starting.
        """
        with self.changed:
            self.started += 1
            self.changed.notify_all()

    def terminate(self):
        """
        Called by a pool thread to report finishing.
        """
        with self.changed:
            self.terminated += 1
            self.changed.notify_all()

    def clear(self):
        """
        Clear all pool data.
        """
        with self.changed:
            self.started = 0
            self.terminated = 0
            self.changed.notify_all()

    def wait_for_slot(self):
        """
        Wait until we have a free slot to start another pooled thread
        """
        with self.changed:
            while self.started - self.terminated >= self.nthreads:
                self.wait()

    def _pending(self):
        """
        Return True while nothing has started yet or some started thread
        has not terminated.  Call with the pool's lock held.
        """
        return self.started == 0 or self.started > self.terminated

    def wait_for_all_done(self, timeout=None):
        """
        Wait until all active threads finish (and at least one has started).
        If a timeout is given, return after waiting that long for
        termination.  If all threads are done (and one has started since
        the last clear()) return True, otherwise False.

        timeout is in seconds.  None waits indefinitely; zero or a negative
        value checks the current state once and returns without blocking.
        """
        # Track an absolute deadline so that wakeups caused by unrelated
        # start/terminate notifications do not extend the total wait.
        deadline = None
        if timeout is not None:
            deadline = time.time() + timeout

        with self.changed:
            while self._pending():
                if deadline is None:
                    self.wait()
                    continue
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                self.wait(remaining)
            # Evaluate the result while still holding the lock so the two
            # counters are read consistently.
            return not self._pending()


class pooled_thread(Thread):
    """
    One of a set of threads dedicated to a specific task.  Uses the
    thread_pool class above for coordination.
    """

    def __init__(self, group=None, target=None, name=None, args=(),
            kwargs=None, pdata=None, trace_file=None):
        """
        Set up the thread.

        group, target, name, args and kwargs are passed through to
        Thread.__init__.  pdata is the thread_pool (if any) that this
        thread reports its start and termination to.  trace_file is
        accepted but not used by this class.
        """
        # Use a fresh dict per instance rather than a shared mutable default.
        if kwargs is None:
            kwargs = {}
        Thread.__init__(self, group, target, name, args, kwargs)
        self.rv = None              # Return value of the ops in this thread
        self.exception = None       # Exception that terminated this thread
        self.target = target        # Target function to run on start()
        self.args = args            # Args to pass to target
        self.kwargs = kwargs        # Additional kw args
        self.pdata = pdata          # thread_pool for this class
        # Logger for this thread
        self.log = logging.getLogger("fedd.experiment_control")

    def run(self):
        """
        Emulate Thread.run, except add pool data manipulation and error
        logging.

        The target's return value is stored in self.rv.  Any exception it
        raises is stored in self.exception and logged rather than
        propagated.
        """
        if self.pdata:
            self.pdata.start()

        try:
            if self.target:
                try:
                    self.rv = self.target(*self.args, **self.kwargs)
                except service_error as s:
                    self.exception = s
                    self.log.error("Thread exception: %s %s" %
                            (s.code_string(), s.desc))
                except:
                    # Deliberate catch-all at the thread boundary: callers
                    # inspect self.exception to detect any kind of failure.
                    self.exception = sys.exc_info()[1]
                    self.log.error(
                            "Unexpected thread exception: %sTrace %s",
                            self.exception, traceback.format_exc())
        finally:
            # Always report termination, otherwise threads blocked in the
            # pool's wait_for_slot()/wait_for_all_done() would never wake.
            if self.pdata:
                self.pdata.terminate()