Changeset b4b19c7 for fedd/federation/experiment_control.py
- Timestamp:
- Apr 21, 2010 5:31:03 AM (14 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- f7a54c6
- Parents:
- f54e8e4
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
rf54e8e4 rb4b19c7 1007 1007 self.log_collector = log_collector 1008 1008 self.response = None 1009 self.node = { } 1010 1011 def make_map(self, resp): 1012 if 'segmentdescription' in resp and \ 1013 'topdldescription' in resp['segmentdescription']: 1014 top = topdl.Topology(\ 1015 **resp['segmentdescription']['topdldescription']) 1016 for e in [e for e in top.elements \ 1017 if isinstance(e, topdl.Computer)]: 1018 hn = e.get_attribute('hostname') 1019 if hn: 1020 for n in e.name: 1021 self.node[n] = hn 1009 1022 1010 1023 def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None): … … 1050 1063 for line in lval.splitlines(True): 1051 1064 self.log_collector.write(line) 1065 self.make_map(r['StartSegmentResponseBody']) 1052 1066 self.response = r 1053 1067 else: … … 1088 1102 1089 1103 def allocate_resources(self, allocated, masters, eid, expid, 1090 tbparams, top o, tmpdir, alloc_log=None, log_collector=None,1104 tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 1091 1105 attrs=None, connInfo={}): 1092 1106 … … 1101 1115 thread_pool = self.thread_pool(self.nthreads) 1102 1116 threads = [ ] 1117 starters = [ ] 1103 1118 1104 1119 for tb in allocated.keys(): … … 1118 1133 "No alloc id for testbed %s !?" % tb) 1119 1134 1135 s = self.start_segment(log=log, debug=self.debug, 1136 testbed=tb, cert_file=self.cert_file, 1137 cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, 1138 caller=self.call_StartSegment, 1139 log_collector=log_collector) 1140 starters.append(s) 1120 1141 t = self.pooled_thread(\ 1121 target=self.start_segment(log=log, debug=self.debug, 1122 testbed=tb, cert_file=self.cert_file, 1123 cert_pwd=self.cert_pwd, 1124 trusted_certs=self.trusted_certs, 1125 caller=self.call_StartSegment, 1126 log_collector=log_collector), 1142 target=s, name=tb, 1127 1143 args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]), 1128 name=tb,1129 1144 pdata=thread_pool, trace_file=self.trace_file) 1130 1145 threads.append(t) … … 1190 1205 return 1191 1206 else: 1207 # Walk through the successes and gather the virtual to physical 1208 # mapping. 1209 node = { } 1210 for s in starters: 1211 node.update(s.node) 1212 # Assigng the mapping as a hostname attribute 1213 for e in [ e for e in top.elements \ 1214 if isinstance(e, topdl.Computer)]: 1215 for n in e.name: 1216 if n in node: 1217 e.set_attribute('hostname', node[n]) 1192 1218 log.info("[start_segment]: Experiment %s active" % eid) 1193 1219 … … 1205 1231 log.debug("[start_experiment]: not removing %s" % tmpdir) 1206 1232 1207 # Insert the experiment into our state and update the disk copy 1233 # Insert the experiment into our state and update the disk copy. 1208 1234 self.state_lock.acquire() 1209 1235 self.state[expid]['experimentStatus'] = 'active' 1210 1236 self.state[eid] = self.state[expid] 1237 self.state[eid]['experimentdescription']['topdldescription'] = \ 1238 top.to_dict() 1211 1239 if self.state_filename: self.write_state() 1212 1240 self.state_lock.release() … … 1228 1256 1229 1257 1230 def create_experiment_state(self, fid, req, expid, expcert, 1258 def create_experiment_state(self, fid, req, expid, expcert, 1231 1259 state='starting'): 1232 1260 """ … … 1662 1690 # Add the service to masters 1663 1691 for tb in s.get('export', []): 1664 if s.get('name', None) and s.get('import', None):1692 if s.get('name', None): 1665 1693 if tb not in masters: 1666 1694 masters[tb] = [ ] … … 1676 1704 params=params)) 1677 1705 else: 1678 log.error('Testbed service does not have name " + \1706 self.log.error('Testbed service does not have name " + \ 1679 1707 "and importers') 1680 1708 … … 1761 1789 self.state[eid]['vtopo'] = vtopo 1762 1790 self.state[eid]['vis'] = vis 1791 self.state[eid]['experimentdescription'] = \ 1792 { 'topdldescription': top.clone() } 1763 1793 self.state[expid]['federant'] = \ 1764 1794 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ … … 1833 1863 t = Thread(target=self.allocate_resources, 1834 1864 args=(allocated, masters, eid, expid, tbparams, 1835 topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo), 1865 top, topo, tmpdir, alloc_log, alloc_collector, attrs, 1866 connInfo), 1836 1867 name=eid) 1837 1868 t.start() … … 2011 2042 if f.has_key('allocID'): del f['allocID'] 2012 2043 if f.has_key('uri'): del f['uri'] 2044 2013 2045 return rv 2014 2046
Note: See TracChangeset
for help on using the changeset viewer.