Changeset faea607


Ignore:
Timestamp:
Dec 3, 2010 3:01:34 PM (13 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master
Children:
1d73342
Parents:
6fd2b29
Message:

Move thread pools into their own package. Starting on #10

Location:
fedd/federation
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r6fd2b29 rfaea607  
    3232from experiment_partition import experiment_partition
    3333from authorizer import abac_authorizer
     34from thread_pool import thread_pool, pooled_thread
    3435
    3536import topdl
     
    8182    class ssh_cmd_timeout(RuntimeError): pass
    8283   
    83     class thread_pool:
    84         """
    85         A class to keep track of a set of threads all invoked for the same
    86         task.  Manages the mutual exclusion of the states.
    87         """
    88         def __init__(self, nthreads):
    89             """
    90             Start a pool.
    91             """
    92             self.changed = Condition()
    93             self.started = 0
    94             self.terminated = 0
    95             self.nthreads = nthreads
    96 
    97         def acquire(self):
    98             """
    99             Get the pool's lock.
    100             """
    101             self.changed.acquire()
    102 
    103         def release(self):
    104             """
    105             Release the pool's lock.
    106             """
    107             self.changed.release()
    108 
    109         def wait(self, timeout = None):
    110             """
    111             Wait for a pool thread to start or stop.
    112             """
    113             self.changed.wait(timeout)
    114 
    115         def start(self):
    116             """
    117             Called by a pool thread to report starting.
    118             """
    119             self.changed.acquire()
    120             self.started += 1
    121             self.changed.notifyAll()
    122             self.changed.release()
    123 
    124         def terminate(self):
    125             """
    126             Called by a pool thread to report finishing.
    127             """
    128             self.changed.acquire()
    129             self.terminated += 1
    130             self.changed.notifyAll()
    131             self.changed.release()
    132 
    133         def clear(self):
    134             """
    135             Clear all pool data.
    136             """
    137             self.changed.acquire()
    138             self.started = 0
    139             self.terminated =0
    140             self.changed.notifyAll()
    141             self.changed.release()
    142 
    143         def wait_for_slot(self):
    144             """
    145             Wait until we have a free slot to start another pooled thread
    146             """
    147             self.acquire()
    148             while self.started - self.terminated >= self.nthreads:
    149                 self.wait()
    150             self.release()
    151 
    152         def wait_for_all_done(self, timeout=None):
    153             """
    154             Wait until all active threads finish (and at least one has
    155             started).  If a timeout is given, return after waiting that long
    156             for termination.  If all threads are done (and one has started in
    157             the since the last clear()) return True, otherwise False.
    158             """
    159             if timeout:
    160                 deadline = time.time() + timeout
    161             self.acquire()
    162             while self.started == 0 or self.started > self.terminated:
    163                 self.wait(timeout)
    164                 if timeout:
    165                     if time.time() > deadline:
    166                         break
    167                     timeout = deadline - time.time()
    168             self.release()
    169             return not (self.started == 0 or self.started > self.terminated)
    170 
    171     class pooled_thread(Thread):
    172         """
    173         One of a set of threads dedicated to a specific task.  Uses the
    174         thread_pool class above for coordination.
    175         """
    176         def __init__(self, group=None, target=None, name=None, args=(),
    177                 kwargs={}, pdata=None, trace_file=None):
    178             Thread.__init__(self, group, target, name, args, kwargs)
    179             self.rv = None          # Return value of the ops in this thread
    180             self.exception = None   # Exception that terminated this thread
    181             self.target=target      # Target function to run on start()
    182             self.args = args        # Args to pass to target
    183             self.kwargs = kwargs    # Additional kw args
    184             self.pdata = pdata      # thread_pool for this class
    185             # Logger for this thread
    186             self.log = logging.getLogger("fedd.experiment_control")
    187        
    188         def run(self):
    189             """
    190             Emulate Thread.run, except add pool data manipulation and error
    191             logging.
    192             """
    193             if self.pdata:
    194                 self.pdata.start()
    195 
    196             if self.target:
    197                 try:
    198                     self.rv = self.target(*self.args, **self.kwargs)
    199                 except service_error, s:
    200                     self.exception = s
    201                     self.log.error("Thread exception: %s %s" % \
    202                             (s.code_string(), s.desc))
    203                 except:
    204                     self.exception = sys.exc_info()[1]
    205                     self.log.error(("Unexpected thread exception: %s" +\
    206                             "Trace %s") % (self.exception,\
    207                                 traceback.format_exc()))
    208             if self.pdata:
    209                 self.pdata.terminate()
    210 
    21184    call_RequestAccess = service_caller('RequestAccess')
    21285    call_ReleaseAccess = service_caller('ReleaseAccess')
     
    234107            return rv
    235108
    236         self.thread_with_rv = experiment_control_local.pooled_thread
    237         self.thread_pool = experiment_control_local.thread_pool
    238109        self.list_log = list_log.list_log
    239110
     
    11411012        log = alloc_log or self.log
    11421013
    1143         thread_pool = self.thread_pool(self.nthreads)
     1014        tp = thread_pool(self.nthreads)
    11441015        threads = [ ]
    11451016        starters = [ ]
     
    11561027            # to get the return value later
    11571028            tb_attrs = copy.copy(attrs)
    1158             thread_pool.wait_for_slot()
     1029            tp.wait_for_slot()
    11591030            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
    11601031            base, suffix = split_testbed(tb)
     
    11811052                    log_collector=log_collector)
    11821053            starters.append(s)
    1183             t  = self.pooled_thread(\
     1054            t  = pooled_thread(\
    11841055                    target=s, name=tb,
    11851056                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
    1186                     pdata=thread_pool, trace_file=self.trace_file)
     1057                    pdata=tp, trace_file=self.trace_file)
    11871058            threads.append(t)
    11881059            t.start()
     
    11911062        mins = 0
    11921063        revoked = False
    1193         while not thread_pool.wait_for_all_done(60.0):
     1064        while not tp.wait_for_all_done(60.0):
    11941065            mins += 1
    11951066            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
     
    12141085        if failed:
    12151086            if not fail_soft:
    1216                 thread_pool.clear()
     1087                tp.clear()
    12171088                for tb in succeeded:
    12181089                    # Create and start a thread to stop the segment
    1219                     thread_pool.wait_for_slot()
     1090                    tp.wait_for_slot()
    12201091                    uri = tbparams[tb]['uri']
    1221                     t  = self.pooled_thread(\
     1092                    t  = pooled_thread(\
    12221093                            target=self.terminate_segment(log=log,
    12231094                                testbed=tb,
     
    12281099                            args=(uri, tbparams[tb]['federant']['allocID']),
    12291100                            name=tb,
    1230                             pdata=thread_pool, trace_file=self.trace_file)
     1101                            pdata=tp, trace_file=self.trace_file)
    12311102                    t.start()
    12321103                # Wait until all finish (if any are being stopped)
    12331104                if succeeded:
    1234                     thread_pool.wait_for_all_done()
     1105                    tp.wait_for_all_done()
    12351106
    12361107                # release the allocations
     
    25082379                # no tbparams, no start.
    25092380                if len(tbparams) > 0:
    2510                     thread_pool = self.thread_pool(self.nthreads)
     2381                    tp = thread_pool(self.nthreads)
    25112382                    for k in tbparams.keys():
    25122383                        # Create and start a thread to stop the segment
    2513                         thread_pool.wait_for_slot()
     2384                        tp.wait_for_slot()
    25142385                        uri, aid = tbparams[k]
    2515                         t  = self.pooled_thread(\
     2386                        t  = pooled_thread(\
    25162387                                target=self.terminate_segment(log=dealloc_log,
    25172388                                    testbed=uri,
     
    25212392                                    caller=self.call_TerminateSegment),
    25222393                                args=(uri, aid), name=k,
    2523                                 pdata=thread_pool, trace_file=self.trace_file)
     2394                                pdata=tp, trace_file=self.trace_file)
    25242395                        t.start()
    25252396                    # Wait for completions
    2526                     thread_pool.wait_for_all_done()
     2397                    tp.wait_for_all_done()
    25272398
    25282399                # release the allocations (failed experiments have done this
Note: See TracChangeset for help on using the changeset viewer.