Ignore:
Timestamp:
Apr 16, 2010 8:16:22 AM (14 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
4f2f41f
Parents:
0c4b12c
Message:

better service handling including project_export psuedo service done more or less right- tested on dry runs

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r0c4b12c r43197eb  
    4242fl = logging.getLogger("fedd.experiment_control")
    4343fl.addHandler(nullHandler())
     44
     45
     46# Right now, no support for composition.
     47class federated_service:
     48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
     49        self.name=name
     50        self.exporter=exporter
     51        self.importers=importers
     52        self.params = params
     53        self.reqs = reqs
    4454
    4555class experiment_control_local:
     
    786796        else: return None
    787797
    788     def get_access(self, tb, nodes, tbparam, master, export_project,
    789             access_user, services):
     798    def get_access(self, tb, nodes, tbparam, access_user, masters):
    790799        """
    791800        Get access to testbed through fedd and set the parameters for that tb
    792801        """
     802        def get_export_project(svcs):
     803            """
     804            Look through for the list of federated_service for this testbed
     805            objects for a project_export service, and extract the project
     806            parameter.
     807            """
     808
     809            pe = [s for s in svcs if s.name=='project_export']
     810            if len(pe) == 1:
     811                return pe[0].params.get('project', None)
     812            elif len(pe) == 0:
     813                return None
     814            else:
     815                raise service_error(service_error.req,
     816                        "More than one project export is not supported")
     817
    793818        uri = self.tbmap.get(tb, None)
    794819        if not uri:
    795820            raise service_error(service_error.server_config,
    796821                    "Unknown testbed: %s" % tb)
     822
     823        export_svcs = masters.get(tb,[])
     824        import_svcs = [ s for m in masters.values() \
     825                for s in m \
     826                    if tb in s.importers ]
     827
     828        export_project = get_export_project(export_svcs)
    797829
    798830        # Tweak search order so that if there are entries in access_user that
     
    822854                        'destinationTestbed' : { 'uri' : uri },
    823855                        'credential': [ 'user: %s' % u ],
    824                         'user':  [ {'userID': { 'localname': u } } ],
    825856                        'allocID' : { 'localname': 'test' },
    826857                    }
    827858
    828             # If there is a master, and this is it, ask it to export services
    829             # XXX move this to export pseudo-service
    830             if tb == master:
    831                 req['service'] = [
    832                         { 'name': 'userconfig', 'visibility': 'export'},
    833                         { 'name': 'SMB', 'visibility': 'export'},
    834                         { 'name': 'seer', 'visibility': 'export'},
    835                         { 'name': 'tmcd', 'visibility': 'export'},
    836                     ]
     859            # Make the service request from the services we're importing and
     860            # exporting.  Keep track of the export request ids so we can
     861            # collect the resulting info from the access response.
     862            e_keys = { }
     863            if import_svcs or export_svcs:
     864                req['service'] = [ ]
     865
     866                for i, s in enumerate(import_svcs):
     867                    idx = 'import%d' % i
     868                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
     869                    if s.params:
     870                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
     871                                for k, v in s.params.items()]
     872                    req['service'].append(sr)
     873
     874                for i, s in enumerate(export_svcs):
     875                    idx = 'export%d' % i
     876                    e_keys[idx] = s
     877                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
     878                    if s.params:
     879                        sr['fedAttr'] = [ { 'attribute': k, 'value': v }
     880                                for k, v in s.params.items()]
     881                    req['service'].append(sr)
    837882
    838883            # node resources if any
     
    886931                "uri": uri,
    887932                }
    888         if 'service' in r:
    889             for s in r['service']:
    890                 # Tag each service with the origin testbed
    891                 if s.has_key('fedAttr'):
    892                     # The else connects with the for
    893                     for a in s['fedAttr']:
    894                         if a.get('attribute', None) == 'testbed':
    895                             break
    896                     else:
    897                         s['fedAttr'].append({'attribute': 'testbed',
    898                             'value': tb})
    899                 else:
    900                     s['fedAttr'] = [ {'attribute': 'testbed', 'value': tb} ]
    901             services.extend(r['service'])
     933
     934        # Collect the responses corresponding to the services this testbed
     935        # exports.  These will be the service requests that we will include in
     936        # the start segment requests (with appropriate visibility values) to
     937        # import and export the segments.
     938        for s in r.get('service', []):
     939            id = s.get('id', None)
     940            if id and id in e_keys:
     941                e_keys[id].reqs.append(s)
    902942
    903943        # Add attributes to parameter space.  We don't allow attributes to
     
    9681008            self.response = None
    9691009
    970         def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
    971                 services=None):
     1010        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
    9721011            req = {
    9731012                    'allocID': { 'fedid' : aid },
     
    9751014                        'topdldescription': topo.to_dict(),
    9761015                    },
    977                     'master': master,
     1016                    # XXX: deprecated
     1017                    'master': False,
    9781018                }
    9791019
    9801020            if connInfo:
    9811021                req['connection'] = connInfo
    982             # Add services to request.  The master exports, everyone else
    983             # imports.
    984             if services:
    985                 svcs = [ x.copy() for x in services]
    986                 for s in svcs:
    987                     if master: s['visibility'] = 'export'
    988                     else: s['visibility'] = 'import'
    989                 req['service'] = svcs
     1022
     1023            import_svcs = [ s for m in masters.values() \
     1024                    for s in m if self.testbed in s.importers]
     1025
     1026            if import_svcs or self.testbed in masters:
     1027                req['service'] = []
     1028
     1029            for s in import_svcs:
     1030                for r in s.reqs:
     1031                    sr = copy.deepcopy(r)
     1032                    sr['visibility'] = 'import';
     1033                    req['service'].append(sr)
     1034
     1035            for s in masters.get(self.testbed, []):
     1036                for r in s.reqs:
     1037                    sr = copy.deepcopy(r)
     1038                    sr['visibility'] = 'export';
     1039                    req['service'].append(sr)
     1040
    9901041            if attrs:
    9911042                req['fedAttr'] = attrs
     
    10381089   
    10391090
    1040     def allocate_resources(self, allocated, master, eid, expid,
     1091    def allocate_resources(self, allocated, masters, eid, expid,
    10411092            tbparams, topo, tmpdir, alloc_log=None, log_collector=None,
    1042             attrs=None, connInfo={}, services=[]):
     1093            attrs=None, connInfo={}):
    10431094
    10441095        started = { }           # Testbeds where a sub-experiment started
     
    10761127                        caller=self.call_StartSegment,
    10771128                        log_collector=log_collector),
    1078                     args=(uri, aid, topo[tb], tb == master,
    1079                         attrs, connInfo[tb], services),
     1129                    args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]),
    10801130                    name=tb,
    10811131                    pdata=thread_pool, trace_file=self.trace_file)
     
    13071357        return hosts, ips
    13081358
    1309     def get_access_to_testbeds(self, testbeds, access_user,
    1310             export_project, master, allocated, tbparams, services):
     1359    def get_access_to_testbeds(self, testbeds, access_user, allocated,
     1360            tbparams, masters):
    13111361        """
    13121362        Request access to the various testbeds required for this instantiation
     
    13161366        """
    13171367        for tb in testbeds:
    1318             self.get_access(tb, None, tbparams, master,
    1319                     export_project, access_user, services)
     1368            self.get_access(tb, None, tbparams, access_user, masters)
    13201369            allocated[tb] = 1
    13211370
     
    14791528        return rv
    14801529
    1481     def get_master_project(self, req):
    1482         master= None
    1483         export_project = None
    1484         for e in [ s for s in req.get('service', []) \
    1485                 if s.get('name') == 'project_export']:
    1486             for a in e.get('fedAttr', []):
    1487                 attr = a.get('attribute', None)
    1488                 if attr == 'testbed':
    1489                     master = a.get('value', None)
    1490                 elif attr == 'project':
    1491                     export_project = a.get('value', None)
    1492        
    1493         return (master, export_project)
    1494 
    1495 
    1496 
    14971530    def create_experiment(self, req, fid):
    14981531        """
     
    15011534
    15021535        Creates a working directory, splits the incoming description using the
    1503         splitter script and parses out the avrious subsections using the
    1504         lcasses above.  Once each sub-experiment is created, use pooled threads
     1536        splitter script and parses out the various subsections using the
     1537        classes above.  Once each sub-experiment is created, use pooled threads
    15051538        to instantiate them and start it all up.
    15061539        """
     
    15901623                raise service_error(service_error.server_config,
    15911624                        "Bad key type (%s)" % self.ssh_type)
    1592             master, export_project = self.get_master_project(req)
    1593             # XXX get these out when master and project are optional
    1594             if not master:
    1595                 raise service_error(service_error.req,
    1596                         "No master testbed label")
    1597             if not export_project:
    1598                 raise service_error(service_error.req, "No export project")
    1599             # XXX
    1600 
     1625
     1626            # Copy the service request
     1627            tb_services = [ s for s in req.get('service',[]) ]
    16011628            # Translate to topdl
    16021629            if self.splitter_url:
     
    16071634            else:
    16081635                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x',
    1609                     str(self.muxmax), '-m', master]
     1636                    str(self.muxmax), '-m', 'dummy']
    16101637
    16111638                tclcmd.extend([pid, gid, eid, tclfile])
     
    16281655                        if a.attribute == 'testbed'])
    16291656
     1657            masters = { }           # testbeds exporting services
     1658            for s in tb_services:
     1659                # If this is a project_export request with the imports field
     1660                # blank, fill it in.
     1661                if s.get('name', '') == 'project_export':
     1662                    if 'import' not in s or len(s['import']) == 0:
     1663                        s['import'] = [ tb for tb in testbeds \
     1664                                if tb not in s.get('export',[])]
     1665                # Add the service to masters
     1666                for tb in s.get('export', []):
     1667                    if s.get('name', None) and s.get('import', None):
     1668                        if tb not in masters:
     1669                            masters[tb] = [ ]
     1670
     1671                        params = { }
     1672                        if 'fedAttr' in s:
     1673                            for a in s['fedAttr']:
     1674                                params[a.get('attribute', '')] = \
     1675                                        a.get('value','')
     1676
     1677                        masters[tb].append(federated_service(name=s['name'],
     1678                                exporter=tb, importers=s.get('import',[]),
     1679                                params=params))
     1680                    else:
     1681                        log.error('Testbed service does not have name " + \
     1682                                "and importers')
     1683
     1684
    16301685            allocated = { }         # Testbeds we can access
    16311686            topo ={ }               # Sub topologies
    16321687            connInfo = { }          # Connection information
    1633             services = [ ]
    1634             masters = { }           # testbeds exporting services
    1635             self.get_access_to_testbeds(testbeds, access_user,
    1636                     export_project, master, allocated, tbparams, services)
    1637 
    1638             # After this masters will hold a set of services exported by each
    1639             # testbed
    1640             for s in services:
    1641                 i = s.get('visibility', 'import')
    1642                 if i == 'export':
    1643                     for a in s.get('fedAttr', []):
    1644                         if a.get('attribute', '') == 'testbed':
    1645                             tb = a.get('value', None)
    1646                             if tb:
    1647                                 if masters.has_key(tb):
    1648                                     masters[tb].add(s.get('name',''))
    1649                                 else:
    1650                                     masters[tb] = set([s.get('name','')])
    1651                             else:
    1652                                 log.error('Testbed attribute with no value?')
    1653 
     1688            self.get_access_to_testbeds(testbeds, access_user, allocated,
     1689                    tbparams, masters)
    16541690
    16551691            self.split_topology(top, topo, testbeds)
     
    16971733                tb = t.get_attribute('testbed')
    16981734                if tb:
    1699                     self.get_access(tb, None, tbparams, master,
    1700                             export_project, access_user, services)
     1735                    self.get_access(tb, None, tbparams, export_project,
     1736                            access_user, masters)
    17011737                    tbparams[k] = tbparams[tb]
    17021738                    del tbparams[tb]
     
    17221758                        'name': [ { 'localname' : eid} ],
    17231759                        'allocID' : tbparams[k]['allocID'],
    1724                         'master' : k == master,
     1760                        # XXX:
     1761                        #'master' : k == master,
    17251762                        'uri': tbparams[k]['uri'],
    17261763                    }
     
    18031840        # Start a thread to do the resource allocation
    18041841        t  = Thread(target=self.allocate_resources,
    1805                 args=(allocated, master, eid, expid, tbparams,
    1806                     topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
    1807                     services),
     1842                args=(allocated, masters, eid, expid, tbparams,
     1843                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo),
    18081844                name=eid)
    18091845        t.start()
Note: See TracChangeset for help on using the changeset viewer.