Changeset 06cc65b


Ignore:
Timestamp:
May 28, 2010 2:23:12 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
5bf359d
Parents:
d6990a4
Message:

more refactoring - beaking code into smaller chunks for digestibility

Location:
fedd/federation
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    rd6990a4 r06cc65b  
    205205                f = open(self.state_filename, 'w')
    206206                pickle.dump(self.state, f)
     207                self.log.debug("Wrote state to %s" % self.state_filename)
    207208            except EnvironmentError, e:
    208209                self.log.error("Can't write file %s: %s" % \
     
    224225                f = open(self.state_filename, "r")
    225226                self.state = pickle.load(f)
    226 
     227                self.log.debug("[read_state]: Read state from %s" % \
     228                        self.state_filename)
    227229            except EnvironmentError, e:
    228230                self.log.warning(("[read_state]: No saved state: " +\
     
    236238                        "Unpickling failed: %s") % e)
    237239
    238             self.log.debug("[read_state]: Read state from %s" % \
    239                     self.state_filename)
    240 
    241             self.allocation = self.state['allocation']
    242             self.projects = self.state['projects']
    243             self.keys = self.state['keys']
    244             self.types = self.state['types']
    245 
    246             # Add the ownership attributes to the authorizer.  Note that the
    247             # indices of the allocation dict are strings, but the attributes are
    248             # fedids, so there is a conversion.
    249             for k in self.allocation.keys():
    250                 for o in self.allocation[k].get('owners', []):
    251                     self.auth.set_attribute(o, fedid(hexstr=k))
    252                 if self.allocation[k].has_key('userconfig'):
    253                     sfid = self.allocation[k]['userconfig']
    254                     fid = fedid(hexstr=sfid)
    255                     self.auth.set_attribute(fid, "/%s" % sfid)
    256240
    257241
  • fedd/federation/emulab_access.py

    rd6990a4 r06cc65b  
    1010import logging
    1111import subprocess
     12import traceback
    1213
    1314from threading import *
     
    4849    dynamically.  This implements both direct requests and proxies.
    4950    """
    50 
    51     proxy_RequestAccess= service_caller('RequestAccess')
    52     proxy_ReleaseAccess= service_caller('ReleaseAccess')
    5351
    5452    def __init__(self, config=None, auth=None):
     
    106104
    107105        self.restricted = [ ]
    108         self.projects = { }
    109         self.keys = { }
    110         self.types = { }
    111         self.allocation = { }
    112         self.state = {
    113             'projects': self.projects,
    114             'allocation' : self.allocation,
    115             'keys' : self.keys,
    116             'types': self.types
    117         }
    118106        self.access = { }
    119107        if config.has_option("access", "accessdb"):
     
    138126                    self.make_access_project)
    139127
    140         # read state in the base_class
     128        # read_state in the base_class
    141129        self.state_lock.acquire()
     130        for a  in ('allocation', 'projects', 'keys', 'types'):
     131            if a not in self.state:
     132                self.state[a] = { }
    142133        self.allocation = self.state['allocation']
    143134        self.projects = self.state['projects']
     
    192183    @staticmethod
    193184    def make_access_project(str):
     185        """
     186        Convert a string of the form (id[:resources:resouces], id, id) into an
     187        access_project.  This is called by read_access to convert to local
     188        attributes.  It returns a tuple of the form (project, user, user) where
     189        users may be names or fedids.
     190        """
    194191        def parse_name(n):
    195192            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
     
    216213
    217214
     215    # RequestAccess support routines
     216
    218217    def lookup_access(self, req, fid):
    219218        """
    220         Determine the allowed access for this request.  Return the access and
    221         which fields are dynamic.
    222 
    223         The fedid is needed to construct the request
     219        Look up the local access control information mapped to this fedid and
     220        credentials.  In this case it is a (project, create_user, access_user)
     221        triple, and a triple of three booleans indicating which, if any will
     222        need to be dynamically created.  Finally a list of owners for that
     223        allocation is returned.
     224
     225        lookup_access_base pulls the first triple out, and it is parsed by this
     226        routine into the boolean map.  Owners is always the controlling fedid.
    224227        """
    225228        # Return values
     
    380383        return (pname, uname)
    381384
     385    # End of RequestAccess support routines
     386
    382387    def RequestAccess(self, req, fid):
    383388        """
     
    564569            raise service_error(service_error.req, "No such allocation")
    565570
    566 
     571    # These are subroutines for StartSegment
    567572    def generate_ns2(self, topo, expfn, softdir, connInfo):
     573        """
     574        Convert topo into an ns2 file, decorated with appropriate commands for
     575        the particular testbed setup.  Convert all requests for software, etc
     576        to point at the staged copies on this testbed and add the federation
     577        startcommands.
     578        """
    568579        class dragon_commands:
    569580            """
     
    593604                            # XXX: do netmask right
    594605                            if type =='link':
    595                                 s = ("tb-allow-external ${%s} dragonportal " + \
    596                                         "ip %s vlan %s netmask 255.255.255.0\n") % \
     606                                s = ("tb-allow-external ${%s} " + \
     607                                        "dragonportal ip %s vlan %s " + \
     608                                        "netmask 255.255.255.0\n") % \
    597609                                        (e.name, addr, vlan)
    598610                            elif type =='lan':
    599                                 s = ("tb-allow-external ${%s} dragonportal " + \
     611                                s = ("tb-allow-external ${%s} " + \
     612                                        "dragonportal " + \
    600613                                        "ip %s vlan %s usurp %s\n") % \
    601614                                        (e.name, addr, vlan, subs)
     
    606619
    607620        class not_dragon:
     621            """
     622            Return true if a node is in the given map of dragon nodes.
     623            """
    608624            def __init__(self, map):
    609625                self.nodes = set(map.keys())
     
    612628                return e.name not in self.nodes
    613629
    614 
     630        # Main line of generate_ns2
    615631        t = topo.clone()
    616632
     633        # Create the map of nodes that need direct connections (dragon
     634        # connections) from the connInfo
    617635        dragon_map = { }
    618636        for i in [ i for i in connInfo if i['type'] == 'transit']:
     
    752770        """
    753771        Add a seer node to the given topology, with the startup command passed
    754         in.
     772        in.  Used by configure seer_services.
    755773        """
    756774        c_node = topdl.Computer(
     
    810828            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
    811829
    812 
     830    def retrieve_software(self, topo, certfile, softdir):
     831        """
     832        Collect the software that nodes in the topology need loaded and stage
     833        it locally.  This implies retrieving it from the experiment_controller
     834        and placing it into softdir.  Certfile is used to prove that this node
     835        has access to that data (it's the allocation/segment fedid).  Finally
     836        local portal and federation software is also copied to the same staging
     837        directory for simplicity - all software needed for experiment creation
     838        is in softdir.
     839        """
     840        sw = set()
     841        for e in topo.elements:
     842            for s in getattr(e, 'software', []):
     843                sw.add(s.location)
     844        for s in sw:
     845            self.log.debug("Retrieving %s" % s)
     846            try:
     847                get_url(s, certfile, softdir)
     848            except:
     849                t, v, st = sys.exc_info()
     850                raise service_error(service_error.internal,
     851                        "Error retrieving %s: %s" % (s, v))
     852
     853        # Copy local federation and portal node software to the tempdir
     854        for s in (self.federation_software, self.portal_software):
     855            for l, f in s:
     856                base = os.path.basename(f)
     857                copy_file(f, "%s/%s" % (softdir, base))
     858
     859
     860    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
     861        """
     862        Gather common configuration files, retrieve or create an experiment
     863        name and project name, and return the ssh_key filenames.  Create an
     864        allocation log bound to the state log variable as well.
     865        """
     866        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
     867        ename = None
     868        pubkey_base = None
     869        secretkey_base = None
     870        proj = None
     871        user = None
     872        alloc_log = None
     873
     874        for a in attrs:
     875            if a['attribute'] in configs:
     876                try:
     877                    self.log.debug("Retrieving %s from %s" % \
     878                            (a['attribute'], a['value']))
     879                    get_url(a['value'], certfile, tmpdir)
     880                except:
     881                    t, v, st = sys.exc_info()
     882                    raise service_error(service_error.internal,
     883                            "Error retrieving %s: %s" % (a.get('value', ""), v))
     884            if a['attribute'] == 'ssh_pubkey':
     885                pubkey_base = a['value'].rpartition('/')[2]
     886            if a['attribute'] == 'ssh_secretkey':
     887                secretkey_base = a['value'].rpartition('/')[2]
     888            if a['attribute'] == 'experiment_name':
     889                ename = a['value']
     890
     891        if not ename:
     892            ename = ""
     893            for i in range(0,5):
     894                ename += random.choice(string.ascii_letters)
     895            self.log.warn("No experiment name: picked one randomly: %s" \
     896                    % ename)
     897
     898        if not pubkey_base:
     899            raise service_error(service_error.req,
     900                    "No public key attribute")
     901
     902        if not secretkey_base:
     903            raise service_error(service_error.req,
     904                    "No secret key attribute")
     905
     906        self.state_lock.acquire()
     907        if aid in self.allocation:
     908            proj = self.allocation[aid].get('project', None)
     909            if not proj:
     910                proj = self.allocation[aid].get('sproject', None)
     911            user = self.allocation[aid].get('user', None)
     912            self.allocation[aid]['experiment'] = ename
     913            self.allocation[aid]['log'] = [ ]
     914            # Create a logger that logs to the experiment's state object as
     915            # well as to the main log file.
     916            alloc_log = logging.getLogger('fedd.access.%s' % ename)
     917            h = logging.StreamHandler(
     918                    list_log.list_log(self.allocation[aid]['log']))
     919            # XXX: there should be a global one of these rather than
     920            # repeating the code.
     921            h.setFormatter(logging.Formatter(
     922                "%(asctime)s %(name)s %(message)s",
     923                        '%d %b %y %H:%M:%S'))
     924            alloc_log.addHandler(h)
     925            self.write_state()
     926        self.state_lock.release()
     927
     928        if not proj:
     929            raise service_error(service_error.internal,
     930                    "Can't find project for %s" %aid)
     931
     932        if not user:
     933            raise service_error(service_error.internal,
     934                    "Can't find creation user for %s" %aid)
     935
     936        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
     937
     938    def finalize_experiment(self, starter, topo, aid, alloc_id):
     939        """
     940        Store key bits of experiment state in the global repository, including
     941        the response that may need to be replayed, and return the response.
     942        """
     943        # Copy the assigned names into the return topology
     944        embedding = [ ]
     945        for n in starter.node:
     946            embedding.append({
     947                'toponame': n,
     948                'physname': ["%s%s" %  (starter.node[n], self.domain)],
     949                })
     950        # Grab the log (this is some anal locking, but better safe than
     951        # sorry)
     952        self.state_lock.acquire()
     953        logv = "".join(self.allocation[aid]['log'])
     954        # It's possible that the StartSegment call gets retried (!).
     955        # if the 'started' key is in the allocation, we'll return it rather
     956        # than redo the setup.
     957        self.allocation[aid]['started'] = {
     958                'allocID': alloc_id,
     959                'allocationLog': logv,
     960                'segmentdescription': {
     961                    'topdldescription': topo.clone().to_dict()
     962                    },
     963                'embedding': embedding
     964                }
     965        retval = copy.copy(self.allocation[aid]['started'])
     966        self.write_state()
     967        self.state_lock.release()
     968        return retval
     969
     970    def remove_dirs(self, dir):
     971        """
     972        Remove the directory tree and all files rooted at dir.  Log any errors,
     973        but continue.
     974        """
     975        self.log.debug("[removedirs]: removing %s" % dir)
     976        try:
     977            for path, dirs, files in os.walk(dir, topdown=False):
     978                for f in files:
     979                    os.remove(os.path.join(path, f))
     980                for d in dirs:
     981                    os.rmdir(os.path.join(path, d))
     982            os.rmdir(dir)
     983        except EnvironmentError, e:
     984            self.log.error("Error deleting directory tree in %s" % e);
     985   
     986    # End of StartSegment support routines
    813987
    814988    def StartSegment(self, req, fid):
    815 
    816         configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
    817 
    818989        err = None  # Any service_error generated after tmpdir is created
    819990        rv = None   # Return value from segment creation
     
    821992        try:
    822993            req = req['StartSegmentRequestBody']
     994            auth_attr = req['allocID']['fedid']
     995            topref = req['segmentdescription']['topdldescription']
    823996        except KeyError:
    824997            raise service_error(server_error.req, "Badly formed request")
     
    826999        connInfo = req.get('connection', [])
    8271000        services = req.get('service', [])
    828         auth_attr = req['allocID']['fedid']
    8291001        aid = "%s" % auth_attr
    8301002        attrs = req.get('fedAttr', [])
     
    8451017        # A new request.  Do it.
    8461018
    847         if req.has_key('segmentdescription') and \
    848                 req['segmentdescription'].has_key('topdldescription'):
    849             topo = \
    850                 topdl.Topology(**req['segmentdescription']['topdldescription'])
     1019        if topref: topo = topdl.Topology(**topref)
    8511020        else:
    8521021            raise service_error(service_error.req,
    8531022                    "Request missing segmentdescription'")
    8541023       
    855         master = req.get('master', False)
    856 
    8571024        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
    8581025        try:
     
    8651032        # Try block alllows us to clean up temporary files.
    8661033        try:
    867             sw = set()
    868             for e in topo.elements:
    869                 for s in getattr(e, 'software', []):
    870                     sw.add(s.location)
    871             for s in sw:
    872                 self.log.debug("Retrieving %s" % s)
    873                 try:
    874                     get_url(s, certfile, softdir)
    875                 except:
    876                     t, v, st = sys.exc_info()
    877                     raise service_error(service_error.internal,
    878                             "Error retrieving %s: %s" % (s, v))
    879 
    880             # Copy local federation and portal node software to the tempdir
    881             for s in (self.federation_software, self.portal_software):
    882                 for l, f in s:
    883                     base = os.path.basename(f)
    884                     copy_file(f, "%s/%s" % (softdir, base))
    885 
    886             ename = None
    887             pubkey_base = None
    888             secretkey_base = None
    889             for a in attrs:
    890                 if a['attribute'] in configs:
    891                     try:
    892                         self.log.debug("Retrieving %s from %s" % \
    893                                 (a['attribute'], a['value']))
    894                         get_url(a['value'], certfile, tmpdir)
    895                     except:
    896                         t, v, st = sys.exc_info()
    897                         raise service_error(service_error.internal,
    898                                 "Error retrieving %s: %s" % (s, v))
    899                 if a['attribute'] == 'ssh_pubkey':
    900                     pubkey_base = a['value'].rpartition('/')[2]
    901                 if a['attribute'] == 'ssh_secretkey':
    902                     secretkey_base = a['value'].rpartition('/')[2]
    903                 if a['attribute'] == 'experiment_name':
    904                     ename = a['value']
    905 
    906             if not ename:
    907                 ename = ""
    908                 for i in range(0,5):
    909                     ename += random.choice(string.ascii_letters)
    910                 self.log.warn("No experiment name: picked one randomly: %s" \
    911                         % ename)
    912 
    913             if not pubkey_base:
    914                 raise service_error(service_error.req,
    915                         "No public key attribute")
    916 
    917             if not secretkey_base:
    918                 raise service_error(service_error.req,
    919                         "No secret key attribute")
    920 
    921             # If the userconf service was imported, collect the configuration
    922             # data.
     1034            self.retrieve_software(topo, certfile, softdir)
     1035            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
     1036                    self.initialize_experiment_info(attrs, aid,
     1037                            certfile, tmpdir)
     1038
     1039            # Set up userconf and seer if needed
    9231040            self.configure_userconf(services)
    9241041            self.configure_seer_services(services, topo, softdir)
    925 
    926             proj = None
    927             user = None
    928             self.state_lock.acquire()
    929             if self.allocation.has_key(aid):
    930                 proj = self.allocation[aid].get('project', None)
    931                 if not proj:
    932                     proj = self.allocation[aid].get('sproject', None)
    933                 user = self.allocation[aid].get('user', None)
    934                 self.allocation[aid]['experiment'] = ename
    935                 self.allocation[aid]['log'] = [ ]
    936                 # Create a logger that logs to the experiment's state object as
    937                 # well as to the main log file.
    938                 alloc_log = logging.getLogger('fedd.access.%s' % ename)
    939                 h = logging.StreamHandler(
    940                         list_log.list_log(self.allocation[aid]['log']))
    941                 # XXX: there should be a global one of these rather than
    942                 # repeating the code.
    943                 h.setFormatter(logging.Formatter(
    944                     "%(asctime)s %(name)s %(message)s",
    945                             '%d %b %y %H:%M:%S'))
    946                 alloc_log.addHandler(h)
    947                 self.write_state()
    948             self.state_lock.release()
    949 
    950             if not proj:
    951                 raise service_error(service_error.internal,
    952                         "Can't find project for %s" %aid)
    953 
    954             if not user:
    955                 raise service_error(service_error.internal,
    956                         "Can't find creation user for %s" %aid)
    957 
     1042            # Get and send synch store variables
    9581043            self.export_store_info(certfile, proj, ename, connInfo)
    9591044            self.import_store_info(certfile, connInfo)
     
    9621047
    9631048            self.generate_portal_configs(topo, pubkey_base,
    964                     secretkey_base, tmpdir, master, proj, ename, connInfo,
    965                     services)
     1049                    secretkey_base, tmpdir, proj, ename, connInfo, services)
    9661050            self.generate_ns2(topo, expfile,
    9671051                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
     
    9701054                    debug=self.create_debug, log=alloc_log)
    9711055            rv = starter(self, ename, proj, user, expfile, tmpdir)
    972             rvtopo = topo.clone()
    973 
    974             # Copy the assigned names into the return topology
    975             embedding = [ ]
    976             for n in starter.node:
    977                 embedding.append({
    978                     'toponame': n,
    979                     'physname': ["%s%s" %  (starter.node[n], self.domain)],
    980                     })
    981 
    982 
    9831056        except service_error, e:
    9841057            err = e
    985         except e:
    986             err = service_error(service_error.internal, str(e))
     1058        except:
     1059            t, v, st = sys.exc_info()
     1060            err = service_error(service_error.internal, "%s: %s" % \
     1061                    (v, traceback.extract_tb(st)))
    9871062
    9881063        # Walk up tmpdir, deleting as we go
    989         if self.cleanup:
    990             self.log.debug("[StartSegment]: removing %s" % tmpdir)
    991             for path, dirs, files in os.walk(tmpdir, topdown=False):
    992                 for f in files:
    993                     os.remove(os.path.join(path, f))
    994                 for d in dirs:
    995                     os.rmdir(os.path.join(path, d))
    996             os.rmdir(tmpdir)
    997         else:
    998             self.log.debug("[StartSegment]: not removing %s" % tmpdir)
     1064        if self.cleanup: self.remove_dirs(tmpdir)
     1065        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
    9991066
    10001067        if rv:
    1001             # Grab the log (this is some anal locking, but better safe than
    1002             # sorry)
    1003             self.state_lock.acquire()
    1004             logv = "".join(self.allocation[aid]['log'])
    1005             # It's possible that the StartSegment call gets retried (!).
    1006             # if the 'started' key is in the allocation, we'll return it rather
    1007             # than redo the setup.
    1008             self.allocation[aid]['started'] = {
    1009                     'allocID': req['allocID'],
    1010                     'allocationLog': logv,
    1011                     'segmentdescription': {
    1012                         'topdldescription': rvtopo.to_dict()
    1013                         },
    1014                     'embedding': embedding
    1015                     }
    1016             retval = copy.copy(self.allocation[aid]['started'])
    1017             self.write_state()
    1018             self.state_lock.release()
    1019             return retval
     1068            return self.finalize_experiment(starter, topo, aid, req['allocID'])
    10201069        elif err:
    10211070            raise service_error(service_error.federant,
     
    10371086
    10381087        self.state_lock.acquire()
    1039         if self.allocation.has_key(aid):
     1088        if aid in self.allocation:
    10401089            proj = self.allocation[aid].get('project', None)
    10411090            if not proj:
Note: See TracChangeset for help on using the changeset viewer.