Changeset d81971a


Ignore:
Timestamp:
Nov 17, 2008 6:19:40 PM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
7583a62
Parents:
afa43a8
Message:

checkpoint of the resource management stuff

Location:
fedd
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd_access.py

    rafa43a8 rd81971a  
    1414import string
    1515import copy
     16import pickle
    1617
    1718from fedd_access_project import access_project
     
    224225        self.restricted = [ ]
    225226        self.fedid_category = { }
     227        self.projects = { }
     228        self.keys = { }
     229        self.allocation = { }
     230        self.state = {
     231            'projects': self.projects,
     232            'allocation' : self.allocation,
     233            'keys' : self.keys
     234        }
    226235        self.fedid_default = "testbed"
    227236        if config.has_option("access", "accessdb"):
     
    230239            self.read_trust(config.get("access", "trustdb"))
    231240
     241        self.state_filename = config.get("access", "access_state", "")
    232242        self.log = logging.getLogger("fedd.access")
     243        self.read_state()
     244
    233245
    234246        # Certs are promoted from the generic to the specific, so without a
     
    279291                RequestAccessRequestMessage.typecode,\
    280292                self.RequestAccess, RequestAccessResponseMessage,\
    281                 "RequestAccessResponseBody")\
     293                "RequestAccessResponseBody"), \
     294            'ReleaseAccess': make_soap_handler(\
     295                ReleaseAccessRequestMessage.typecode,\
     296                self.ReleaseAccess, ReleaseAccessResponseMessage,\
     297                "ReleaseAccessResponseBody")\
    282298            }
    283299        self.xmlrpc_services =  {\
    284300            'RequestAccess': make_xmlrpc_handler(\
    285                 self.RequestAccess, "RequestAccessResponseBody")\
     301                self.RequestAccess, "RequestAccessResponseBody"),\
     302            'ReleaseAccess': make_xmlrpc_handler(\
     303                self.ReleaseAccess, "ReleaseAccessResponseBody")\
    286304            }
    287305
     
    326344        else:
    327345            return None
     346
     347    def write_state(self):
     348        try:
     349            f = open(self.state_filename, 'w')
     350            pickle.dump(self.state, f)
     351        except IOError, e:
     352            self.log.error("Can't write file %s: %s" % \
     353                    (self.state_filename, e))
     354        except pickle.PicklingError, e:
     355            self.log.error("Pickling problem: %s" % e)
     356        except TypeError, e:
     357            self.log.error("Pickling problem (TypeError): %s" % e)
     358
     359
     360    def read_state(self):
     361        """
     362        Read a new copy of access state.  Old state is overwritten.
     363
     364        State format is a simple pickling of the state dictionary.
     365        """
     366        try:
     367            f = open(self.state_filename, "r")
     368            self.state = pickle.load(f)
     369
     370            self.allocation = self.state['allocation']
     371            self.projects = self.state['projects']
     372            self.keys = self.state['keys']
     373
     374            self.log.debug("[read_state]: Read state from %s" % \
     375                    self.state_filename)
     376        except IOError, e:
     377            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
     378                    % (self.state_filename, e))
     379        except EOFError, e:
     380            self.log.warning("[read_state]: Empty or damaged state file: %s:"\
     381                    % self.state_filename)
     382        except pickle.UnpicklingError, e:
     383            self.log.warning(("[read_state]: No saved state: " + \
     384                    "Unpickling failed: %s") % e)
     385
    328386
    329387    def proxy_xmlrpc_request(self, dt, req):
     
    649707                    ap = self.allocate_project.dynamic_project(preq)
    650708                else:
    651                     # XXX ssh key additions
    652709                    preq = {'StaticProjectRequestBody' : \
    653710                            { 'project': \
     
    677734                raise service_error(service_error.req,
    678735                        "SSH access parameters required")
    679 
     736            # keep track of what's been added
     737            if req['allocID'].has_key('fedid'):
     738                aid = unicode(req['allocId']['fedid'])
     739            elif req['allocID'].has_key('localname'):
     740                aid = req['allocID']['localname']
     741            else:
     742                raise service_error(service_error.req, "Bad allocation ID")
     743
     744            self.allocation[aid] = { }
     745            if dyn[1]:
     746                try:
     747                    pname = ap['project']['name']['localname']
     748                except KeyError:
     749                    raise service_error(service_error.internal,
     750                            "Misformed allocation response?")
     751                if self.projects.has_key(pname): self.projects[pname] += 1
     752                else: self.projects[pname] = 1
     753                self.allocation[aid]['project'] = pname
     754
     755            self.allocation[aid]['keys'] = [ ]
     756
     757            try:
     758                for u in ap['project']['user']:
     759                    uname = u['userID']['localname']
     760                    for k in [ k['sshPubkey'] for k in u['access'] \
     761                            if k.has_key('sshPubkey') ]:
     762                        kv = "%s:%s" % (uname, k)
     763                        if self.keys.has_key(kv): self.keys[kv] += 1
     764                        else: self.keys[kv] = 1
     765                        self.allocation[aid]['keys'].append((uname, k))
     766            except KeyError:
     767                raise service_error(service_error.internal,
     768                        "Misformed allocation response?")
     769
     770            self.write_state()
    680771            resp = self.build_response(req['allocID'], ap)
    681772            return resp
     
    707798                            service_error.proxy,
    708799                            "Undefined fault from proxy??");
     800
     801    def ReleaseAccess(self, req, fid):
     802        # The dance to get into the request body
     803        if req.has_key('ReleaseAccessRequestBody'):
     804            req = req['ReleaseAccessRequestBody']
     805        else:
     806            raise service_error(service_error.req, "No request!?")
     807
     808        print req
     809        try:
     810            if req['allocID'].has_key('localname'):
     811                aid = req['allocID']['localname']
     812            elif req['allocID'].has_key('fedid'):
     813                aid = unicode(req['allocID']['fedid'])
     814            else:
     815                raise service_error(service_error.req,
     816                        "Only localnames and fedids are understood")
     817        except KeyError:
     818            raise service_error(service_error.req, "Badly formed request")
     819
     820        # If we know this allocation, reduce the reference counts and remove
     821        # the local allocations.  Otherwise report an error.  If there is an
     822        # allocation to delete, del_users will be a dictonary of sets where the
     823        # key is the user that owns the keys in the set.  We use a set to avoid
     824        # duplicates.  del_project is just the name of any dynamic project to
     825        # delete.
     826        del_users = { }
     827        del_project = None
     828        if self.allocation.has_key(aid):
     829            for k in self.allocation[aid]['keys']:
     830                kk = "%s:%s" % k
     831                self.keys[kk] -= 1
     832                if self.keys[kk] == 0:
     833                    if not del_users.has_key(k[0]):
     834                        del_users[k[0]] = set()
     835                    del_users[k[0]].add(k[1])
     836                    del self.keys[kk]
     837
     838            if self.allocation[aid].has_key('project'):
     839                pname = self.allocation[aid]['project']
     840                self.projects[pname] -= 1
     841                if self.projects[pname] == 0:
     842                    del_project = pname
     843                    del self.projects[pname]
     844
     845            del self.allocation[aid]
     846            # If we actually have resources to deallocate, prepare the call.
     847            if del_project or del_users:
     848                msg = { 'project': { }}
     849                if del_project:
     850                    msg['project']['name']['localname'] =  del_project
     851                users = [ ]
     852                for u in del_users.keys():
     853                    users.append({ 'userID': { 'localname': u },\
     854                        'access' :  \
     855                                [ {'sshPubkey' : s } for s in del_users[u]]\
     856                    })
     857                if users:
     858                    msg['project']['user'] = users
     859                print msg
     860            self.write_state()
     861            return { 'allocID': req['allocID'] }
     862        else:
     863            raise service_error(service_error.req, "No such allocation")
     864
     865
     866
  • fedd/fedd_bindings.wsdl

    rafa43a8 rd81971a  
    2828        <output>
    2929          <soap:body use="encoded" parts="tns:RequestAccessResponseBody"
     30            namespace="http://www.isi.edu/faber/fedd.wsdl"
     31            encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"/>
     32        </output>
     33        <fault>
     34          <soap:fault use="encoded"  name="tns:FeddFault"
     35            namespace="http://www.isi.edu/faber/fedd.wsdl"
     36            encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"/>
     37        </fault>
     38      </operation>
     39      <operation name="ReleaseAccess">
     40        <documentation>
     41          The bindings of this operation are straightforward SOAP RPC 1.1.
     42        </documentation>
     43        <soap:operation soapAction="ReleaseAccess"/>
     44        <input>
     45          <soap:body use="encoded" parts="tns:ReleaseAccessRequestBody"
     46            namespace="http://www.isi.edu/faber/fedd.wsdl"
     47            encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"/>
     48        </input>
     49        <output>
     50          <soap:body use="encoded" parts="tns:ReleaseAccessResponseBody"
    3051            namespace="http://www.isi.edu/faber/fedd.wsdl"
    3152            encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"/>
  • fedd/fedd_experiment_control.py

    rafa43a8 rd81971a  
    869869                "eventserver": e['eventServer'],
    870870                "project": unpack_id(p['name']),
    871                 "emulab" : e
     871                "emulab" : e,
     872                "allocID" : r['allocID'],
    872873                }
    873874        # Make the testbed name be the label the user applied
     
    883884                    tbparam[tb][key]= a['value']
    884885       
     886    def release_access(self, tb, aid):
     887        """
     888        Release access to testbed through fedd
     889        """
     890
     891        uri = self.tbmap.get(tb, None)
     892        if not uri:
     893            raise service_error(serice_error.server_config,
     894                    "Unknown testbed: %s" % tb)
     895
     896        # The basic request
     897        req = { 'allocID' : aid }
     898       
     899        # No retry loop here.  Proxy servers must correctly authenticate
     900        # themselves without help
     901
     902        try:
     903            ctx = fedd_ssl_context(self.cert_file,
     904                    self.trusted_certs, password=self.cert_pwd)
     905        except SSL.SSLError:
     906            raise service_error(service_error.server_config,
     907                    "Server certificates misconfigured")
     908
     909        loc = feddServiceLocator();
     910        port = loc.getfeddPortType(uri,
     911                transport=M2Crypto.httpslib.HTTPSConnection,
     912                transdict={ 'ssl_context' : ctx })
     913
     914        # Reconstruct the full request message
     915        msg = ReleaseAccessRequestMessage()
     916        msg.set_element_ReleaseAccessRequestBody(
     917                pack_soap(msg, "ReleaseAccessRequestBody", req))
     918
     919        try:
     920            resp = port.ReleaseAccess(msg)
     921        except ZSI.ParseException, e:
     922            raise service_error(service_error.req,
     923                    "Bad format message (XMLRPC??): %s" % str(e))
     924        except ZSI.FaultException, e:
     925            resp = e.fault.detail[0]
     926
     927        # better error coding
     928
     929
     930
    885931    def remote_splitter(self, uri, desc, master):
    886932
     
    15101556                    'name': [ { 'localname' : eid} ],\
    15111557                    'emulab': tbparams[k]['emulab'],\
     1558                    'allocID' : tbparams[k]['allocID'],\
    15121559                    'master' : k == master,\
    15131560                }
     
    17891836                    domain = fed['emulab']['domain']
    17901837                    host  = "%s%s" % (fed['emulab']['ops'], domain)
     1838                    aid = fed['allocID']
    17911839                except KeyError, e:
    17921840                    continue
     
    17971845                        'host': host,\
    17981846                        'eid': eid,\
     1847                        'aid': aid,\
    17991848                    }
    18001849            self.state_lock.release()
     
    18041853                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
    18051854
    1806             # Remove teh terminated experiment
     1855            # release the allocations
     1856            for tb in tbparams.keys():
     1857                self.release_access(tb, tbparams[tb]['aid'])
     1858
     1859            # Remove the terminated experiment
    18071860            self.state_lock.acquire()
    18081861            for id in ids:
  • fedd/fedd_messages.wsdl

    rafa43a8 rd81971a  
    1919  </message>
    2020
     21  <message name="ReleaseAccessRequestMessage">
     22    <part name="ReleaseAccessRequestBody" type="xsd1:releaseRequestType"/>
     23  </message>
     24
     25  <message name="ReleaseAccessResponseMessage">
     26    <part name="ReleaseAccessResponseBody" type="xsd1:releaseResponseType"/>
     27  </message>
    2128  <message name="CreateRequestMessage">
    2229    <part name="CreateRequestBody" type="xsd1:createRequestType"/>
     
    7784      <fault name="FeddFault" message="tns:FaultMessage"/>
    7885    </operation>
     86    <operation name="ReleaseAccess">
     87      <documentation>
     88        Release an allocation of access to a testbed. This informs the testbed
     89        that it is no longer necessary to allow the access mthods negotiated by
     90        a RequestAccess cal.
     91      </documentation>
     92      <input message="tns:ReleaseAccessRequestMessage"/>
     93      <output message="tns:ReleaseAccessResponseMessage"/>
     94      <fault name="FeddFault" message="tns:FaultMessage"/>
     95    </operation>
    7996    <operation name="Create">
    8097      <documentation>
  • fedd/fedd_types.xsd

    rafa43a8 rd81971a  
    395395      <xsd:element name="until" type="xsd:dateTime" minOccurs="0"
    396396        maxOccurs="1"/>
     397    </xsd:sequence>
     398  </xsd:complexType>
     399
     400  <xsd:complexType name="releaseRequestType">
     401    <xsd:annotation>
     402      <xsd:documentation>
     403        A request to release the access rights allocated by an earlier
     404        RequestAccess call.
     405      </xsd:documentation>
     406    </xsd:annotation>
     407    <xsd:sequence>
     408      <xsd:element name="allocID" type="tns:IDType"/>
     409    </xsd:sequence>
     410  </xsd:complexType>
     411
     412  <xsd:complexType name="releaseResponseType">
     413    <xsd:annotation>
     414      <xsd:documentation>
     415        Indication that the access has been terminated.
     416      </xsd:documentation>
     417    </xsd:annotation>
     418    <xsd:sequence>
     419      <xsd:element name="allocID" type="tns:IDType"/>
    397420    </xsd:sequence>
    398421  </xsd:complexType>
Note: See TracChangeset for help on using the changeset viewer.