Ignore:
Timestamp:
Feb 28, 2010 12:31:25 PM (14 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
109a32a
Parents:
ef252e9
Message:

Inital parameterization and synchronization. Tested for Emulabs, but not DRAGON.
Add get and set value synchronization.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    ref252e9 r2761484  
    2828from remote_service import xmlrpc_handler, soap_handler, service_caller
    2929from service_error import service_error
     30from synch_store import synch_store
    3031
    3132import topdl
     
    239240        self.state_filename = config.get("experiment_control",
    240241                "experiment_state")
     242        self.store_filename = config.get("experiment_control",
     243                "synch_store")
     244        self.store_url = config.get("experiment_control", "store_url")
    241245        self.splitter_url = config.get("experiment_control", "splitter_uri")
    242246        self.fedkit = parse_tarfile_list(\
     
    332336            self.read_state()
    333337
     338        if self.store_filename:
     339            self.read_store()
     340        else:
     341            self.log.warning("No saved synch store")
     342            self.synch_store = synch_store
     343
    334344        # Dispatch tables
    335345        self.soap_services = {\
     
    342352                'Terminate': soap_handler('Terminate',
    343353                    self.terminate_experiment),
     354                'GetValue': soap_handler('GetValue', self.GetValue),
     355                'SetValue': soap_handler('SetValue', self.SetValue),
    344356        }
    345357
     
    353365                'Terminate': xmlrpc_handler('Terminate',
    354366                    self.terminate_experiment),
     367                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
     368                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
    355369        }
    356370
     
    377391            self.log.error("Pickling problem (TypeError): %s" % e)
    378392
     393    @staticmethod
     394    def get_alloc_ids(state):
     395        """
     396        Pull the fedids of the identifiers of each allocation from the
     397        state.  Again, a dict dive that's best isolated.
     398
     399        Used by read_store and read state
     400        """
     401
     402        return [ f['allocID']['fedid']
     403                for f in state.get('federant',[]) \
     404                    if f.has_key('allocID') and \
     405                        f['allocID'].has_key('fedid')]
     406
    379407    # Call while holding self.state_lock
    380408    def read_state(self):
     
    399427            else:
    400428                return None
    401 
    402         def get_alloc_ids(state):
    403             """
    404             Pull the fedids of the identifiers of each allocation from the
    405             state.  Again, a dict dive that's best isolated.
    406             """
    407 
    408             return [ f['allocID']['fedid']
    409                     for f in state.get('federant',[]) \
    410                         if f.has_key('allocID') and \
    411                             f['allocID'].has_key('fedid')]
    412 
    413429
    414430        try:
     
    438454                    # Set permissions to allow reading of the software repo, if
    439455                    # any, as well.
    440                     for a in get_alloc_ids(s):
     456                    for a in self.get_alloc_ids(s):
    441457                        self.auth.set_attribute(a, 'repo/%s' % eid)
    442458                else:
     
    529545                    "open %s: %s" % (file, e))
    530546        f.close()
     547
     548    def read_store(self):
     549        try:
     550            self.synch_store = synch_store()
     551            self.synch_store.load(self.store_filename)
     552            self.log.debug("[read_store]: Read store from %s" % \
     553                    self.store_filename)
     554        except IOError, e:
     555            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
     556                    % (self.state_filename, e))
     557            self.synch_store = synch_store()
     558
     559        # Set the initial permissions on data in the store.  XXX: This ad hoc
     560        # authorization attribute initialization is getting out of hand.
     561        for k in self.synch_store.all_keys():
     562            try:
     563                if k.startswith('fedid:'):
     564                    fid = fedid(hexstr=k[6:46])
     565                    if self.state.has_key(fid):
     566                        for a in self.get_alloc_ids(self.state[fid]):
     567                            self.auth.set_attribute(a, k)
     568            except ValueError, e:
     569                self.log.warn("Cannot deduce permissions for %s" % k)
     570
     571
     572    def write_store(self):
     573        """
     574        Write a new copy of synch_store after writing current state
     575        to a backup.  We use the internal synch_store pickle method to avoid
     576        incinsistent data.
     577
     578        State format is a simple pickling of the store.
     579        """
     580        if os.access(self.store_filename, os.W_OK):
     581            copy_file(self.store_filename, \
     582                    "%s.bak" % self.store_filename)
     583        try:
     584            self.synch_store.save(self.store_filename)
     585        except IOError, e:
     586            self.log.error("Can't write file %s: %s" % \
     587                    (self.store_filename, e))
     588        except TypeError, e:
     589            self.log.error("Pickling problem (TypeError): %s" % e)
     590
    531591       
    532592    def generate_ssh_keys(self, dest, type="rsa" ):
     
    13241384
    13251385    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
    1326             portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[]):
     1386            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[],
     1387            expid=None):
    13271388        """
    13281389        Return a new internet portal node and a dict with the connectionInfo to
     
    13551416            ifaces.append(inf)
    13561417        if conn_type == "ssh":
     1418            try:
     1419                aid = tbparams[st]['allocID']['fedid']
     1420            except:
     1421                self.log.debug("Can't get allocation id?")
     1422                aid = None
    13571423            info = {
    13581424                    "type" : conn_type,
    13591425                    "portal": myname,
    1360                     'peer': desthost,
    13611426                    'fedAttr': [
    13621427                            { 'attribute': 'masterdomain', 'value': mdomain},
     
    13681433                            { 'attribute': 'smbshare', 'value': smbshare},
    13691434                        ],
     1435                    'parameter': [
     1436                        {
     1437                            'name': 'peer',
     1438                            'key': 'fedid:%s/%s' % (expid, myname),
     1439                            'store': self.store_url,
     1440                            'type': 'output',
     1441                        },
     1442                        {
     1443                            'name': 'peer',
     1444                            'key': 'fedid:%s/%s' % (expid, desthost),
     1445                            'store': self.store_url,
     1446                            'type': 'input',
     1447                        },
     1448                        ]
    13701449                    }
     1450            # Give this allocation the rights to access the key of the
     1451            # peers
     1452            if aid:
     1453                for h in (myname, desthost):
     1454                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
     1455            else:
     1456                self.log.error("No aid for %s in new_portal_node" % st)
    13711457        else:
    13721458            info = None
     
    13841470                ), info)
    13851471
    1386     def new_portal_substrate(self, st, dt, eid, tbparams):
     1472    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
    13871473        ddomain = tbparams[dt].get('domain', ".example.com")
    13881474        dproject = tbparams[dt].get('project', 'project')
     
    14141500        return (tsubstrate, segment_element)
    14151501
    1416     def new_dragon_topo(self, idx, sub, topo, tbs, tbparams):
     1502    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
    14171503        if sub.capacity is None:
    14181504            raise service_error(service_error.internal,
     
    14241510                    for n, v, in (\
    14251511                            ('vlan', 'unassigned%d' % idx),)])
     1512        name = "dragon%d" % idx
    14261513        for tb in tbs.keys():
    14271514            seg = topdl.Segment(
     
    14421529            segs.append(seg)
    14431530
    1444         topo["dragon%d" %idx] = \
     1531
     1532        try:
     1533            aid = tbparams[tb]['allocID']['fedid']
     1534        except:
     1535            self.log.debug("Can't get allocation id?")
     1536            aid = None
     1537        connInfo[name] = [ {
     1538            'parameter': [ {
     1539                'name': 'vlan_id',
     1540                'key': "fedid:%s/vlan%d" % (expid, idx),
     1541                'store': self.store_url,
     1542                'type': 'output'
     1543                } ]
     1544            } ]
     1545
     1546        # Give this allocation the rights to access the key of the
     1547        # vlan_id
     1548        if aid:
     1549            self.auth.set_attribute(aid, 'fedid:%s/vlan%d' % (expid, idx))
     1550
     1551        topo[name] = \
    14451552                topdl.Topology(substrates=[substr], elements=segs,
    14461553                        attribute=[
     
    14521559
    14531560    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
    1454             connInfo, peer={ }):
     1561            connInfo, peer={ }, expid=None):
    14551562        """
    14561563        Add attribiutes to the various elements indicating that they are to be
     
    14861593                if not connInfo.has_key(tb):
    14871594                    connInfo[tb] = [ ]
     1595
     1596                try:
     1597                    aid = tbparams[tb]['allocID']['fedid']
     1598                except:
     1599                    self.log.debug("Can't get allocation id?")
     1600                    aid = None
    14881601
    14891602                # This may need another look, but only a service gateway will
     
    15061619                            { 'attribute': 'active', 'value': active},
    15071620                            ],
     1621                        'parameter': [ {
     1622                            'name': 'vlan_id',
     1623                            'key': 'fedid:%s/vlan%d' % (expid, dn),
     1624                            'store': self.store_url,
     1625                            'type': 'input',
     1626                            } ]
    15081627                        }
    15091628                if peer.has_key(tb):
    15101629                    info['peer'] = peer[tb]
    15111630                connInfo[tb].append(info)
     1631
     1632                # Give this allocation the rights to access the key of the
     1633                # vlan_id
     1634                if aid:
     1635                    self.auth.set_attribute(aid,
     1636                            'fedid:%s/vlan%d' % (expid, dn))
    15121637            else:
    15131638                raise service_error(service_error.internal,
     
    15171642
    15181643    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
    1519             segment_substrate, portals, connInfo):
     1644            segment_substrate, portals, connInfo, expid):
    15201645        # More than one testbed is on this substrate.  Insert
    15211646        # some portals into the subtopologies.  st == source testbed,
     
    15461671                    # testbed in there.
    15471672                    tsubstrate, segment_element = \
    1548                             self.new_portal_substrate(st, dt, eid, tbparams)
     1673                            self.new_portal_substrate(st, dt, eid, tbparams,
     1674                                    expid)
    15491675                    segment_substrate[st][dt] = tsubstrate
    15501676                    topo[st].substrates.append(tsubstrate)
     
    15671693                        portal_type = "experiment"
    15681694                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
    1569                         desthost = "%stunnel%d.%s.%s%s" % (st,
    1570                                 len(portals[st][dt]), eid.lower(),
    1571                                 dproject.lower(), ddomain.lower())
     1695                        desthost = "%stunnel%d" % (st.lower(),
     1696                                len(portals[st][dt]))
    15721697                    else:
    15731698                        new_i = topdl.Interface(
     
    15861711                    portals[st][dt] = [ ]
    15871712                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
    1588                     desthost = "%stunnel%d.%s.%s%s" % (st.lower(),
    1589                             len(portals[st][dt]), eid.lower(),
    1590                             dproject.lower(), ddomain.lower())
     1713                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
    15911714
    15921715                    if dt == master or st == master: portal_type = "both"
     
    16021725                    portal, info  =  self.new_portal_node(st, dt, tbparams,
    16031726                            master, eid, myname, desthost, portal_type,
    1604                             infs)
     1727                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
    16051728                    if self.fedkit:
    16061729                        self.add_kit(portal, self.fedkit)
     
    16121735                    connInfo[st].append(info)
    16131736
    1614     def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo):
     1737    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
    16151738        # Add to the master testbed
    16161739        tsubstrate, segment_element = \
    1617                 self.new_portal_substrate(st, dt, eid, tbparams)
     1740                self.new_portal_substrate(st, dt, eid, tbparams, expid)
    16181741        myname = "%stunnel" % dt
    16191742        desthost = "%stunnel" % st
     
    16211744        portal, info = self.new_portal_node(st, dt, tbparams, master,
    16221745                eid, myname, desthost, "control",
    1623                 ((tsubstrate.name,(('portal','true'),)),))
     1746                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
     1747                conn_attrs=[], expid=expid)
    16241748        if self.fedkit:
    16251749            self.add_kit(portal, self.fedkit)
     
    16351759
    16361760    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx,
    1637             substrate, tbparams):
     1761            substrate, tbparams, expid):
    16381762        # Add to the master testbed
    16391763        myname = "%stunnel" % dt
     
    16451769                    ('portal','true'),
    16461770                    ('ip4_address', "%s" % ip_addr(myip)),)),),
    1647                 conn_type="transit")
     1771                conn_type="transit", conn_attrs=[], expid=expid)
    16481772        if self.fedkit:
    16491773            self.add_kit(portal, self.fedkit)
     
    16541778
    16551779    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator,
    1656             connInfo):
     1780            connInfo, expid):
    16571781        """
    16581782        For each substrate in the main topology, find those that
     
    16801804                    all([tbparams[x].has_key('dragon') for x in tbs]):
    16811805                self.create_dragon_substrate(s, topo, tbs, tbparams,
    1682                         master, eid, connInfo)
     1806                        master, eid, connInfo, expid)
    16831807            else:
    16841808                self.insert_internet_portals(s, topo, tbs, tbparams, master,
    1685                         eid, segment_substrate, portals, connInfo)
     1809                        eid, segment_substrate, portals, connInfo, expid)
    16861810
    16871811        # Make sure that all the slaves have a control portal back to the
     
    17331857                            )
    17341858                    portal = self.new_dragon_portal(tb, master,
    1735                             master, eid, dip, mip, idx, csub, tbparams)
     1859                            master, eid, dip, mip, idx, csub, tbparams, expid)
    17361860                    topo[tb].substrates.append(csub)
    17371861                    topo[tb].elements.append(portal)
     
    17601884                            )
    17611885                    portal = self.new_dragon_portal(master, tb, master,
    1762                             eid, mip, dip, idx, mcsub, tbparams)
     1886                            eid, mip, dip, idx, mcsub, tbparams, expid)
    17631887                    topo[master].substrates.append(mcsub)
    17641888                    topo[master].elements.append(portal)
     
    17691893                    self.create_dragon_substrate(csub, topo,
    17701894                            {tb: 1, master:1}, tbparams, master, eid, connInfo,
    1771                             {tb: ip_addr(mip), master: ip_addr(dip)})
     1895                            {tb: ip_addr(mip), master: ip_addr(dip)}, expid)
    17721896                else:
    17731897                    self.add_control_portal(master, tb, master, eid, topo,
    1774                             tbparams, connInfo)
     1898                            tbparams, connInfo, expid)
    17751899                    self.add_control_portal(tb, master, master, eid, topo,
    1776                             tbparams, connInfo)
     1900                            tbparams, connInfo, expid)
    17771901
    17781902        # Connect the portal nodes into the topologies and clear out
     
    21032227
    21042228            self.add_portals(top, topo, eid, master, tbparams, ip_allocator,
    2105                     connInfo)
     2229                    connInfo, expid)
    21062230            # Now get access to the dynamic testbeds
    21072231            for k, t in topo.items():
     
    25852709            self.state_lock.release()
    25862710
     2711            # Delete any synch points associated with this experiment.  All
     2712            # synch points begin with the fedid of the experiment.
     2713            fedid_keys = set(["fedid:%s" % f for f in ids \
     2714                    if isinstance(f, fedid)])
     2715            for k in self.synch_store.all_keys():
     2716                try:
     2717                    if len(k) > 45 and k[0:46] in fedid_keys:
     2718                        self.synch_store.del_value(k)
     2719                except synch_store.BadDeleteionError:
     2720                    pass
     2721            self.write_store()
     2722               
    25872723            return {
    25882724                    'experiment': exp ,
     
    25932729            self.state_lock.release()
    25942730            raise service_error(service_error.req, "No saved state")
     2731
     2732
     2733    def GetValue(self, req, fid):
     2734        """
     2735        Get a value from the synchronized store
     2736        """
     2737        req = req.get('GetValueRequestBody', None)
     2738        if not req:
     2739            raise service_error(service_error.req,
     2740                    "Bad request format (no GetValueRequestBody)")
     2741       
     2742        name = req['name']
     2743        wait = req['wait']
     2744        rv = { 'name': name }
     2745
     2746        if self.auth.check_attribute(fid, name):
     2747            v = self.synch_store.get_value(name, wait)
     2748            if v is not None:
     2749                rv['value'] = v
     2750            return rv
     2751        else:
     2752            raise service_error(service_error.access, "Access Denied")
     2753       
     2754
     2755    def SetValue(self, req, fid):
     2756        """
     2757        Set a value in the synchronized store
     2758        """
     2759        req = req.get('SetValueRequestBody', None)
     2760        if not req:
     2761            raise service_error(service_error.req,
     2762                    "Bad request format (no SetValueRequestBody)")
     2763       
     2764        name = req['name']
     2765        v = req['value']
     2766
     2767        if self.auth.check_attribute(fid, name):
     2768            try:
     2769                self.synch_store.set_value(name, v)
     2770                self.write_store()
     2771            except synch_store.CollisionError:
     2772                # Translate into a service_error
     2773                raise service_error(service_error.req,
     2774                        "Value already set: %s" %name)
     2775            return { 'name': name, 'value': v }
     2776        else:
     2777            raise service_error(service_error.access, "Access Denied")
Note: See TracChangeset for help on using the changeset viewer.