- Timestamp:
- Feb 28, 2010 12:31:25 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- 109a32a
- Parents:
- ef252e9
- Location:
- fedd/federation
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/emulab_access.py
ref252e9 r2761484 152 152 } 153 153 154 self.call_SetValue = service_caller('SetValue') 155 self.call_GetValue = service_caller('GetValue') 154 156 155 157 if not config.has_option("allocate", "uri"): … … 1199 1201 "Cannot write experiment file %s: %s" % (expfn,e)) 1200 1202 1203 def export_store_info(self, cf, proj, ename, connInfo): 1204 """ 1205 For the export requests in the connection info, install the peer names 1206 at the experiment controller via SetValue calls. 1207 """ 1208 1209 for c in connInfo: 1210 for p in [ p for p in c.get('parameter', []) \ 1211 if p.get('type', '') == 'output']: 1212 1213 if p.get('name', '') != 'peer': 1214 self.log.error("Unknown export parameter: %s" % \ 1215 p.get('name')) 1216 continue 1217 1218 k = p.get('key', None) 1219 surl = p.get('store', None) 1220 if surl and k and k.index('/') != -1: 1221 value = "%s.%s.%s%s" % \ 1222 (k[k.index('/')+1:], ename, proj, self.domain) 1223 req = { 'name': k, 'value': value } 1224 self.call_SetValue(surl, req, cf) 1225 else: 1226 self.log.error("Bad export request: %s" % p) 1227 1228 def import_store_info(self, cf, connInfo): 1229 """ 1230 Pull any import parameters in connInfo in. We translate them either 1231 into known member names or fedAddrs. 1232 """ 1233 1234 for c in connInfo: 1235 for p in [ p for p in c.get('parameter', []) \ 1236 if p.get('type', '') == 'input']: 1237 name = p.get('name', None) 1238 key = p.get('key', None) 1239 store = p.get('store', None) 1240 1241 if name and key and store : 1242 req = { 'name': key, 'wait': True } 1243 r = self.call_GetValue(store, req, cf) 1244 r = r.get('GetValueResponseBody', None) 1245 if r : 1246 if r.get('name', '') == key: 1247 v = r.get('value', None) 1248 if v is not None: 1249 if name == 'peer': 1250 c['peer'] = v 1251 else: 1252 if c.has_key('fedAttr'): 1253 c['fedAttr'].append({ 1254 'attribute': name, 'value': value}) 1255 else: 1256 c['fedAttr']= [{ 1257 'attribute': name, 'value': value}] 1258 else: 1259 raise service_error(service_error.internal, 1260 'None value exported for %s' % key) 1261 else: 1262 raise service_error(service_error.internal, 1263 'Different name returned for %s: %s' \ 1264 % (key, r.get('name',''))) 1265 else: 1266 raise service_error(service_error.internal, 1267 'Badly formatted response: no GetValueResponseBody') 1268 else: 1269 raise service_error(service_error.internal, 1270 'Bad Services missing info for import %s' % c) 1271 1201 1272 def StartSegment(self, req, fid): 1202 1273 def get_url(url, cf, destdir, fn=None): … … 1251 1322 raise service_error(service_error.req, 1252 1323 "Request missing segmentdescription'") 1253 1324 1254 1325 master = req.get('master', False) 1255 1326 … … 1353 1424 "Can't find creation user for %s" %aid) 1354 1425 1426 self.export_store_info(certfile, proj, ename, connInfo) 1427 self.import_store_info(certfile, connInfo) 1428 1355 1429 expfile = "%s/experiment.tcl" % tmpdir 1356 1430 -
fedd/federation/experiment_control.py
ref252e9 r2761484 28 28 from remote_service import xmlrpc_handler, soap_handler, service_caller 29 29 from service_error import service_error 30 from synch_store import synch_store 30 31 31 32 import topdl … … 239 240 self.state_filename = config.get("experiment_control", 240 241 "experiment_state") 242 self.store_filename = config.get("experiment_control", 243 "synch_store") 244 self.store_url = config.get("experiment_control", "store_url") 241 245 self.splitter_url = config.get("experiment_control", "splitter_uri") 242 246 self.fedkit = parse_tarfile_list(\ … … 332 336 self.read_state() 333 337 338 if self.store_filename: 339 self.read_store() 340 else: 341 self.log.warning("No saved synch store") 342 self.synch_store = synch_store 343 334 344 # Dispatch tables 335 345 self.soap_services = {\ … … 342 352 'Terminate': soap_handler('Terminate', 343 353 self.terminate_experiment), 354 'GetValue': soap_handler('GetValue', self.GetValue), 355 'SetValue': soap_handler('SetValue', self.SetValue), 344 356 } 345 357 … … 353 365 'Terminate': xmlrpc_handler('Terminate', 354 366 self.terminate_experiment), 367 'GetValue': xmlrpc_handler('GetValue', self.GetValue), 368 'SetValue': xmlrpc_handler('SetValue', self.SetValue), 355 369 } 356 370 … … 377 391 self.log.error("Pickling problem (TypeError): %s" % e) 378 392 393 @staticmethod 394 def get_alloc_ids(state): 395 """ 396 Pull the fedids of the identifiers of each allocation from the 397 state. Again, a dict dive that's best isolated. 398 399 Used by read_store and read state 400 """ 401 402 return [ f['allocID']['fedid'] 403 for f in state.get('federant',[]) \ 404 if f.has_key('allocID') and \ 405 f['allocID'].has_key('fedid')] 406 379 407 # Call while holding self.state_lock 380 408 def read_state(self): … … 399 427 else: 400 428 return None 401 402 def get_alloc_ids(state):403 """404 Pull the fedids of the identifiers of each allocation from the405 state. Again, a dict dive that's best isolated.406 """407 408 return [ f['allocID']['fedid']409 for f in state.get('federant',[]) \410 if f.has_key('allocID') and \411 f['allocID'].has_key('fedid')]412 413 429 414 430 try: … … 438 454 # Set permissions to allow reading of the software repo, if 439 455 # any, as well. 440 for a in get_alloc_ids(s):456 for a in self.get_alloc_ids(s): 441 457 self.auth.set_attribute(a, 'repo/%s' % eid) 442 458 else: … … 529 545 "open %s: %s" % (file, e)) 530 546 f.close() 547 548 def read_store(self): 549 try: 550 self.synch_store = synch_store() 551 self.synch_store.load(self.store_filename) 552 self.log.debug("[read_store]: Read store from %s" % \ 553 self.store_filename) 554 except IOError, e: 555 self.log.warning("[read_store]: No saved store: Can't open %s: %s"\ 556 % (self.state_filename, e)) 557 self.synch_store = synch_store() 558 559 # Set the initial permissions on data in the store. XXX: This ad hoc 560 # authorization attribute initialization is getting out of hand. 561 for k in self.synch_store.all_keys(): 562 try: 563 if k.startswith('fedid:'): 564 fid = fedid(hexstr=k[6:46]) 565 if self.state.has_key(fid): 566 for a in self.get_alloc_ids(self.state[fid]): 567 self.auth.set_attribute(a, k) 568 except ValueError, e: 569 self.log.warn("Cannot deduce permissions for %s" % k) 570 571 572 def write_store(self): 573 """ 574 Write a new copy of synch_store after writing current state 575 to a backup. We use the internal synch_store pickle method to avoid 576 incinsistent data. 577 578 State format is a simple pickling of the store. 579 """ 580 if os.access(self.store_filename, os.W_OK): 581 copy_file(self.store_filename, \ 582 "%s.bak" % self.store_filename) 583 try: 584 self.synch_store.save(self.store_filename) 585 except IOError, e: 586 self.log.error("Can't write file %s: %s" % \ 587 (self.store_filename, e)) 588 except TypeError, e: 589 self.log.error("Pickling problem (TypeError): %s" % e) 590 531 591 532 592 def generate_ssh_keys(self, dest, type="rsa" ): … … 1324 1384 1325 1385 def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost, 1326 portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[]): 1386 portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 1387 expid=None): 1327 1388 """ 1328 1389 Return a new internet portal node and a dict with the connectionInfo to … … 1355 1416 ifaces.append(inf) 1356 1417 if conn_type == "ssh": 1418 try: 1419 aid = tbparams[st]['allocID']['fedid'] 1420 except: 1421 self.log.debug("Can't get allocation id?") 1422 aid = None 1357 1423 info = { 1358 1424 "type" : conn_type, 1359 1425 "portal": myname, 1360 'peer': desthost,1361 1426 'fedAttr': [ 1362 1427 { 'attribute': 'masterdomain', 'value': mdomain}, … … 1368 1433 { 'attribute': 'smbshare', 'value': smbshare}, 1369 1434 ], 1435 'parameter': [ 1436 { 1437 'name': 'peer', 1438 'key': 'fedid:%s/%s' % (expid, myname), 1439 'store': self.store_url, 1440 'type': 'output', 1441 }, 1442 { 1443 'name': 'peer', 1444 'key': 'fedid:%s/%s' % (expid, desthost), 1445 'store': self.store_url, 1446 'type': 'input', 1447 }, 1448 ] 1370 1449 } 1450 # Give this allocation the rights to access the key of the 1451 # peers 1452 if aid: 1453 for h in (myname, desthost): 1454 self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h)) 1455 else: 1456 self.log.error("No aid for %s in new_portal_node" % st) 1371 1457 else: 1372 1458 info = None … … 1384 1470 ), info) 1385 1471 1386 def new_portal_substrate(self, st, dt, eid, tbparams ):1472 def new_portal_substrate(self, st, dt, eid, tbparams, expid): 1387 1473 ddomain = tbparams[dt].get('domain', ".example.com") 1388 1474 dproject = tbparams[dt].get('project', 'project') … … 1414 1500 return (tsubstrate, segment_element) 1415 1501 1416 def new_dragon_topo(self, idx, sub, topo, tbs, tbparams ):1502 def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid): 1417 1503 if sub.capacity is None: 1418 1504 raise service_error(service_error.internal, … … 1424 1510 for n, v, in (\ 1425 1511 ('vlan', 'unassigned%d' % idx),)]) 1512 name = "dragon%d" % idx 1426 1513 for tb in tbs.keys(): 1427 1514 seg = topdl.Segment( … … 1442 1529 segs.append(seg) 1443 1530 1444 topo["dragon%d" %idx] = \ 1531 1532 try: 1533 aid = tbparams[tb]['allocID']['fedid'] 1534 except: 1535 self.log.debug("Can't get allocation id?") 1536 aid = None 1537 connInfo[name] = [ { 1538 'parameter': [ { 1539 'name': 'vlan_id', 1540 'key': "fedid:%s/vlan%d" % (expid, idx), 1541 'store': self.store_url, 1542 'type': 'output' 1543 } ] 1544 } ] 1545 1546 # Give this allocation the rights to access the key of the 1547 # vlan_id 1548 if aid: 1549 self.auth.set_attribute(aid, 'fedid:%s/vlan%d' % (expid, idx)) 1550 1551 topo[name] = \ 1445 1552 topdl.Topology(substrates=[substr], elements=segs, 1446 1553 attribute=[ … … 1452 1559 1453 1560 def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid, 1454 connInfo, peer={ } ):1561 connInfo, peer={ }, expid=None): 1455 1562 """ 1456 1563 Add attribiutes to the various elements indicating that they are to be … … 1486 1593 if not connInfo.has_key(tb): 1487 1594 connInfo[tb] = [ ] 1595 1596 try: 1597 aid = tbparams[tb]['allocID']['fedid'] 1598 except: 1599 self.log.debug("Can't get allocation id?") 1600 aid = None 1488 1601 1489 1602 # This may need another look, but only a service gateway will … … 1506 1619 { 'attribute': 'active', 'value': active}, 1507 1620 ], 1621 'parameter': [ { 1622 'name': 'vlan_id', 1623 'key': 'fedid:%s/vlan%d' % (expid, dn), 1624 'store': self.store_url, 1625 'type': 'input', 1626 } ] 1508 1627 } 1509 1628 if peer.has_key(tb): 1510 1629 info['peer'] = peer[tb] 1511 1630 connInfo[tb].append(info) 1631 1632 # Give this allocation the rights to access the key of the 1633 # vlan_id 1634 if aid: 1635 self.auth.set_attribute(aid, 1636 'fedid:%s/vlan%d' % (expid, dn)) 1512 1637 else: 1513 1638 raise service_error(service_error.internal, … … 1517 1642 1518 1643 def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid, 1519 segment_substrate, portals, connInfo ):1644 segment_substrate, portals, connInfo, expid): 1520 1645 # More than one testbed is on this substrate. Insert 1521 1646 # some portals into the subtopologies. st == source testbed, … … 1546 1671 # testbed in there. 1547 1672 tsubstrate, segment_element = \ 1548 self.new_portal_substrate(st, dt, eid, tbparams) 1673 self.new_portal_substrate(st, dt, eid, tbparams, 1674 expid) 1549 1675 segment_substrate[st][dt] = tsubstrate 1550 1676 topo[st].substrates.append(tsubstrate) … … 1567 1693 portal_type = "experiment" 1568 1694 myname = "%stunnel%d" % (dt, len(portals[st][dt])) 1569 desthost = "%stunnel%d.%s.%s%s" % (st, 1570 len(portals[st][dt]), eid.lower(), 1571 dproject.lower(), ddomain.lower()) 1695 desthost = "%stunnel%d" % (st.lower(), 1696 len(portals[st][dt])) 1572 1697 else: 1573 1698 new_i = topdl.Interface( … … 1586 1711 portals[st][dt] = [ ] 1587 1712 myname = "%stunnel%d" % (dt, len(portals[st][dt])) 1588 desthost = "%stunnel%d.%s.%s%s" % (st.lower(), 1589 len(portals[st][dt]), eid.lower(), 1590 dproject.lower(), ddomain.lower()) 1713 desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt])) 1591 1714 1592 1715 if dt == master or st == master: portal_type = "both" … … 1602 1725 portal, info = self.new_portal_node(st, dt, tbparams, 1603 1726 master, eid, myname, desthost, portal_type, 1604 infs )1727 infs, conn_type="ssh", conn_attrs=[], expid=expid) 1605 1728 if self.fedkit: 1606 1729 self.add_kit(portal, self.fedkit) … … 1612 1735 connInfo[st].append(info) 1613 1736 1614 def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo ):1737 def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid): 1615 1738 # Add to the master testbed 1616 1739 tsubstrate, segment_element = \ 1617 self.new_portal_substrate(st, dt, eid, tbparams )1740 self.new_portal_substrate(st, dt, eid, tbparams, expid) 1618 1741 myname = "%stunnel" % dt 1619 1742 desthost = "%stunnel" % st … … 1621 1744 portal, info = self.new_portal_node(st, dt, tbparams, master, 1622 1745 eid, myname, desthost, "control", 1623 ((tsubstrate.name,(('portal','true'),)),)) 1746 ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh", 1747 conn_attrs=[], expid=expid) 1624 1748 if self.fedkit: 1625 1749 self.add_kit(portal, self.fedkit) … … 1635 1759 1636 1760 def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 1637 substrate, tbparams ):1761 substrate, tbparams, expid): 1638 1762 # Add to the master testbed 1639 1763 myname = "%stunnel" % dt … … 1645 1769 ('portal','true'), 1646 1770 ('ip4_address', "%s" % ip_addr(myip)),)),), 1647 conn_type="transit" )1771 conn_type="transit", conn_attrs=[], expid=expid) 1648 1772 if self.fedkit: 1649 1773 self.add_kit(portal, self.fedkit) … … 1654 1778 1655 1779 def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 1656 connInfo ):1780 connInfo, expid): 1657 1781 """ 1658 1782 For each substrate in the main topology, find those that … … 1680 1804 all([tbparams[x].has_key('dragon') for x in tbs]): 1681 1805 self.create_dragon_substrate(s, topo, tbs, tbparams, 1682 master, eid, connInfo )1806 master, eid, connInfo, expid) 1683 1807 else: 1684 1808 self.insert_internet_portals(s, topo, tbs, tbparams, master, 1685 eid, segment_substrate, portals, connInfo )1809 eid, segment_substrate, portals, connInfo, expid) 1686 1810 1687 1811 # Make sure that all the slaves have a control portal back to the … … 1733 1857 ) 1734 1858 portal = self.new_dragon_portal(tb, master, 1735 master, eid, dip, mip, idx, csub, tbparams )1859 master, eid, dip, mip, idx, csub, tbparams, expid) 1736 1860 topo[tb].substrates.append(csub) 1737 1861 topo[tb].elements.append(portal) … … 1760 1884 ) 1761 1885 portal = self.new_dragon_portal(master, tb, master, 1762 eid, mip, dip, idx, mcsub, tbparams )1886 eid, mip, dip, idx, mcsub, tbparams, expid) 1763 1887 topo[master].substrates.append(mcsub) 1764 1888 topo[master].elements.append(portal) … … 1769 1893 self.create_dragon_substrate(csub, topo, 1770 1894 {tb: 1, master:1}, tbparams, master, eid, connInfo, 1771 {tb: ip_addr(mip), master: ip_addr(dip)} )1895 {tb: ip_addr(mip), master: ip_addr(dip)}, expid) 1772 1896 else: 1773 1897 self.add_control_portal(master, tb, master, eid, topo, 1774 tbparams, connInfo )1898 tbparams, connInfo, expid) 1775 1899 self.add_control_portal(tb, master, master, eid, topo, 1776 tbparams, connInfo )1900 tbparams, connInfo, expid) 1777 1901 1778 1902 # Connect the portal nodes into the topologies and clear out … … 2103 2227 2104 2228 self.add_portals(top, topo, eid, master, tbparams, ip_allocator, 2105 connInfo )2229 connInfo, expid) 2106 2230 # Now get access to the dynamic testbeds 2107 2231 for k, t in topo.items(): … … 2585 2709 self.state_lock.release() 2586 2710 2711 # Delete any synch points associated with this experiment. All 2712 # synch points begin with the fedid of the experiment. 2713 fedid_keys = set(["fedid:%s" % f for f in ids \ 2714 if isinstance(f, fedid)]) 2715 for k in self.synch_store.all_keys(): 2716 try: 2717 if len(k) > 45 and k[0:46] in fedid_keys: 2718 self.synch_store.del_value(k) 2719 except synch_store.BadDeleteionError: 2720 pass 2721 self.write_store() 2722 2587 2723 return { 2588 2724 'experiment': exp , … … 2593 2729 self.state_lock.release() 2594 2730 raise service_error(service_error.req, "No saved state") 2731 2732 2733 def GetValue(self, req, fid): 2734 """ 2735 Get a value from the synchronized store 2736 """ 2737 req = req.get('GetValueRequestBody', None) 2738 if not req: 2739 raise service_error(service_error.req, 2740 "Bad request format (no GetValueRequestBody)") 2741 2742 name = req['name'] 2743 wait = req['wait'] 2744 rv = { 'name': name } 2745 2746 if self.auth.check_attribute(fid, name): 2747 v = self.synch_store.get_value(name, wait) 2748 if v is not None: 2749 rv['value'] = v 2750 return rv 2751 else: 2752 raise service_error(service_error.access, "Access Denied") 2753 2754 2755 def SetValue(self, req, fid): 2756 """ 2757 Set a value in the synchronized store 2758 """ 2759 req = req.get('SetValueRequestBody', None) 2760 if not req: 2761 raise service_error(service_error.req, 2762 "Bad request format (no SetValueRequestBody)") 2763 2764 name = req['name'] 2765 v = req['value'] 2766 2767 if self.auth.check_attribute(fid, name): 2768 try: 2769 self.synch_store.set_value(name, v) 2770 self.write_store() 2771 except synch_store.CollisionError: 2772 # Translate into a service_error 2773 raise service_error(service_error.req, 2774 "Value already set: %s" %name) 2775 return { 'name': name, 'value': v } 2776 else: 2777 raise service_error(service_error.access, "Access Denied")
Note: See TracChangeset
for help on using the changeset viewer.