Changeset 43197eb for fedd/federation/experiment_control.py
- Timestamp:
- Apr 16, 2010 8:16:22 AM (14 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- 4f2f41f
- Parents:
- 0c4b12c
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r0c4b12c r43197eb 42 42 fl = logging.getLogger("fedd.experiment_control") 43 43 fl.addHandler(nullHandler()) 44 45 46 # Right now, no support for composition. 47 class federated_service: 48 def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]): 49 self.name=name 50 self.exporter=exporter 51 self.importers=importers 52 self.params = params 53 self.reqs = reqs 44 54 45 55 class experiment_control_local: … … 786 796 else: return None 787 797 788 def get_access(self, tb, nodes, tbparam, master, export_project, 789 access_user, services): 798 def get_access(self, tb, nodes, tbparam, access_user, masters): 790 799 """ 791 800 Get access to testbed through fedd and set the parameters for that tb 792 801 """ 802 def get_export_project(svcs): 803 """ 804 Look through for the list of federated_service for this testbed 805 objects for a project_export service, and extract the project 806 parameter. 807 """ 808 809 pe = [s for s in svcs if s.name=='project_export'] 810 if len(pe) == 1: 811 return pe[0].params.get('project', None) 812 elif len(pe) == 0: 813 return None 814 else: 815 raise service_error(service_error.req, 816 "More than one project export is not supported") 817 793 818 uri = self.tbmap.get(tb, None) 794 819 if not uri: 795 820 raise service_error(service_error.server_config, 796 821 "Unknown testbed: %s" % tb) 822 823 export_svcs = masters.get(tb,[]) 824 import_svcs = [ s for m in masters.values() \ 825 for s in m \ 826 if tb in s.importers ] 827 828 export_project = get_export_project(export_svcs) 797 829 798 830 # Tweak search order so that if there are entries in access_user that … … 822 854 'destinationTestbed' : { 'uri' : uri }, 823 855 'credential': [ 'user: %s' % u ], 824 'user': [ {'userID': { 'localname': u } } ],825 856 'allocID' : { 'localname': 'test' }, 826 857 } 827 858 828 # If there is a master, and this is it, ask it to export services 829 # XXX move this to export pseudo-service 830 if tb == master: 831 req['service'] = [ 832 { 'name': 'userconfig', 'visibility': 'export'}, 833 { 'name': 'SMB', 'visibility': 'export'}, 834 { 'name': 'seer', 'visibility': 'export'}, 835 { 'name': 'tmcd', 'visibility': 'export'}, 836 ] 859 # Make the service request from the services we're importing and 860 # exporting. Keep track of the export request ids so we can 861 # collect the resulting info from the access response. 862 e_keys = { } 863 if import_svcs or export_svcs: 864 req['service'] = [ ] 865 866 for i, s in enumerate(import_svcs): 867 idx = 'import%d' % i 868 sr = {'id': idx, 'name': s.name, 'visibility': 'import' } 869 if s.params: 870 sr['fedAttr'] = [ { 'attribute': k, 'value': v } \ 871 for k, v in s.params.items()] 872 req['service'].append(sr) 873 874 for i, s in enumerate(export_svcs): 875 idx = 'export%d' % i 876 e_keys[idx] = s 877 sr = {'id': idx, 'name': s.name, 'visibility': 'export' } 878 if s.params: 879 sr['fedAttr'] = [ { 'attribute': k, 'value': v } 880 for k, v in s.params.items()] 881 req['service'].append(sr) 837 882 838 883 # node resources if any … … 886 931 "uri": uri, 887 932 } 888 if 'service' in r: 889 for s in r['service']: 890 # Tag each service with the origin testbed 891 if s.has_key('fedAttr'): 892 # The else connects with the for 893 for a in s['fedAttr']: 894 if a.get('attribute', None) == 'testbed': 895 break 896 else: 897 s['fedAttr'].append({'attribute': 'testbed', 898 'value': tb}) 899 else: 900 s['fedAttr'] = [ {'attribute': 'testbed', 'value': tb} ] 901 services.extend(r['service']) 933 934 # Collect the responses corresponding to the services this testbed 935 # exports. These will be the service requests that we will include in 936 # the start segment requests (with appropriate visibility values) to 937 # import and export the segments. 938 for s in r.get('service', []): 939 id = s.get('id', None) 940 if id and id in e_keys: 941 e_keys[id].reqs.append(s) 902 942 903 943 # Add attributes to parameter space. We don't allow attributes to … … 968 1008 self.response = None 969 1009 970 def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None, 971 services=None): 1010 def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None): 972 1011 req = { 973 1012 'allocID': { 'fedid' : aid }, … … 975 1014 'topdldescription': topo.to_dict(), 976 1015 }, 977 'master': master, 1016 # XXX: deprecated 1017 'master': False, 978 1018 } 979 1019 980 1020 if connInfo: 981 1021 req['connection'] = connInfo 982 # Add services to request. The master exports, everyone else 983 # imports. 984 if services: 985 svcs = [ x.copy() for x in services] 986 for s in svcs: 987 if master: s['visibility'] = 'export' 988 else: s['visibility'] = 'import' 989 req['service'] = svcs 1022 1023 import_svcs = [ s for m in masters.values() \ 1024 for s in m if self.testbed in s.importers] 1025 1026 if import_svcs or self.testbed in masters: 1027 req['service'] = [] 1028 1029 for s in import_svcs: 1030 for r in s.reqs: 1031 sr = copy.deepcopy(r) 1032 sr['visibility'] = 'import'; 1033 req['service'].append(sr) 1034 1035 for s in masters.get(self.testbed, []): 1036 for r in s.reqs: 1037 sr = copy.deepcopy(r) 1038 sr['visibility'] = 'export'; 1039 req['service'].append(sr) 1040 990 1041 if attrs: 991 1042 req['fedAttr'] = attrs … … 1038 1089 1039 1090 1040 def allocate_resources(self, allocated, master , eid, expid,1091 def allocate_resources(self, allocated, masters, eid, expid, 1041 1092 tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 1042 attrs=None, connInfo={} , services=[]):1093 attrs=None, connInfo={}): 1043 1094 1044 1095 started = { } # Testbeds where a sub-experiment started … … 1076 1127 caller=self.call_StartSegment, 1077 1128 log_collector=log_collector), 1078 args=(uri, aid, topo[tb], tb == master, 1079 attrs, connInfo[tb], services), 1129 args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]), 1080 1130 name=tb, 1081 1131 pdata=thread_pool, trace_file=self.trace_file) … … 1307 1357 return hosts, ips 1308 1358 1309 def get_access_to_testbeds(self, testbeds, access_user, 1310 export_project, master, allocated, tbparams, services):1359 def get_access_to_testbeds(self, testbeds, access_user, allocated, 1360 tbparams, masters): 1311 1361 """ 1312 1362 Request access to the various testbeds required for this instantiation … … 1316 1366 """ 1317 1367 for tb in testbeds: 1318 self.get_access(tb, None, tbparams, master, 1319 export_project, access_user, services) 1368 self.get_access(tb, None, tbparams, access_user, masters) 1320 1369 allocated[tb] = 1 1321 1370 … … 1479 1528 return rv 1480 1529 1481 def get_master_project(self, req):1482 master= None1483 export_project = None1484 for e in [ s for s in req.get('service', []) \1485 if s.get('name') == 'project_export']:1486 for a in e.get('fedAttr', []):1487 attr = a.get('attribute', None)1488 if attr == 'testbed':1489 master = a.get('value', None)1490 elif attr == 'project':1491 export_project = a.get('value', None)1492 1493 return (master, export_project)1494 1495 1496 1497 1530 def create_experiment(self, req, fid): 1498 1531 """ … … 1501 1534 1502 1535 Creates a working directory, splits the incoming description using the 1503 splitter script and parses out the avrious subsections using the1504 lcasses above. Once each sub-experiment is created, use pooled threads1536 splitter script and parses out the various subsections using the 1537 classes above. Once each sub-experiment is created, use pooled threads 1505 1538 to instantiate them and start it all up. 1506 1539 """ … … 1590 1623 raise service_error(service_error.server_config, 1591 1624 "Bad key type (%s)" % self.ssh_type) 1592 master, export_project = self.get_master_project(req) 1593 # XXX get these out when master and project are optional 1594 if not master: 1595 raise service_error(service_error.req, 1596 "No master testbed label") 1597 if not export_project: 1598 raise service_error(service_error.req, "No export project") 1599 # XXX 1600 1625 1626 # Copy the service request 1627 tb_services = [ s for s in req.get('service',[]) ] 1601 1628 # Translate to topdl 1602 1629 if self.splitter_url: … … 1607 1634 else: 1608 1635 tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 1609 str(self.muxmax), '-m', master]1636 str(self.muxmax), '-m', 'dummy'] 1610 1637 1611 1638 tclcmd.extend([pid, gid, eid, tclfile]) … … 1628 1655 if a.attribute == 'testbed']) 1629 1656 1657 masters = { } # testbeds exporting services 1658 for s in tb_services: 1659 # If this is a project_export request with the imports field 1660 # blank, fill it in. 1661 if s.get('name', '') == 'project_export': 1662 if 'import' not in s or len(s['import']) == 0: 1663 s['import'] = [ tb for tb in testbeds \ 1664 if tb not in s.get('export',[])] 1665 # Add the service to masters 1666 for tb in s.get('export', []): 1667 if s.get('name', None) and s.get('import', None): 1668 if tb not in masters: 1669 masters[tb] = [ ] 1670 1671 params = { } 1672 if 'fedAttr' in s: 1673 for a in s['fedAttr']: 1674 params[a.get('attribute', '')] = \ 1675 a.get('value','') 1676 1677 masters[tb].append(federated_service(name=s['name'], 1678 exporter=tb, importers=s.get('import',[]), 1679 params=params)) 1680 else: 1681 log.error('Testbed service does not have name " + \ 1682 "and importers') 1683 1684 1630 1685 allocated = { } # Testbeds we can access 1631 1686 topo ={ } # Sub topologies 1632 1687 connInfo = { } # Connection information 1633 services = [ ] 1634 masters = { } # testbeds exporting services 1635 self.get_access_to_testbeds(testbeds, access_user, 1636 export_project, master, allocated, tbparams, services) 1637 1638 # After this masters will hold a set of services exported by each 1639 # testbed 1640 for s in services: 1641 i = s.get('visibility', 'import') 1642 if i == 'export': 1643 for a in s.get('fedAttr', []): 1644 if a.get('attribute', '') == 'testbed': 1645 tb = a.get('value', None) 1646 if tb: 1647 if masters.has_key(tb): 1648 masters[tb].add(s.get('name','')) 1649 else: 1650 masters[tb] = set([s.get('name','')]) 1651 else: 1652 log.error('Testbed attribute with no value?') 1653 1688 self.get_access_to_testbeds(testbeds, access_user, allocated, 1689 tbparams, masters) 1654 1690 1655 1691 self.split_topology(top, topo, testbeds) … … 1697 1733 tb = t.get_attribute('testbed') 1698 1734 if tb: 1699 self.get_access(tb, None, tbparams, master,1700 export_project, access_user, services)1735 self.get_access(tb, None, tbparams, export_project, 1736 access_user, masters) 1701 1737 tbparams[k] = tbparams[tb] 1702 1738 del tbparams[tb] … … 1722 1758 'name': [ { 'localname' : eid} ], 1723 1759 'allocID' : tbparams[k]['allocID'], 1724 'master' : k == master, 1760 # XXX: 1761 #'master' : k == master, 1725 1762 'uri': tbparams[k]['uri'], 1726 1763 } … … 1803 1840 # Start a thread to do the resource allocation 1804 1841 t = Thread(target=self.allocate_resources, 1805 args=(allocated, master, eid, expid, tbparams, 1806 topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo, 1807 services), 1842 args=(allocated, masters, eid, expid, tbparams, 1843 topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo), 1808 1844 name=eid) 1809 1845 t.start()
Note: See TracChangeset
for help on using the changeset viewer.