Changeset 109a32a for fedd/federation
- Timestamp:
- Mar 1, 2010 4:38:17 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- 8353ac6
- Parents:
- 2761484
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r2761484 r109a32a 1031 1031 tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 1032 1032 attrs=None, connInfo={}, services=[]): 1033 def get_vlan(r):1034 if r.has_key('StartSegmentResponseBody'):1035 srb = r['StartSegmentResponseBody']1036 if srb.has_key('fedAttr'):1037 for k, v in [ (a['attribute'], a['value']) \1038 for a in srb['fedAttr']]:1039 if k == 'vlan': return v1040 return None1041 1033 1042 1034 started = { } # Testbeds where a sub-experiment started … … 1046 1038 fail_soft = False 1047 1039 1048 non_transit = [ k for k in allocated.keys() \1049 if not topo[k].get_attribute('transit')]1050 transit = [ k for k in allocated.keys() \1051 if topo[k].get_attribute('transit')]1052 1053 1040 log = alloc_log or self.log 1054 1041 … … 1056 1043 threads = [ ] 1057 1044 1058 for tb in transit: 1059 uri = tbparams[tb]['uri'] 1045 for tb in allocated.keys(): 1046 # Create and start a thread to start the segment, and save it 1047 # to get the return value later 1048 thread_pool.wait_for_slot() 1049 uri = tbparams[tb].get('uri', self.tbmap.get(tb, None)) 1050 if not uri: 1051 raise service_error(service_error.internal, 1052 "Unknown testbed %s !?" % tb) 1053 1060 1054 if tbparams[tb].has_key('allocID') and \ 1061 1055 tbparams[tb]['allocID'].has_key('fedid'): … … 1065 1059 "No alloc id for testbed %s !?" % tb) 1066 1060 1067 m = re.search('(\d+)', tb) 1068 if m: 1069 to_repl = "unassigned%s" % m.group(1) 1070 else: 1071 raise service_error(service_error.internal, 1072 "Bad dynamic allocation name") 1073 break 1074 1075 ss = self.start_segment(log=log, debug=self.debug, 1076 testbed=tb, cert_file=self.cert_file, 1077 cert_pwd=self.cert_pwd, 1078 trusted_certs=self.trusted_certs, 1079 caller=self.call_StartSegment, 1080 log_collector=log_collector) 1081 t = self.pooled_thread( 1082 target=ss, 1083 args =(uri, aid, topo[tb], False, attrs, connInfo[tb], 1084 services), 1085 name=tb, pdata=thread_pool, trace_file=self.trace_file) 1061 t = self.pooled_thread(\ 1062 target=self.start_segment(log=log, debug=self.debug, 1063 testbed=tb, cert_file=self.cert_file, 1064 cert_pwd=self.cert_pwd, 1065 trusted_certs=self.trusted_certs, 1066 caller=self.call_StartSegment, 1067 log_collector=log_collector), 1068 args=(uri, aid, topo[tb], tb == master, 1069 attrs, connInfo[tb], services), 1070 name=tb, 1071 pdata=thread_pool, trace_file=self.trace_file) 1086 1072 threads.append(t) 1087 1073 t.start() 1088 # Wait until the this transit node finishes (keep pinging the log, 1089 # though) 1090 1091 mins = 0 1092 while not thread_pool.wait_for_all_done(60.0): 1093 mins += 1 1094 alloc_log.info("Waiting for transit (it has been %d mins)" \ 1095 % mins) 1096 1097 if t.rv: 1098 vlan = get_vlan(ss.response) 1099 if vlan is not None: 1100 for v in connInfo.values(): 1101 for i in v: 1102 for a in i.get('fedAttr', []): 1103 if a.get('attribute', "") == 'vlan_id' and \ 1104 a.get('value', "") == to_repl: 1105 a['value'] = vlan 1106 else: 1107 break 1108 thread_pool.clear() 1109 1110 1111 failed = [ t.getName() for t in threads if not t.rv ] 1112 1113 if len(failed) == 0: 1114 for tb in non_transit: 1115 # Create and start a thread to start the segment, and save it 1116 # to get the return value later 1117 thread_pool.wait_for_slot() 1118 uri = self.tbmap.get(tb, None) 1119 if not uri: 1120 raise service_error(service_error.internal, 1121 "Unknown testbed %s !?" % tb) 1122 1123 if tbparams[tb].has_key('allocID') and \ 1124 tbparams[tb]['allocID'].has_key('fedid'): 1125 aid = tbparams[tb]['allocID']['fedid'] 1126 else: 1127 raise service_error(service_error.internal, 1128 "No alloc id for testbed %s !?" % tb) 1129 1130 t = self.pooled_thread(\ 1131 target=self.start_segment(log=log, debug=self.debug, 1132 testbed=tb, cert_file=self.cert_file, 1133 cert_pwd=self.cert_pwd, 1134 trusted_certs=self.trusted_certs, 1135 caller=self.call_StartSegment, 1136 log_collector=log_collector), 1137 args=(uri, aid, topo[tb], tb == master, 1138 attrs, connInfo[tb], services), 1139 name=tb, 1140 pdata=thread_pool, trace_file=self.trace_file) 1141 threads.append(t) 1142 t.start() 1143 1144 # Wait until all finish (keep pinging the log, though) 1145 mins = 0 1146 while not thread_pool.wait_for_all_done(60.0): 1147 mins += 1 1148 alloc_log.info("Waiting for sub threads (it has been %d mins)" \ 1149 % mins) 1150 1151 thread_pool.clear() 1074 1075 # Wait until all finish (keep pinging the log, though) 1076 mins = 0 1077 while not thread_pool.wait_for_all_done(60.0): 1078 mins += 1 1079 alloc_log.info("Waiting for sub threads (it has been %d mins)" \ 1080 % mins) 1081 1082 thread_pool.clear() 1152 1083 1153 1084 failed = [ t.getName() for t in threads if not t.rv ] … … 1419 1350 aid = tbparams[st]['allocID']['fedid'] 1420 1351 except: 1421 self.log.debug("Can't get allocation id?") 1352 self.log.debug("[new_portal_node] Can't get alloc id for %s?" \ 1353 % st) 1422 1354 aid = None 1423 1355 info = { … … 1511 1443 ('vlan', 'unassigned%d' % idx),)]) 1512 1444 name = "dragon%d" % idx 1445 store_key = 'fedid:%s/vlan%d' % (expid, idx) 1513 1446 for tb in tbs.keys(): 1514 1447 seg = topdl.Segment( … … 1529 1462 segs.append(seg) 1530 1463 1531 1532 try: 1533 aid = tbparams[tb]['allocID']['fedid'] 1534 except: 1535 self.log.debug("Can't get allocation id?") 1536 aid = None 1464 # Give this allocation the rights to access the key of the 1465 # vlan_id 1466 try: 1467 aid = tbparams[tb]['allocID']['fedid'] 1468 self.auth.set_attribute(aid, store_key) 1469 except: 1470 self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\ 1471 % tb) 1472 1537 1473 connInfo[name] = [ { 1474 'type': 'transit', 1538 1475 'parameter': [ { 1539 1476 'name': 'vlan_id', 1540 'key': "fedid:%s/vlan%d" % (expid, idx),1477 'key': store_key, 1541 1478 'store': self.store_url, 1542 1479 'type': 'output' 1543 1480 } ] 1544 1481 } ] 1545 1546 # Give this allocation the rights to access the key of the1547 # vlan_id1548 if aid:1549 self.auth.set_attribute(aid, 'fedid:%s/vlan%d' % (expid, idx))1550 1482 1551 1483 topo[name] = \ … … 1554 1486 topdl.Attribute(attribute="transit", value='true'), 1555 1487 topdl.Attribute(attribute="dynamic", value='true'), 1556 topdl.Attribute(attribute="testbed", value='dragon'), 1488 topdl.Attribute(attribute="testbed", 1489 value='dragon'), 1490 topdl.Attribute(attribute="store_keys", 1491 value=store_key), 1557 1492 ] 1558 1493 ) 1559 1494 1560 1495 def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid, 1561 connInfo, peer={ },expid=None):1496 connInfo, expid=None): 1562 1497 """ 1563 1498 Add attribiutes to the various elements indicating that they are to be … … 1570 1505 if s.name == name: return s 1571 1506 else: return None 1507 1572 1508 1573 1509 mdomain = tbparams[master].get('domain', '.example.com') … … 1597 1533 aid = tbparams[tb]['allocID']['fedid'] 1598 1534 except: 1599 self.log.debug("Can't get allocation id?") 1535 self.log.debug("[creat_dragon_substrate] " + 1536 "Can't get alloc id for %s?" %tb) 1600 1537 aid = None 1601 1538 … … 1612 1549 if isinstance(i.element, topdl.Computer) ], 1613 1550 'fedAttr': [ 1614 { 'attribute': 'vlan_id',1615 'value': 'unassigned%d' % dn },1616 1551 { 'attribute': 'masterdomain', 'value': mdomain}, 1617 1552 { 'attribute': 'masterexperiment', 'value': … … 1626 1561 } ] 1627 1562 } 1628 if peer.has_key(tb):1629 info['peer'] = peer[tb]1563 if tbs.has_key(tb): 1564 info['peer'] = tbs[tb] 1630 1565 connInfo[tb].append(info) 1631 1566 … … 1639 1574 "No substrate %s in testbed %s" % (sub.name, tb)) 1640 1575 1641 self.new_dragon_topo(dn, sub, topo, tbs, tbparams )1576 self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid) 1642 1577 1643 1578 def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid, … … 1892 1827 1893 1828 self.create_dragon_substrate(csub, topo, 1894 {tb: 1, master:1}, tbparams, master, eid, connInfo, 1895 {tb: ip_addr(mip), master: ip_addr(dip)}, expid) 1829 {tb: ip_addr(mip), master: ip_addr(dip)}, 1830 tbparams, master, eid, connInfo, 1831 expid) 1896 1832 else: 1897 1833 self.add_control_portal(master, tb, master, eid, topo, … … 2239 2175 del tbparams[tb] 2240 2176 allocated[k] = 1 2177 store_keys = t.get_attribute('store_keys') 2178 # Give the testbed access to keys it exports or imports 2179 if store_keys: 2180 for sk in store_keys.split(" "): 2181 self.auth.set_attribute(\ 2182 tbparams[k]['allocID']['fedid'], sk) 2241 2183 else: 2242 2184 raise service_error(service_error.internal, … … 2748 2690 if v is not None: 2749 2691 rv['value'] = v 2692 self.log.debug("[GetValue] got %s from %s" % (v, name)) 2750 2693 return rv 2751 2694 else: … … 2769 2712 self.synch_store.set_value(name, v) 2770 2713 self.write_store() 2714 self.log.debug("[SetValue] set %s to %s" % (name, v)) 2771 2715 except synch_store.CollisionError: 2772 2716 # Translate into a service_error
Note: See TracChangeset
for help on using the changeset viewer.