Changeset 43197eb for fedd/federation


Ignore:
Timestamp:
Apr 16, 2010 8:16:22 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
4f2f41f
Parents:
0c4b12c
Message:

better service handling including project_export psuedo service done more or less right- tested on dry runs

Location:
fedd/federation
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/emulab_access.py

    r0c4b12c r43197eb  
    124124        self.state_lock = Lock()
    125125        # XXX: Configurable
    126         self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig'))
     126        self.exports = set(('SMB', 'seer', 'tmcd', 'userconfig',
     127            'project_export'))
    127128        self.imports = set(('SMB', 'seer', 'userconfig'))
    128129
     
    550551        return confid, confcert
    551552
     553    def export_SMB(self, id, state, project, user):
     554        return {
     555                'id': id,
     556                'name': 'SMB',
     557                'visibility': 'export',
     558                'server': 'http://fs:139',
     559                'fedAttr': [
     560                        { 'attribute': 'SMBSHARE', 'value': 'USERS' },
     561                        { 'attribute': 'SMBUSER', 'value': user },
     562                        { 'attribute': 'SMBPROJ', 'value': project },
     563                    ]
     564                }
     565
     566    def export_seer(self, id, state, project, user):
     567        return {
     568                'id': id,
     569                'name': 'seer',
     570                'visibility': 'export',
     571                'server': 'http://control:16606',
     572                }
     573
     574    def export_tmcd(self, id, state, project, user):
     575        return {
     576                'id': id,
     577                'name': 'seer',
     578                'visibility': 'export',
     579                'server': 'http://boss:7777',
     580                }
     581
     582    def export_userconfig(self, id, state, project, user):
     583        if self.userconfdir and self.userconfcmd \
     584                and self.userconfurl:
     585            cid, cert = self.export_userconf(project)
     586            state['userconfig'] = unicode(cid)
     587            return {
     588                    'id': id,
     589                    'name': 'userconfig',
     590                    'visibility': 'export',
     591                    'server': "%s/%s" % (self.userconfurl, str(cid)),
     592                    'fedAttr': [
     593                        { 'attribute': 'cert', 'value': cert },
     594                    ]
     595                    }
     596        else:
     597            return None
    552598
    553599    def export_services(self, sreq, project, user):
     
    561607            if svis == 'export':
    562608                if sname in self.exports:
    563                     outs = s.copy()
     609                    id = s.get('id', 'no_id')
    564610                    if sname == 'SMB':
    565                         outs = s.copy()
    566                         outs['server'] = "http://fs:139"
    567                         outs['fedAttr'] = [
    568                                 { 'attribute': 'SMBSHARE', 'value': 'USERS' },
    569                                 { 'attribute': 'SMBUSER', 'value': user },
    570                                 { 'attribute': 'SMBPROJ', 'value': project },
    571                             ]
     611                        exp.append(self.export_SMB(id, state, project, user))
    572612                    elif sname == 'seer':
    573                         outs['server'] = "http://control:16606"
     613                        exp.append(self.export_seer(id, state, project, user))
    574614                    elif sname == 'tmcd':
    575                         outs['server'] = "http://boss:7777"
     615                        exp.append(self.export_tmcd(id, state, project, user))
    576616                    elif sname == 'userconfig':
    577                         if self.userconfdir and self.userconfcmd \
    578                                 and self.userconfurl:
    579                             cid, cert = self.export_userconf(project)
    580                             outs['server'] = "%s/%s" % \
    581                                     (self.userconfurl, str(cid))
    582                             outs['fedAttr'] = [
    583                                     { 'attribute': 'cert', 'value': cert },
    584                                 ]
    585                             state['userconfig'] = unicode(cid)
    586                     exp.append(outs)
     617                        exp.append(self.export_userconfig(id, state,
     618                            project, user))
     619                    elif sname == 'project_export':
     620                        exp.append(self.export_SMB(id, state, project, user))
     621                        exp.append(self.export_seer(id, state, project, user))
     622                        exp.append(self.export_userconfig(id, state,
     623                            project, user))
    587624        return (exp, state)
    588625
     
    627664            else: return h
    628665
     666        def get_export_project(svcs):
     667            """
     668            if the service requests includes one to export a project, return
     669            that project.
     670            """
     671            rv = None
     672            for s in svcs:
     673                if s.get('name', '') == 'project_export' and \
     674                        s.get('visibility', '') == 'export':
     675                    if not rv:
     676                        for a in s.get('feddAttr', []):
     677                            if a.get('attribute', '') == 'project' \
     678                                    and 'value' in a:
     679                                rv = a['value']
     680                    else:
     681                        raise service_error(service_error, access,
     682                                'Requesting multiple project exports is ' + \
     683                                        'not supported');
     684            return rv
     685
    629686        # The dance to get into the request body
    630687        if req.has_key('RequestAccessRequestBody'):
     
    642699            ap = None
    643700
    644             # If this is a request to export a project and the access project
    645             # is not the project to export, access denied.
    646             if req.has_key('exportProject'):
    647                 ep = unpack_id(req['exportProject'])
    648                 if ep != found[0].name:
     701            # if this includes a project export request and the exported
     702            # project is not the access project, access denied.
     703            if 'service' in req:
     704                ep = get_export_project(req['service'])
     705                if ep and ep != found[0].name:
    649706                    raise service_error(service_error.access,
    650707                            "Cannot export %s" % ep)
     
    9831040                'seer': client_null,
    9841041                'userconfig': client_null,
     1042                'project_export': client_null,
    9851043            }
    9861044
     
    9921050
    9931051        def server_seer(f, s):
    994             print >>f, 'seer: true'
     1052            print >>f, 'seer: True'
    9951053
    9961054        server_service_out = {
     
    9981056                'tmcd': server_port,
    9991057                'userconfig': server_null,
     1058                'project_export': server_null,
    10001059                'seer': server_seer,
    10011060            }
  • fedd/federation/experiment_control.py

    r0c4b12c r43197eb  
    4242fl = logging.getLogger("fedd.experiment_control")
    4343fl.addHandler(nullHandler())
     44
     45
     46# Right now, no support for composition.
     47class federated_service:
     48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
     49        self.name=name
     50        self.exporter=exporter
     51        self.importers=importers
     52        self.params = params
     53        self.reqs = reqs
    4454
    4555class experiment_control_local:
     
    786796        else: return None
    787797
    788     def get_access(self, tb, nodes, tbparam, master, export_project,
    789             access_user, services):
     798    def get_access(self, tb, nodes, tbparam, access_user, masters):
    790799        """
    791800        Get access to testbed through fedd and set the parameters for that tb
    792801        """
     802        def get_export_project(svcs):
     803            """
     804            Look through for the list of federated_service for this testbed
     805            objects for a project_export service, and extract the project
     806            parameter.
     807            """
     808
     809            pe = [s for s in svcs if s.name=='project_export']
     810            if len(pe) == 1:
     811                return pe[0].params.get('project', None)
     812            elif len(pe) == 0:
     813                return None
     814            else:
     815                raise service_error(service_error.req,
     816                        "More than one project export is not supported")
     817
    793818        uri = self.tbmap.get(tb, None)
    794819        if not uri:
    795820            raise service_error(service_error.server_config,
    796821                    "Unknown testbed: %s" % tb)
     822
     823        export_svcs = masters.get(tb,[])
     824        import_svcs = [ s for m in masters.values() \
     825                for s in m \
     826                    if tb in s.importers ]
     827
     828        export_project = get_export_project(export_svcs)
    797829
    798830        # Tweak search order so that if there are entries in access_user that
     
    822854                        'destinationTestbed' : { 'uri' : uri },
    823855                        'credential': [ 'user: %s' % u ],
    824                         'user':  [ {'userID': { 'localname': u } } ],
    825856                        'allocID' : { 'localname': 'test' },
    826857                    }
    827858
    828             # If there is a master, and this is it, ask it to export services
    829             # XXX move this to export pseudo-service
    830             if tb == master:
    831                 req['service'] = [
    832                         { 'name': 'userconfig', 'visibility': 'export'},
    833                         { 'name': 'SMB', 'visibility': 'export'},
    834                         { 'name': 'seer', 'visibility': 'export'},
    835                         { 'name': 'tmcd', 'visibility': 'export'},
    836                     ]
     859            # Make the service request from the services we're importing and
     860            # exporting.  Keep track of the export request ids so we can
     861            # collect the resulting info from the access response.
     862            e_keys = { }
     863            if import_svcs or export_svcs:
     864                req['service'] = [ ]
     865
     866                for i, s in enumerate(import_svcs):
     867                    idx = 'import%d' % i
     868                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
     869                    if s.params:
     870                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
     871                                for k, v in s.params.items()]
     872                    req['service'].append(sr)
     873
     874                for i, s in enumerate(export_svcs):
     875                    idx = 'export%d' % i
     876                    e_keys[idx] = s
     877                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
     878                    if s.params:
     879                        sr['fedAttr'] = [ { 'attribute': k, 'value': v }
     880                                for k, v in s.params.items()]
     881                    req['service'].append(sr)
    837882
    838883            # node resources if any
     
    886931                "uri": uri,
    887932                }
    888         if 'service' in r:
    889             for s in r['service']:
    890                 # Tag each service with the origin testbed
    891                 if s.has_key('fedAttr'):
    892                     # The else connects with the for
    893                     for a in s['fedAttr']:
    894                         if a.get('attribute', None) == 'testbed':
    895                             break
    896                     else:
    897                         s['fedAttr'].append({'attribute': 'testbed',
    898                             'value': tb})
    899                 else:
    900                     s['fedAttr'] = [ {'attribute': 'testbed', 'value': tb} ]
    901             services.extend(r['service'])
     933
     934        # Collect the responses corresponding to the services this testbed
     935        # exports.  These will be the service requests that we will include in
     936        # the start segment requests (with appropriate visibility values) to
     937        # import and export the segments.
     938        for s in r.get('service', []):
     939            id = s.get('id', None)
     940            if id and id in e_keys:
     941                e_keys[id].reqs.append(s)
    902942
    903943        # Add attributes to parameter space.  We don't allow attributes to
     
    9681008            self.response = None
    9691009
    970         def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
    971                 services=None):
     1010        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
    9721011            req = {
    9731012                    'allocID': { 'fedid' : aid },
     
    9751014                        'topdldescription': topo.to_dict(),
    9761015                    },
    977                     'master': master,
     1016                    # XXX: deprecated
     1017                    'master': False,
    9781018                }
    9791019
    9801020            if connInfo:
    9811021                req['connection'] = connInfo
    982             # Add services to request.  The master exports, everyone else
    983             # imports.
    984             if services:
    985                 svcs = [ x.copy() for x in services]
    986                 for s in svcs:
    987                     if master: s['visibility'] = 'export'
    988                     else: s['visibility'] = 'import'
    989                 req['service'] = svcs
     1022
     1023            import_svcs = [ s for m in masters.values() \
     1024                    for s in m if self.testbed in s.importers]
     1025
     1026            if import_svcs or self.testbed in masters:
     1027                req['service'] = []
     1028
     1029            for s in import_svcs:
     1030                for r in s.reqs:
     1031                    sr = copy.deepcopy(r)
     1032                    sr['visibility'] = 'import';
     1033                    req['service'].append(sr)
     1034
     1035            for s in masters.get(self.testbed, []):
     1036                for r in s.reqs:
     1037                    sr = copy.deepcopy(r)
     1038                    sr['visibility'] = 'export';
     1039                    req['service'].append(sr)
     1040
    9901041            if attrs:
    9911042                req['fedAttr'] = attrs
     
    10381089   
    10391090
    1040     def allocate_resources(self, allocated, master, eid, expid,
     1091    def allocate_resources(self, allocated, masters, eid, expid,
    10411092            tbparams, topo, tmpdir, alloc_log=None, log_collector=None,
    1042             attrs=None, connInfo={}, services=[]):
     1093            attrs=None, connInfo={}):
    10431094
    10441095        started = { }           # Testbeds where a sub-experiment started
     
    10761127                        caller=self.call_StartSegment,
    10771128                        log_collector=log_collector),
    1078                     args=(uri, aid, topo[tb], tb == master,
    1079                         attrs, connInfo[tb], services),
     1129                    args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]),
    10801130                    name=tb,
    10811131                    pdata=thread_pool, trace_file=self.trace_file)
     
    13071357        return hosts, ips
    13081358
    1309     def get_access_to_testbeds(self, testbeds, access_user,
    1310             export_project, master, allocated, tbparams, services):
     1359    def get_access_to_testbeds(self, testbeds, access_user, allocated,
     1360            tbparams, masters):
    13111361        """
    13121362        Request access to the various testbeds required for this instantiation
     
    13161366        """
    13171367        for tb in testbeds:
    1318             self.get_access(tb, None, tbparams, master,
    1319                     export_project, access_user, services)
     1368            self.get_access(tb, None, tbparams, access_user, masters)
    13201369            allocated[tb] = 1
    13211370
     
    14791528        return rv
    14801529
    1481     def get_master_project(self, req):
    1482         master= None
    1483         export_project = None
    1484         for e in [ s for s in req.get('service', []) \
    1485                 if s.get('name') == 'project_export']:
    1486             for a in e.get('fedAttr', []):
    1487                 attr = a.get('attribute', None)
    1488                 if attr == 'testbed':
    1489                     master = a.get('value', None)
    1490                 elif attr == 'project':
    1491                     export_project = a.get('value', None)
    1492        
    1493         return (master, export_project)
    1494 
    1495 
    1496 
    14971530    def create_experiment(self, req, fid):
    14981531        """
     
    15011534
    15021535        Creates a working directory, splits the incoming description using the
    1503         splitter script and parses out the avrious subsections using the
    1504         lcasses above.  Once each sub-experiment is created, use pooled threads
     1536        splitter script and parses out the various subsections using the
     1537        classes above.  Once each sub-experiment is created, use pooled threads
    15051538        to instantiate them and start it all up.
    15061539        """
     
    15901623                raise service_error(service_error.server_config,
    15911624                        "Bad key type (%s)" % self.ssh_type)
    1592             master, export_project = self.get_master_project(req)
    1593             # XXX get these out when master and project are optional
    1594             if not master:
    1595                 raise service_error(service_error.req,
    1596                         "No master testbed label")
    1597             if not export_project:
    1598                 raise service_error(service_error.req, "No export project")
    1599             # XXX
    1600 
     1625
     1626            # Copy the service request
     1627            tb_services = [ s for s in req.get('service',[]) ]
    16011628            # Translate to topdl
    16021629            if self.splitter_url:
     
    16071634            else:
    16081635                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x',
    1609                     str(self.muxmax), '-m', master]
     1636                    str(self.muxmax), '-m', 'dummy']
    16101637
    16111638                tclcmd.extend([pid, gid, eid, tclfile])
     
    16281655                        if a.attribute == 'testbed'])
    16291656
     1657            masters = { }           # testbeds exporting services
     1658            for s in tb_services:
     1659                # If this is a project_export request with the imports field
     1660                # blank, fill it in.
     1661                if s.get('name', '') == 'project_export':
     1662                    if 'import' not in s or len(s['import']) == 0:
     1663                        s['import'] = [ tb for tb in testbeds \
     1664                                if tb not in s.get('export',[])]
     1665                # Add the service to masters
     1666                for tb in s.get('export', []):
     1667                    if s.get('name', None) and s.get('import', None):
     1668                        if tb not in masters:
     1669                            masters[tb] = [ ]
     1670
     1671                        params = { }
     1672                        if 'fedAttr' in s:
     1673                            for a in s['fedAttr']:
     1674                                params[a.get('attribute', '')] = \
     1675                                        a.get('value','')
     1676
     1677                        masters[tb].append(federated_service(name=s['name'],
     1678                                exporter=tb, importers=s.get('import',[]),
     1679                                params=params))
     1680                    else:
     1681                        log.error('Testbed service does not have name " + \
     1682                                "and importers')
     1683
     1684
    16301685            allocated = { }         # Testbeds we can access
    16311686            topo ={ }               # Sub topologies
    16321687            connInfo = { }          # Connection information
    1633             services = [ ]
    1634             masters = { }           # testbeds exporting services
    1635             self.get_access_to_testbeds(testbeds, access_user,
    1636                     export_project, master, allocated, tbparams, services)
    1637 
    1638             # After this masters will hold a set of services exported by each
    1639             # testbed
    1640             for s in services:
    1641                 i = s.get('visibility', 'import')
    1642                 if i == 'export':
    1643                     for a in s.get('fedAttr', []):
    1644                         if a.get('attribute', '') == 'testbed':
    1645                             tb = a.get('value', None)
    1646                             if tb:
    1647                                 if masters.has_key(tb):
    1648                                     masters[tb].add(s.get('name',''))
    1649                                 else:
    1650                                     masters[tb] = set([s.get('name','')])
    1651                             else:
    1652                                 log.error('Testbed attribute with no value?')
    1653 
     1688            self.get_access_to_testbeds(testbeds, access_user, allocated,
     1689                    tbparams, masters)
    16541690
    16551691            self.split_topology(top, topo, testbeds)
     
    16971733                tb = t.get_attribute('testbed')
    16981734                if tb:
    1699                     self.get_access(tb, None, tbparams, master,
    1700                             export_project, access_user, services)
     1735                    self.get_access(tb, None, tbparams, export_project,
     1736                            access_user, masters)
    17011737                    tbparams[k] = tbparams[tb]
    17021738                    del tbparams[tb]
     
    17221758                        'name': [ { 'localname' : eid} ],
    17231759                        'allocID' : tbparams[k]['allocID'],
    1724                         'master' : k == master,
     1760                        # XXX:
     1761                        #'master' : k == master,
    17251762                        'uri': tbparams[k]['uri'],
    17261763                    }
     
    18031840        # Start a thread to do the resource allocation
    18041841        t  = Thread(target=self.allocate_resources,
    1805                 args=(allocated, master, eid, expid, tbparams,
    1806                     topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
    1807                     services),
     1842                args=(allocated, masters, eid, expid, tbparams,
     1843                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo),
    18081844                name=eid)
    18091845        t.start()
  • fedd/federation/experiment_partition.py

    r0c4b12c r43197eb  
    6464        """
    6565        seer_master = None
    66         for k, s in masters.items():
    67             if 'SEER' in s:
    68                 seer_master = k
    69                 break
     66        for m in masters.values():
     67            for s in m:
     68                if s.name == 'SEER':
     69                    seer_master = m
     70                    break
     71            if seer_master: break
    7072
    7173        if seer_master:
     
    8486
    8587        if (st in masters and dt not in masters) or \
    86                 ( st in masters and dt not in masters ):
     88                ( st not in masters and dt in masters ):
    8789            active = ("%s" % (st in masters))
    8890        else:
    8991            active = ("%s" % (st > dt))
     92
     93        print "%s %s" % (st, active)
    9094
    9195        ifaces = [ ]
     
    279283
    280284        seer_master = None
    281         for k, s in masters.items():
    282             if 'SEER' in s:
    283                 seer_master = k
    284                 break
     285        for m in masters.values():
     286            for s in m:
     287                if s.name == 'SEER':
     288                    seer_master = m
     289                    break
     290            if seer_master: break
    285291
    286292        if seer_master:
     
    378384
    379385                seer_master = None
    380                 for k, s in masters.items():
    381                     if 'SEER' in s:
    382                         seer_master = k
    383                         break
     386
     387                for m in masters.values():
     388                    for s in m:
     389                        if s.name == 'SEER':
     390                            seer_master = m
     391                            break
     392                    if seer_master: break
    384393
    385394                if seer_master:
     
    534543        # to the master for each service.
    535544        for mtb in [ t for t in tbparams if t in masters ]:
    536             for tb in [ t for tb in tbparams if t != mtb]:
     545            importers = set([])
     546            for m in masters[mtb]:
     547                importers |= set(m.importers)
     548            for tb in importers:
    537549                if len([e for e in topo[tb].elements \
    538550                        if isinstance(e, topdl.Computer) and \
  • fedd/federation/protogeni_access.py

    r0c4b12c r43197eb  
    455455
    456456
     457    def export_SMB(self, id, state, project, user):
     458        return {
     459                'id': id,
     460                'name': 'SMB',
     461                'visibility': 'export',
     462                'server': 'http://fs:139',
     463                'fedAttr': [
     464                        { 'attribute': 'SMBSHARE', 'value': 'USERS' },
     465                        { 'attribute': 'SMBUSER', 'value': user },
     466                        { 'attribute': 'SMBPROJ', 'value': project },
     467                    ]
     468                }
     469
     470    def export_seer(self, id, state, project, user):
     471        return {
     472                'id': id,
     473                'name': 'seer',
     474                'visibility': 'export',
     475                'server': 'http://control:16606',
     476                }
     477
     478    def export_tmcd(self, id, state, project, user):
     479        return {
     480                'id': id,
     481                'name': 'seer',
     482                'visibility': 'export',
     483                'server': 'http://boss:7777',
     484                }
     485
     486    def export_userconfig(self, id, state, project, user):
     487        if self.userconfdir and self.userconfcmd \
     488                and self.userconfurl:
     489            cid, cert = self.export_userconf(project)
     490            state['userconfig'] = unicode(cid)
     491            return {
     492                    'id': id,
     493                    'name': 'userconfig',
     494                    'visibility': 'export',
     495                    'server': "%s/%s" % (self.userconfurl, str(cid)),
     496                    'fedAttr': [
     497                        { 'attribute': 'cert', 'value': cert },
     498                    ]
     499                    }
     500        else:
     501            return None
     502
    457503    def export_services(self, sreq, project, user):
    458504        exp = [ ]
     
    465511            if svis == 'export':
    466512                if sname in self.exports:
    467                     outs = s.copy()
     513                    id = s.get('id', 'no_id')
    468514                    if sname == 'SMB':
    469                         outs = s.copy()
    470                         outs['server'] = "http://fs:139"
    471                         outs['fedAttr'] = [
    472                                 { 'attribute': 'SMBSHARE', 'value': 'USERS' },
    473                                 { 'attribute': 'SMBUSER', 'value': user },
    474                                 { 'attribute': 'SMBPROJ', 'value': project },
    475                             ]
     515                        exp.append(self.export_SMB(id, state, project, user))
    476516                    elif sname == 'seer':
    477                         outs['server'] = "http://control:16606"
     517                        exp.append(self.export_seer(id, state, project, user))
    478518                    elif sname == 'tmcd':
    479                         outs['server'] = "http://boss:7777"
     519                        exp.append(self.export_tmcd(id, state, project, user))
    480520                    elif sname == 'userconfig':
    481                         if self.userconfdir and self.userconfcmd \
    482                                 and self.userconfurl:
    483                             cid, cert = self.export_userconf(project)
    484                             outs['server'] = "%s/%s" % \
    485                                     (self.userconfurl, str(cid))
    486                             outs['fedAttr'] = [
    487                                     { 'attribute': 'cert', 'value': cert },
    488                                 ]
    489                             state['userconfig'] = unicode(cid)
    490                     exp.append(outs)
     521                        exp.append(self.export_userconfig(id, state,
     522                            project, user))
     523                    elif sname == 'project_export':
     524                        exp.append(self.export_SMB(id, state, project, user))
     525                        exp.append(self.export_seer(id, state, project, user))
     526                        exp.append(self.export_userconfig(id, state,
     527                            project, user))
    491528        return (exp, state)
    492529
Note: See TracChangeset for help on using the changeset viewer.