- Timestamp:
- Aug 21, 2008 6:12:35 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 3441fe3
- Parents:
- 6546868
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/fedd_create_experiment.py
r6546868 r1af38d6 16 16 import tempfile 17 17 import copy 18 19 from threading import * 18 20 19 21 from subprocess import * … … 35 37 debug=False, 36 38 muxmax=2, 39 nthreads=2, 37 40 project_user = "faber", 38 41 scp_exec="/usr/bin/scp", … … 50 53 ): 51 54 self.scripts = fedd_create_experiment_local.scripts 55 self.thread_with_rv = fedd_create_experiment_local.pooled_thread 56 self.thread_pool = fedd_create_experiment_local.thread_pool 52 57 53 58 self.cert_file = cert_file … … 55 60 self.debug = debug 56 61 self.muxmax = muxmax 62 self.nthreads = nthreads 57 63 self.project_user = project_user 58 64 self.scp_exec = scp_exec … … 93 99 raise service_error(service_error.server_config, 94 100 "%s/%s not in local script dir" % (self.scripts_dir, s)) 101 class thread_pool: 102 def __init__(self): 103 self.changed = Condition() 104 self.started = 0 105 self.terminated = 0 106 107 def acquire(self): 108 self.changed.acquire() 109 110 def release(self): 111 self.changed.release() 112 113 def wait(self, timeout = None): 114 self.changed.wait(timeout) 115 116 def start(self): 117 self.changed.acquire() 118 self.started += 1 119 self.changed.notifyAll() 120 self.changed.release() 121 122 def terminate(self): 123 self.changed.acquire() 124 self.terminated += 1 125 self.changed.notifyAll() 126 self.changed.release() 127 128 def clear(self): 129 self.changed.acquire() 130 self.started = 0 131 self.terminated =0 132 self.changed.notifyAll() 133 self.changed.release() 134 135 136 137 class pooled_thread(Thread): 138 def __init__(self, group=None, target=None, name=None, args=(), 139 kwargs={}, pdata=None): 140 Thread.__init__(self, group, target, name, args, kwargs) 141 self.rv = None 142 self.exception = None 143 self.target=target 144 self.args = args 145 self.kwargs = kwargs 146 self.pdata = pdata 147 148 def run(self): 149 if self.pdata: 150 self.pdata.start() 151 152 if self.target: 153 try: 154 self.rv = self.target(*self.args, **self.kwargs) 155 except e: 156 self.exception = e 157 158 print "%s done: %s" % (self.getName(), self.rv) 159 if self.pdata: 160 self.pdata.terminate() 95 161 96 162 def copy_file(self, src, dest, size=1024): … … 686 752 active_end, tbparams, dtb, myname, desthost, type): 687 753 """ 688 Produce a gateway configuration file from a line of the gateways section754 Produce a gateway configuration file from a gateways line. 689 755 """ 690 756 … … 1011 1077 raise service_error(service_error.internal, 1012 1078 "Cannot stage tarfile/rpm: %s" % e.strerror) 1013 1014 # XXX: more parallelism 1015 for tb in allocated.iterkeys(): 1016 if tb != master: 1017 if self.start_segment(tb, eid, tbparams, tmpdir, 0): 1018 started[tb] = True 1019 else: 1020 break 1021 1022 if len(started) == len(allocated)-1: 1023 if self.start_segment(master, eid, tbparams, tmpdir): 1024 started[master] = True 1025 1026 if not fail_soft and len(allocated) != len(started): 1027 for tb in started.iterkeys(): 1079 1080 thread_pool_info = self.thread_pool() 1081 threads = [ ] 1082 1083 for tb in [ k for k in allocated.keys() if k != master]: 1084 # Wait until we have a free slot to start the next testbed load 1085 thread_pool_info.acquire() 1086 while thread_pool_info.started - \ 1087 thread_pool_info.terminated >= self.nthreads: 1088 thread_pool_info.wait() 1089 thread_pool_info.release() 1090 1091 # Create and start a thread to start the segment, and save it to 1092 # get the return value later 1093 t = self.pooled_thread(target=self.start_segment, 1094 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1095 pdata=thread_pool_info) 1096 threads.append(t) 1097 t.start() 1098 1099 # Wait until all finish (the first clause of the while is to make sure 1100 # one starts) 1101 thread_pool_info.acquire() 1102 while thread_pool_info.started == 0 or \ 1103 thread_pool_info.started > thread_pool_info.terminated: 1104 thread_pool_info.wait() 1105 thread_pool_info.release() 1106 1107 # If none failed, start the master 1108 failed = [ t.getName() for t in threads if not t.rv ] 1109 1110 if len(failed) == 0: 1111 if not self.start_segment(master, eid, tbparams, tmpdir): 1112 failed.append(master) 1113 1114 # If one failed clean up 1115 if not fail_soft and len(failed) > 0: 1116 for tb in failed: 1028 1117 self.stop_segment(tb, eid, tbparams) 1029 1118 else:
Note: See TracChangeset
for help on using the changeset viewer.