Changeset 1af38d6


Ignore:
Timestamp:
Aug 21, 2008 6:12:35 PM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
3441fe3
Parents:
6546868
Message:

multithreading for startup. Needs more testing

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd_create_experiment.py

    r6546868 r1af38d6  
    1616import tempfile
    1717import copy
     18
     19from threading import *
    1820
    1921from subprocess import *
     
    3537            debug=False,
    3638            muxmax=2,
     39            nthreads=2,
    3740            project_user = "faber",
    3841            scp_exec="/usr/bin/scp",
     
    5053            ):
    5154        self.scripts = fedd_create_experiment_local.scripts
     55        self.thread_with_rv = fedd_create_experiment_local.pooled_thread
     56        self.thread_pool = fedd_create_experiment_local.thread_pool
    5257
    5358        self.cert_file = cert_file
     
    5560        self.debug = debug
    5661        self.muxmax = muxmax
     62        self.nthreads = nthreads
    5763        self.project_user = project_user
    5864        self.scp_exec = scp_exec
     
    9399                raise service_error(service_error.server_config,
    94100                        "%s/%s not in local script dir" % (self.scripts_dir, s))
     101    class thread_pool:
     102        def __init__(self):
     103            self.changed = Condition()
     104            self.started = 0
     105            self.terminated = 0
     106
     107        def acquire(self):
     108            self.changed.acquire()
     109
     110        def release(self):
     111            self.changed.release()
     112
     113        def wait(self, timeout = None):
     114            self.changed.wait(timeout)
     115
     116        def start(self):
     117            self.changed.acquire()
     118            self.started += 1
     119            self.changed.notifyAll()
     120            self.changed.release()
     121
     122        def terminate(self):
     123            self.changed.acquire()
     124            self.terminated += 1
     125            self.changed.notifyAll()
     126            self.changed.release()
     127
     128        def clear(self):
     129            self.changed.acquire()
     130            self.started = 0
     131            self.terminated =0
     132            self.changed.notifyAll()
     133            self.changed.release()
     134
     135
     136
     137    class pooled_thread(Thread):
     138        def __init__(self, group=None, target=None, name=None, args=(),
     139                kwargs={}, pdata=None):
     140            Thread.__init__(self, group, target, name, args, kwargs)
     141            self.rv = None
     142            self.exception = None
     143            self.target=target
     144            self.args = args
     145            self.kwargs = kwargs
     146            self.pdata = pdata
     147       
     148        def run(self):
     149            if self.pdata:
     150                self.pdata.start()
     151
     152            if self.target:
     153                try:
     154                    self.rv = self.target(*self.args, **self.kwargs)
     155                except e:
     156                    self.exception = e
     157
     158            print "%s done: %s" % (self.getName(), self.rv)
     159            if self.pdata:
     160                self.pdata.terminate()
    95161
    96162    def copy_file(self, src, dest, size=1024):
     
    686752                active_end, tbparams, dtb, myname, desthost, type):
    687753            """
    688             Produce a gateway configuration file from a line of the gateways section
     754            Produce a gateway configuration file from a gateways line.
    689755            """
    690756
     
    10111077            raise service_error(service_error.internal,
    10121078                    "Cannot stage tarfile/rpm: %s" % e.strerror)
    1013        
    1014         # XXX: more parallelism
    1015         for tb in allocated.iterkeys():
    1016             if tb != master:
    1017                 if self.start_segment(tb, eid, tbparams, tmpdir, 0):
    1018                     started[tb] = True
    1019                 else:
    1020                     break
    1021 
    1022         if len(started) == len(allocated)-1:
    1023             if self.start_segment(master, eid, tbparams, tmpdir):
    1024                 started[master] = True
    1025        
    1026         if not fail_soft and  len(allocated) != len(started):
    1027             for tb in started.iterkeys():
     1079
     1080        thread_pool_info = self.thread_pool()
     1081        threads = [ ]
     1082
     1083        for tb in [ k for k in allocated.keys() if k != master]:
     1084            # Wait until we have a free slot to start the next testbed load
     1085            thread_pool_info.acquire()
     1086            while thread_pool_info.started - \
     1087                    thread_pool_info.terminated >= self.nthreads:
     1088                thread_pool_info.wait()
     1089            thread_pool_info.release()
     1090
     1091            # Create and start a thread to start the segment, and save it to
     1092            # get the return value later
     1093            t  = self.pooled_thread(target=self.start_segment,
     1094                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
     1095                    pdata=thread_pool_info)
     1096            threads.append(t)
     1097            t.start()
     1098
     1099        # Wait until all finish (the first clause of the while is to make sure
     1100        # one starts)
     1101        thread_pool_info.acquire()
     1102        while thread_pool_info.started == 0 or \
     1103                thread_pool_info.started > thread_pool_info.terminated:
     1104            thread_pool_info.wait()
     1105        thread_pool_info.release()
     1106
     1107        # If none failed, start the master
     1108        failed = [ t.getName() for t in threads if not t.rv ]
     1109
     1110        if len(failed) == 0:
     1111            if not self.start_segment(master, eid, tbparams, tmpdir):
     1112                failed.append(master)
     1113
     1114        # If one failed clean up
     1115        if not fail_soft and len(failed) > 0:
     1116            for tb in failed:
    10281117                self.stop_segment(tb, eid, tbparams)
    10291118        else:
Note: See TracChangeset for help on using the changeset viewer.