Changeset 29d5f7c
- Timestamp:
- Nov 18, 2011 6:43:19 PM (13 years ago)
- Branches:
- compt_changes, info-ops, master
- Children:
- 80b1e82
- Parents:
- 2ac64d1a
- Location:
- fedd
- Files:
-
- 1 added
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/Makefile
r2ac64d1a r29d5f7c 9 9 ${MODULE_DIR}/fedd_internal_server.py \ 10 10 ${MODULE_DIR}/fedd_internal_types.py \ 11 ${MODULE_DIR}/fedd_services_types.py \ 11 12 ${MODULE_DIR}/fedd_types.py \ 12 13 ${MODULE_DIR}/fedd_server.py -
fedd/fedd_ftopo.py
r2ac64d1a r29d5f7c 48 48 sys.exit("Error processing RPC: %s" % e) 49 49 50 emap = { }51 if 'embedding' in resp_dict:52 for e in resp_dict['embedding']:53 tn = e.get('toponame', None)54 if tn:55 emap[tn] = ":".join((tn,56 ",".join(e.get('physname', [])),e.get('testbed',"")))57 else:58 sys.exit("No embedding")59 60 50 if 'experimentdescription' in resp_dict and \ 61 51 'topdldescription' in resp_dict['experimentdescription']: … … 63 53 **resp_dict['experimentdescription']['topdldescription']) 64 54 for e in top.elements: 65 if isinstance(e, topdl.Computer) and e.name in emap: 66 print emap[e.name] 55 if isinstance(e, topdl.Computer): 56 print ":".join((e.name, ",".join(e.localname), e.status, 57 ",".join(e.operation), e.get_attribute('testbed') or "")) 67 58 else: 68 59 sys.exit("Badly formatted response (no experiment descrption)!?") -
fedd/federation/emulab_access.py
r2ac64d1a r29d5f7c 1016 1016 the response that may need to be replayed, and return the response. 1017 1017 """ 1018 i = 0 1018 1019 # Copy the assigned names into the return topology 1019 embedding = [ ] 1020 for n in starter.node: 1021 embedding.append({ 1022 'toponame': n, 1023 'physname': ["%s%s" % (starter.node[n], self.domain)], 1024 }) 1020 for e in topo.elements: 1021 if isinstance(e, topdl.Computer): 1022 if not self.create_debug: 1023 if e.name in starter.node: 1024 e.localname.append("%s%s" % \ 1025 (starter.node[e.name], self.domain)) 1026 e.status = 'active' 1027 else: 1028 # Simple debugging assignment 1029 e.localname.append("node%d%s" % (i, self.domain)) 1030 e.status = 'active' 1031 e.operation.extend(('testop1', 'testop2')) 1032 i += 1 1033 1025 1034 # Grab the log (this is some anal locking, but better safe than 1026 1035 # sorry) … … 1036 1045 'topdldescription': topo.clone().to_dict() 1037 1046 }, 1038 'embedding': embedding,1039 1047 'proof': proof.to_dict(), 1040 1048 } -
fedd/federation/experiment_control.py
r2ac64d1a r29d5f7c 36 36 from authorizer import abac_authorizer 37 37 from thread_pool import thread_pool, pooled_thread 38 from experiment_info import experiment_info, allocation_info 38 39 39 40 import topdl … … 286 287 287 288 @staticmethod 288 def get_alloc_ids(state): 289 """ 290 Pull the fedids of the identifiers of each allocation from the 291 state. Again, a dict dive that's best isolated. 292 293 Used by read_store and read state 294 """ 295 296 return [ f['allocID']['fedid'] 297 for f in state.get('federant',[]) \ 298 if f.has_key('allocID') and \ 299 f['allocID'].has_key('fedid')] 289 def get_alloc_ids(exp): 290 """ 291 Used by read_store and read state. This used to be worse. 292 """ 293 294 return [ a.allocID for a in exp.get_all_allocations() ] 295 300 296 301 297 # Call while holding self.state_lock … … 307 303 """ 308 304 309 def get_experiment_id(state):310 """311 Pull the fedid experimentID out of the saved state. This is kind312 of a gross walk through the dict.313 """314 315 if state.has_key('experimentID'):316 for e in state['experimentID']:317 if e.has_key('fedid'):318 return e['fedid']319 else:320 return None321 else:322 return None323 324 305 try: 325 306 f = open(self.state_filename, "r") … … 337 318 try: 338 319 339 eid = get_experiment_id(s)320 eid = s.fedid 340 321 if eid : 341 322 if self.auth_type == 'legacy': 342 323 # XXX: legacy 343 324 # Give the owner rights to the experiment 344 self.auth.set_attribute(s['owner'], eid)325 #self.auth.set_attribute(s['owner'], eid) 345 326 # And holders of the eid as well 346 327 self.auth.set_attribute(eid, eid) … … 809 790 if self.local_access.has_key(uri): 810 791 resp = self.local_access[uri].ReleaseAccess(\ 811 { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 792 { 'ReleaseAccessRequestBody' : 793 {'allocID': {'fedid': aid}},}, 812 794 fedid(file=cert_file)) 813 795 resp = { 'ReleaseAccessResponseBody': resp } 814 796 else: 815 resp = self.call_ReleaseAccess(uri, {'allocID': aid},797 resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} }, 816 798 cert_file, cert_pwd, self.trusted_certs) 817 799 … … 855 837 856 838 def make_map(self, resp): 857 for e in resp.get('embedding', []): 858 if 'toponame' in e and 'physname' in e: 859 self.node[e['toponame']] = e['physname'][0] 839 if 'segmentdescription' not in resp or \ 840 'topdldescription' not in resp['segmentdescription']: 841 self.log.warn('No topology returned from startsegment') 842 return 843 844 top = topdl.Topology( 845 **resp['segmentdescription']['topdldescription']) 846 847 for e in top.elements: 848 if isinstance(e, topdl.Computer): 849 self.node[e.name] = \ 850 (e.localname, e.status, e.service, e.operation) 860 851 861 852 def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None): … … 928 919 def __call__(self, uri, aid ): 929 920 req = { 930 'allocID': aid,921 'allocID': {'fedid': aid }, 931 922 } 932 923 try: … … 1056 1047 # Remove the placeholder 1057 1048 self.state_lock.acquire() 1058 self.state[eid]['experimentStatus']= 'failed'1049 self.state[eid].status = 'failed' 1059 1050 if self.state_filename: self.write_state() 1060 1051 self.state_lock.release() … … 1071 1062 return 1072 1063 else: 1073 # Walk through the successes and gather the virtual to physical 1074 # mapping. 1075 embedding = [ ] 1064 # Walk through the successes and gather the proofs 1076 1065 proofs = { } 1077 1066 for s in starters: 1078 for k, v in s.node.items():1079 embedding.append({1080 'toponame': k,1081 'physname': [ v],1082 'testbed': s.testbed1083 })1084 1067 if s.proof: 1085 1068 proofs[s.testbed] = s.proof 1069 1070 # Annotate the topology with embedding info 1071 for e in top.elements: 1072 if isinstance(e, topdl.Computer): 1073 for s in starters: 1074 ann = s.node.get(e.name, None) 1075 if ann is not None: 1076 e.localname.extend(ann[0]) 1077 e.status = ann[1] 1078 e.service.extend(ann[2]) 1079 e.operation.extend(ann[3]) 1080 break 1081 1086 1082 log.info("[start_segment]: Experiment %s active" % eid) 1087 1083 … … 1095 1091 # Insert the experiment into our state and update the disk copy. 1096 1092 self.state_lock.acquire() 1097 self.state[expid] ['experimentStatus']= 'active'1093 self.state[expid].status = 'active' 1098 1094 self.state[eid] = self.state[expid] 1099 self.state[eid]['experimentdescription']['topdldescription'] = \ 1100 top.to_dict() 1101 self.state[eid]['embedding'] = embedding 1095 self.state[eid].top = top 1102 1096 # Append startup proofs 1103 for f in self.state[eid]['federant']: 1104 if 'name' in f and 'localname' in f['name']: 1105 if f['name']['localname'] in proofs: 1106 f['proof'].append(proofs[f['name']['localname']]) 1097 for f in self.state[eid].get_all_allocations(): 1098 if f.tb in proofs: 1099 f.proof.append(proofs[f.tb]) 1107 1100 1108 1101 if self.state_filename: self.write_state() … … 1135 1128 1136 1129 if need_state_lock: self.state_lock.acquire() 1137 self.state[expid]['auth'].update(attrs) 1130 # XXX: really a no op? 1131 #self.state[expid]['auth'].update(attrs) 1138 1132 if self.state_filename: self.write_state() 1139 1133 if need_state_lock: self.state_lock.release() … … 1146 1140 1147 1141 if need_state_lock: self.state_lock.acquire() 1148 if expid in self.state and 'auth' in self.state[expid]: 1149 for p, a in self.state[expid]['auth']: 1150 self.auth.unset_attribute(p, a) 1151 self.state[expid]['auth'] = set() 1142 # XXX: should be a no-op 1143 #if expid in self.state and 'auth' in self.state[expid]: 1144 #for p, a in self.state[expid]['auth']: 1145 #self.auth.unset_attribute(p, a) 1146 #self.state[expid]['auth'] = set() 1152 1147 if self.state_filename: self.write_state() 1153 1148 if need_state_lock: self.state_lock.release() … … 1184 1179 if users_experiment: 1185 1180 self.state_lock.acquire() 1186 status = self.state[eid]. get('experimentStatus', None)1181 status = self.state[eid].status 1187 1182 if status and status == 'failed': 1188 1183 # remove the old access attributes … … 1200 1195 eid += random.choice(string.ascii_letters) 1201 1196 # Initial state 1202 self.state[eid] = { 1203 'experimentID' : \ 1204 [ { 'localname' : eid }, {'fedid': expid } ], 1205 'experimentStatus': state, 1206 'experimentAccess': { 'X509' : expcert }, 1207 'owner': fid, 1208 'log' : [], 1209 'auth': set(), 1210 } 1197 self.state[eid] = experiment_info(fedid=expid, localname=eid, 1198 identity=expcert) 1211 1199 self.state[expid] = self.state[eid] 1212 1200 if self.state_filename: self.write_state() … … 1222 1210 eid += random.choice(string.ascii_letters) 1223 1211 # Initial state 1224 self.state[eid] = { 1225 'experimentID' : \ 1226 [ { 'localname' : eid }, {'fedid': expid } ], 1227 'experimentStatus': state, 1228 'experimentAccess': { 'X509' : expcert }, 1229 'owner': fid, 1230 'log' : [], 1231 'auth': set(), 1232 } 1212 self.state[eid] = experiment_info(fedid=expid, localname=eid, 1213 identity=expcert) 1233 1214 self.state[expid] = self.state[eid] 1234 1215 if self.state_filename: self.write_state() … … 1668 1649 expid = eid = None 1669 1650 self.state_lock.acquire() 1670 if self.state.has_key(key): 1671 self.state[key]['experimentStatus'] = "starting" 1672 for e in self.state[key].get('experimentID',[]): 1673 if not expid and e.has_key('fedid'): 1674 expid = e['fedid'] 1675 elif not eid and e.has_key('localname'): 1676 eid = e['localname'] 1677 if 'experimentAccess' in self.state[key] and \ 1678 'X509' in self.state[key]['experimentAccess']: 1679 expcert = self.state[key]['experimentAccess']['X509'] 1680 else: 1681 expcert = None 1651 if key in self.state: 1652 exp = self.state[key] 1653 exp.status = "starting" 1654 expid = exp.fedid 1655 eid = exp.localname 1656 expcert = exp.identity 1682 1657 self.state_lock.release() 1683 1658 … … 1916 1891 1917 1892 self.state_lock.acquire() 1893 # XXX: this needs to be recalculated 1918 1894 if self.state.has_key(key): 1919 1895 if self.state[key].has_key('vtopo'): … … 1961 1937 1962 1938 self.state_lock.acquire() 1939 # XXX: this needs to be recalculated 1963 1940 if self.state.has_key(key): 1964 1941 if self.state[key].has_key('vis'): … … 1990 1967 response message. 1991 1968 """ 1969 self.state_lock.acquire() 1970 # XXX: 1971 #self.state[eid]['vtopo'] = vtopo 1972 #self.state[eid]['vis'] = vis 1973 exp = self.state[eid] 1974 exp.topology = top 1992 1975 # save federant information 1993 1976 for k in allocated.keys(): 1994 tbparams[k]['federant'] = { 1995 'name': [ { 'localname' : eid} ], 1996 'allocID' : tbparams[k]['allocID'], 1997 'uri': tbparams[k]['uri'], 1998 'proof': tbparams[k]['proof'], 1999 } 2000 2001 self.state_lock.acquire() 2002 self.state[eid]['vtopo'] = vtopo 2003 self.state[eid]['vis'] = vis 2004 self.state[eid]['experimentdescription'] = \ 2005 { 'topdldescription': top.to_dict() } 2006 self.state[eid]['federant'] = \ 2007 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 2008 if tbparams[tb].has_key('federant') ] 1977 exp.add_allocation( 1978 allocation_info( 1979 tb=k, 1980 allocID=tbparams[k]['allocID'].get('fedid', None), 1981 uri=tbparams[k]['uri'], 1982 proof=tbparams[k]['proof']) 1983 ) 1984 2009 1985 # Access proofs for the response message 2010 1986 proofs = [copy.deepcopy(p) for k in tbparams.keys()\ 2011 for p in tbparams[k][' federant']['proof']]1987 for p in tbparams[k]['proof']] 2012 1988 if self.state_filename: 2013 1989 self.write_state() … … 2154 2130 # to the main log file. 2155 2131 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2156 alloc_collector = self.list_log(self.state[eid] ['log'])2132 alloc_collector = self.list_log(self.state[eid].log) 2157 2133 h = logging.StreamHandler(alloc_collector) 2158 2134 # XXX: there should be a global one of these rather than repeating the … … 2187 2163 rv = None 2188 2164 self.state_lock.acquire() 2189 if self.state.has_key(key): 2190 if isinstance(self.state[key], dict): 2191 try: 2192 kl = [ f['fedid'] for f in \ 2193 self.state[key]['experimentID']\ 2194 if f.has_key('fedid') ] 2195 except KeyError: 2196 self.state_lock.release() 2197 raise service_error(service_error.internal, 2198 "No fedid for experiment %s when getting "+\ 2199 "fedid(!?)" % key) 2200 if len(kl) == 1: 2201 rv = kl[0] 2202 else: 2203 self.state_lock.release() 2204 raise service_error(service_error.internal, 2205 "multiple fedids for experiment %s when " +\ 2206 "getting fedid(!?)" % key) 2207 else: 2208 self.state_lock.release() 2209 raise service_error(service_error.internal, 2210 "Unexpected state for %s" % key) 2165 if key in self.state: 2166 rv = self.state[key].fedid 2211 2167 self.state_lock.release() 2212 2168 return rv … … 2242 2198 return (None, None) 2243 2199 2244 def clean_info_response(self, rv, proof): 2245 """ 2246 Remove the information in the experiment's state object that is not in 2247 the info response. 2248 """ 2249 # Remove the owner info (should always be there, but...) 2250 if rv.has_key('owner'): del rv['owner'] 2251 if 'auth' in rv: del rv['auth'] 2252 2253 # Convert the log into the allocationLog parameter and remove the 2254 # log entry (with defensive programming) 2255 if rv.has_key('log'): 2256 rv['allocationLog'] = "".join(rv['log']) 2257 del rv['log'] 2258 else: 2259 rv['allocationLog'] = "" 2260 2261 if rv['experimentStatus'] != 'active': 2262 if rv.has_key('federant'): del rv['federant'] 2263 else: 2264 # remove the allocationID and uri info from each federant 2265 for f in rv.get('federant', []): 2266 if f.has_key('allocID'): del f['allocID'] 2267 if f.has_key('uri'): del f['uri'] 2268 rv['proof'] = proof.to_dict() 2269 2270 return rv 2271 2200 2272 2201 def get_info(self, req, fid): 2273 2202 """ … … 2295 2224 proof = self.check_experiment_access(fid, key) 2296 2225 2297 # The state may be massaged by the service function that called2298 # get_info (e.g., encoded for XMLRPC transport) so send a copy of the2299 # state.2300 2226 self.state_lock.acquire() 2301 2227 if self.state.has_key(key): 2302 rv = copy.deepcopy(self.state[key])2228 rv = self.state[key].get_info() 2303 2229 self.state_lock.release() 2304 2230 2305 2231 if rv: 2306 return self.clean_info_response(rv, proof) 2232 rv['proof'] = proof.to_dict() 2233 return rv 2307 2234 else: 2308 2235 raise service_error(service_error.req, "No such experiment") … … 2326 2253 2327 2254 if self.state.has_key(key): 2328 e = copy.deepcopy(self.state[key])2329 e = self.clean_info_response(e, proof)2255 e = self.state[key].get_info() 2256 e['proof'] = proof.to_dict() 2330 2257 rv['info'].append(e) 2331 2258 rv['proof'].append(proof.to_dict()) … … 2340 2267 """ 2341 2268 self.state_lock.acquire() 2342 status = fed_exp. get('experimentStatus', None)2269 status = fed_exp.status 2343 2270 2344 2271 if status: … … 2364 2291 term_params = { } 2365 2292 self.state_lock.acquire() 2366 # experimentID is a list of dicts that are self-describing 2367 # identifiers. This finds all the fedids and localnames - the 2368 # keys of self.state - and puts them into ids, which is used to delete 2369 # the state after everything is swapped out. 2370 for id in fed_exp.get('experimentID', []): 2371 if 'fedid' in id: 2372 ids.append(id['fedid']) 2373 repo = "%s" % id['fedid'] 2374 if 'localname' in id: ids.append(id['localname']) 2375 2376 # Get the experimentAccess - the principal for this experiment. It 2377 # is this principal to which credentials have been delegated, and 2378 # as which the experiment controller must act. 2379 if 'experimentAccess' in fed_exp and \ 2380 'X509' in fed_exp['experimentAccess']: 2381 expcert = fed_exp['experimentAccess']['X509'] 2382 else: 2383 expcert = None 2293 ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ] 2294 expcert = fed_exp.identity 2295 repo = "%s" % fed_exp.fedid 2384 2296 2385 2297 # Collect the allocation/segment ids into a dict keyed by the fedid 2386 # of the allocation (or a monotonically increasing integer) that 2387 # contains a tuple of uri, aid (which is a dict...) 2388 for i, fed in enumerate(fed_exp.get('federant', [])): 2389 try: 2390 uri = fed['uri'] 2391 aid = fed['allocID'] 2392 k = fed['allocID'].get('fedid', i) 2393 except KeyError, e: 2394 continue 2395 term_params[k] = (uri, aid) 2298 # of the allocation that contains a tuple of uri, aid 2299 for i, fed in enumerate(fed_exp.get_all_allocations()): 2300 uri = fed.uri 2301 aid = fed.allocID 2302 term_params[aid] = (uri, aid) 2396 2303 # Change the experiment state 2397 fed_exp ['experimentStatus']= 'terminating'2304 fed_exp.status = 'terminating' 2398 2305 if self.state_filename: self.write_state() 2399 2306 self.state_lock.release() -
fedd/federation/topdl.py
r2ac64d1a r29d5f7c 168 168 for i in self.importer],'') 169 169 if self.param: 170 rv += join( '<param>%s</param>' % p.to_xml() \170 rv += join(['<param>%s</param>' % p.to_xml() \ 171 171 for p in self.param], '') 172 172 if self.description is not None: … … 206 206 localname = [ ln for ln in self.localname], 207 207 status = self.status, 208 service = [ s.clone() for s in self.service] 209 operation=[ op for op in operation])208 service = [ s.clone() for s in self.service], 209 operation=[ op for op in self.operation]) 210 210 211 211 def to_dict(self): … … 238 238 for a in self.attribute], "") 239 239 if self.localname: 240 rv += join( '<localname>%s</localname>' % ln \240 rv += join(['<localname>%s</localname>' % ln \ 241 241 for ln in self.localname], '') 242 242 if self.status is not None: … … 489 489 interface=[x.clone() for x in self.interface], 490 490 attribute=[x.clone() for x in self.attribute], 491 localname =[ ln for forln in self.localname],491 localname =[ ln for ln in self.localname], 492 492 status = self.status, 493 493 service = [s.clone() for s in self.service], … … 539 539 for a in self.attribute], "") 540 540 if self.localname: 541 rv += join( '<localname>%s</localname>' % ln \541 rv += join(['<localname>%s</localname>' % ln \ 542 542 for ln in self.localname], '') 543 543 if self.status is not None: … … 607 607 for a in self.attribute], "") 608 608 if self.localname: 609 rv += join( '<localname>%s</localname>' % ln \609 rv += join(['<localname>%s</localname>' % ln \ 610 610 for ln in self.localname], '') 611 611 if self.status is not None:
Note: See TracChangeset
for help on using the changeset viewer.