source: fedd/federation/remote_service.py @ 0dc62df

compt_changesinfo-ops
Last change on this file since 0dc62df was 0dc62df, checked in by Ted Faber <faber@…>, 12 years ago

Significantly improve resilience to SSL failures. #35

  • Property mode set to 100644
File size: 20.3 KB
Line 
1#!/usr/local/bin/python
2
3import copy
4
5from socket import error as socket_error
6from socket import sslerror
7from socket import SHUT_RDWR
8
9import M2Crypto.httpslib
10from M2Crypto import SSL
11from M2Crypto.m2xmlrpclib import SSL_Transport
12from M2Crypto.SSL import SSLError
13from M2Crypto.BIO import BIOError
14from ZSI import ParseException, FaultException, SoapWriter
15
16# Underlying SOAP comms use this and we need to catch their exceptions
17import httplib
18
19from proof import proof
20from service_error import service_error
21from xmlrpclib import ServerProxy, dumps, loads, Fault, Error, Binary
22try:
23    import fedd_services
24    import fedd_internal_services
25    service_port_name = 'getfeddPortType'
26    internal_service_port_name = 'getfeddInternalPortType'
27except ImportError:
28    import fedd_client
29    import fedd_internal_client
30    fedd_services = fedd_client
31    fedd_internal_services = fedd_internal_client
32    service_port_name = 'getfeddPort'
33    internal_service_port_name = 'getfedd_internal_port'
34from util import fedd_ssl_context
35from fedid import fedid
36import parse_detail
37
38# Turn off the matching of hostname to certificate ID
39SSL.Connection.clientPostConnectionCheck = None
40
41# Used by the remote_service_base class.
42def to_binary(o):
43    """
44    A function that converts an object into an xmlrpclib.Binary using
45    either its internal packing method, or the standard Binary constructor.
46    """
47    pack = getattr(o, 'pack_xmlrpc', None)
48    if callable(pack): return Binary(pack())
49    else: return Binary(o)
50
51# Classes that encapsulate the process of making and dealing with requests to
52# WSDL-generated and XMLRPC remote accesses. 
53
54class remote_service_base:
55    """
56    This invisible base class encapsulates the functions used to massage the
57    dictionaries used to pass parameters into and out of the RPC formats.  It's
58    mostly a container for the static methods to do that work, but defines some
59    maps sued by sub classes on apply_to_tags
60    """
61    # A map used to convert fedid fields to fedid objects (when the field is
62    # already a string)
63    fedid_to_object = {'fedid': lambda x: fedid(bits=x)}
64    # A map used by apply_to_tags to convert fedids from xmlrpclib.Binary
65    # objects to fedid objects in one sweep.
66    decap_fedids = {'fedid': lambda x: fedid(bits=x.data), 
67            'credential': lambda x: x.data}
68    # A map used to encapsulate fedids into xmlrpclib.Binary objects
69    encap_fedids = {'fedid': to_binary, 'credential': to_binary}
70
71    # fields that are never unicoded, because they represent non strings.
72    do_not_unicode = set(['credential'])
73
74    @staticmethod
75    def pack_soap(container, name, contents):
76        """
77        Convert the dictionary in contents into a tree of ZSI classes.
78
79        The holder classes are constructed from factories in container and
80        assigned to either the element or attribute name.  This is used to
81        recursively create the SOAP message.
82        """
83        if getattr(contents, "__iter__", None) != None:
84            attr =getattr(container, "new_%s" % name, None)
85            if attr: obj = attr()
86            else:
87                raise TypeError("%s does not have a new_%s attribute" % \
88                        (container, name))
89            for e, v in contents.iteritems():
90                assign = getattr(obj, "set_element_%s" % e, None) or \
91                        getattr(obj, "set_attribute_%s" % e, None)
92                if isinstance(v, type(dict())):
93                    assign(remote_service_base.pack_soap(obj, e, v))
94                elif getattr(v, "__iter__", None) != None:
95                    assign([ remote_service_base.pack_soap(obj, e, val ) \
96                            for val in v])
97                elif getattr(v, "pack_soap", None) != None:
98                    assign(v.pack_soap())
99                else:
100                    assign(v)
101            return obj
102        else: return contents
103
104    @staticmethod
105    def unpack_soap(element):
106        """
107        Convert a tree of ZSI SOAP classes intro a hash.  The inverse of
108        pack_soap
109
110        Elements or elements that are empty are ignored.
111        """
112        methods = [ m for m in dir(element) \
113                if m.startswith("get_element") or m.startswith("get_attribute")]
114        if len(methods) > 0:
115            rv = { }
116            for m in methods:
117                if m.startswith("get_element_"):
118                    n = m.replace("get_element_","",1)
119                else:
120                    n = m.replace("get_attribute_", "", 1)
121                sub = getattr(element, m)()
122                if sub != None:
123                    if isinstance(sub, basestring):
124                        rv[n] = sub
125                    elif getattr(sub, "__iter__", None) != None:
126                        if len(sub) > 0: rv[n] = \
127                                [remote_service_base.unpack_soap(e) \
128                                    for e in sub]
129                    else:
130                        rv[n] = remote_service_base.unpack_soap(sub)
131            return rv
132        else: 
133            return element
134
135    @staticmethod
136    def apply_to_tags(e, map):
137        """
138        Map is an iterable of ordered pairs (tuples) that map a key to a
139        function.
140        This function walks the given message and replaces any object with a
141        key in the map with the result of applying that function to the object.
142        """
143        if isinstance(e, dict):
144            for k in e.keys():
145                if k in map:
146                    fcn = map[k]
147                    if isinstance(e[k], list):
148                        e[k] = [ fcn(b) for b in e[k]]
149                    else:
150                        e[k] = fcn(e[k])
151                elif isinstance(e[k], dict):
152                    remote_service_base.apply_to_tags(e[k], map)
153                elif isinstance(e[k], list):
154                    for ee in e[k]:
155                        remote_service_base.apply_to_tags(ee, map)
156        # Other types end the recursion - they should be leaves
157        return e
158
159    @staticmethod
160    def strip_unicode(obj):
161        """Walk through a message and convert all strings to non-unicode
162        strings"""
163        if isinstance(obj, dict):
164            for k in obj.keys():
165                obj[k] = remote_service_base.strip_unicode(obj[k])
166            return obj
167        elif isinstance(obj, basestring) and not isinstance(obj, str):
168            return str(obj)
169        elif getattr(obj, "__iter__", None):
170            return [ remote_service_base.strip_unicode(x) for x in obj]
171        else:
172            return obj
173
174    @staticmethod
175    def make_unicode(obj):
176        """Walk through a message and convert all strings to unicode"""
177        if isinstance(obj, dict):
178            for k in obj.keys():
179                if k not in remote_service_base.do_not_unicode:
180                    obj[k] = remote_service_base.make_unicode(obj[k])
181            return obj
182        elif isinstance(obj, basestring) and not isinstance(obj, unicode):
183            return unicode(obj)
184        elif getattr(obj, "__iter__", None):
185            return [ remote_service_base.make_unicode(x) for x in obj]
186        else:
187            return obj
188
189
190
191class soap_handler(remote_service_base):
192    """
193    Encapsulate the handler code to unpack and pack SOAP requests and responses
194    and call the given method.
195
196    The code to decapsulate and encapsulate parameters encoded in SOAP is the
197    same modulo a few parameters.  This is a functor that calls a fedd service
198    trhough a soap interface.  The parameters are the typecode of the request
199    parameters, the method to call (usually a bound instance of a method on a
200    fedd service providing class), the constructor of a response packet and the
201    name of the body element of that packet.  The handler takes a ParsedSoap
202    object (the request) and returns an instance of the class created by
203    constructor containing the response.  Failures of the constructor or badly
204    created constructors will result in None being returned.
205    """
206    def __init__(self, service_name, method, typecode=None,
207            constructor=None, body_name=None):
208        self.method = method
209
210        response_class_name = "%sResponseMessage" % service_name
211        request_class_name = "%sRequestMessage" % service_name
212
213        if body_name: self.body_name = body_name
214        else: self.body_name = "%sResponseBody" % service_name
215
216        if constructor: self.constructor = constructor
217        else:
218            self.constructor = self.get_class(response_class_name)
219            if not self.constructor:
220                raise service_error(service_error.internal,
221                        "Cannot find class for %s" % response_class_name)
222
223        if typecode: self.typecode = typecode
224        else: 
225            req = self.get_class(request_class_name)
226            if req:
227                self.typecode = req.typecode
228            else:
229                raise service_error(service_error.internal,
230                        "Cannot find class for %s" % request_class_name)
231
232            if not self.typecode:
233                raise service_error(service_error.internal,
234                        "Cannot get typecode for %s" % class_name)
235
236    def get_class(self, class_name):
237        return getattr(fedd_services, class_name, None) or \
238                getattr(fedd_internal_services, class_name, None)
239
240    def __call__(self, ps, fid):
241        req = ps.Parse(self.typecode)
242        # Convert the message to a dict with the fedid strings converted to
243        # fedid objects
244        req = self.apply_to_tags(self.unpack_soap(req), self.fedid_to_object)
245
246        msg = self.method(req, fid)
247
248        resp = self.constructor()
249        set_element = getattr(resp, "set_element_%s" % self.body_name, None)
250        if set_element and callable(set_element):
251            try:
252                set_element(self.pack_soap(resp, self.body_name, msg))
253                return resp
254            except (NameError, TypeError):
255                return None
256        else:
257            return None
258
259class xmlrpc_handler(remote_service_base):
260    """
261    Generate the handler code to unpack and pack XMLRPC requests and responses
262    and call the given method.
263
264    The code to marshall and unmarshall XMLRPC parameters to and from a fedd
265    service is largely the same.  This helper creates such a handler.  The
266    parameters are the method name, and the name of the body struct that
267    contains the response.  A handler is created that takes the params response
268    from an xmlrpclib.loads on the incoming rpc and a fedid and responds with
269    a hash representing the struct ro be returned to the other side.  On error
270    None is returned.  Fedid fields are decapsulated from binary and converted
271    to fedid objects on input and encapsulated as Binaries on output.
272    """
273    def __init__(self, service_name, method):
274        self.method = method
275        self.body_name = "%sResponseBody" % service_name
276
277    def __call__(self, params, fid):
278        msg = None
279
280        p = self.apply_to_tags(params[0], self.decap_fedids)
281        try:
282            msg = self.method(p, fid)
283        except service_error, e:
284            raise Fault(e.code, "%s: %s" % (e.code_string(), e.desc))
285        if msg != None:
286            return self.make_unicode(self.apply_to_tags(\
287                    { self.body_name: msg }, self.encap_fedids))
288        else:
289            return None
290
291class service_caller(remote_service_base):
292    def __init__(self, service_name, request_message=None, 
293            request_body_name=None, tracefile=None, strict=True,
294            log=None, max_retries=None, fedd_encapsulation=True):
295        self.service_name = service_name
296
297        if getattr(fedd_services.feddBindingSOAP, service_name, None):
298            self.locator = fedd_services.feddServiceLocator
299            self.port_name = service_port_name
300        elif getattr(fedd_internal_services.feddInternalBindingSOAP, 
301                service_name, None):
302            self.locator = fedd_internal_services.feddInternalServiceLocator
303            self.port_name = internal_service_port_name
304
305        if request_message: self.request_message = request_message
306        else:
307            request_message_name = "%sRequestMessage" % service_name
308            self.request_message = \
309                    getattr(fedd_services, request_message_name, None) or \
310                    getattr(fedd_internal_services, request_message_name,
311                            None)
312            if not self.request_message and strict:
313                raise service_error(service_error.internal,
314                        "Cannot find class for %s" % request_message_name)
315
316        if request_body_name is not None:
317            self.request_body_name = request_body_name
318        else: 
319            self.request_body_name = "%sRequestBody" % service_name
320
321        self.tracefile = tracefile
322        self.__call__ = self.call_service
323        if max_retries is not None: self.max_retries = max_retries
324        else: self.max_retries = 5
325        self.log = log
326        if not fedd_encapsulation:
327            self.fedid_to_object = {}
328            self.decap_fedids = {}
329            self.encap_fedids = {}
330            self.do_not_unicode = set()
331
332
333    def serialize_soap(self, req):
334        """
335        Return a string containing the message that would be sent to call this
336        service with the given request.
337        """
338        msg = self.request_message()
339        set_element = getattr(msg, "set_element_%s" % self.request_body_name,
340                None)
341        if not set_element:
342            raise service_error(service_error.internal,
343                    "Cannot get element setting method for %s" % \
344                            self.request_body_name)
345        set_element(self.pack_soap(msg, self.request_body_name, req))
346        sw = SoapWriter()
347        sw.serialize(msg)
348        return unicode(sw)
349
350    def call_xmlrpc_service(self, url, req, cert_file=None, cert_pwd=None, 
351            trusted_certs=None, context=None, tracefile=None):
352        """Send an XMLRPC request.  """
353
354
355        # If a context is given, use it.  Otherwise construct one from
356        # components.  The construction shouldn't call out for passwords.
357        if context:
358            ctx = context
359        else:
360            try:
361                ctx = fedd_ssl_context(cert_file, trusted_certs, 
362                        password=cert_pwd)
363            except SSL.SSLError, e:
364                raise service_error(service_error.server_config,
365                        "Certificates misconfigured: %s" % e)
366
367        # Of all the dumbass things.  The XMLRPC library in use here won't
368        # properly encode unicode strings, so we make a copy of req with
369        # the unicode objects converted.  We also convert the url to a
370        # basic string if it isn't one already.
371        r = self.strip_unicode(copy.deepcopy(req))
372        if self.request_body_name:
373            r  = self.apply_to_tags(\
374                    { self.request_body_name: r}, self.encap_fedids)
375        else:
376            r = self.apply_to_tags(r, self.encap_fedids)
377
378        url = str(url)
379        ok = False
380        retries = 0
381
382        while not ok and retries < self.max_retries:
383            try:
384                transport = SSL_Transport(ctx)
385                port = ServerProxy(url, transport=transport)
386                remote_method = getattr(port, self.service_name, None)
387                resp = remote_method(r)
388                ok = True
389            except socket_error, e:
390                raise service_error(service_error.connect, 
391                        "Cannot connect to %s: %s" % (url, e[1]))
392            except BIOError, e:
393                if self.log:
394                    self.log.warn("BIO error contacting %s: %s" % (url, e))
395                retries += 1
396            except sslerror, e:
397                if self.log:
398                    self.log.warn("SSL (socket) error contacting %s: %s" % 
399                            (url, e))
400                retries += 1
401            except SSLError, e:
402                if self.log:
403                    self.log.warn("SSL error contacting %s: %s" % (url, e))
404                retries += 1
405            except httplib.HTTPException, e:
406                if self.log:
407                    self.log.warn("HTTP error contacting %s: %s" % (url, e))
408                retries +=1
409            except Fault, f:
410                raise service_error(f.faultCode, f.faultString)
411            except Error, e:
412                raise service_error(service_error.protocol, 
413                        "Remote XMLRPC Fault: %s" % e)
414
415        if retries >= self.max_retries :
416            raise service_error(service_error.connect, "Too many SSL failures")
417
418        return self.apply_to_tags(resp, self.decap_fedids) 
419
420    def hammer_port_shut(self, port):
421        """
422        If there is an ssl failure, the connection to the service often stays
423        open and idle, confusing subsequent attempts to contact it (the symptom
424        is a stuck connection).  This routine walks through the open port and
425        hammers everything as closed as it can.  It is called a couple places
426        in call_soap_service.  This is dirty code, walking the internals of a
427        couple data structures, but without it, the daemon can lock up.
428        """
429        try:
430            if port is not None:
431                binding = getattr(port, 'binding', None)
432                if binding is not None: connection = getattr(binding, 'h', None)
433                else: connection = None
434
435                if connection is not None:
436                    csock = getattr(connection, 'sock', None)
437                    if csock is not None:
438                        csock.clear()
439                        csock.close()
440                        cssocket = getattr(csock, 'socket', None)
441                        if cssocket is not None:
442                            cssocket.shutdown(SHUT_RDWR)
443                            cssocket.close()
444                        del csock
445                        connection.sock = None
446                    httplib.HTTPConnection.close(connection)
447                    if self.log:
448                        self.log.debug("Closed connection with prejudice")
449
450                del port
451                port = None
452                if self.log:
453                    self.log.debug("Annihilated port")
454        except Exception, e:
455            port = None
456
457        return port
458
459
460    def call_soap_service(self, url, req, cert_file=None, cert_pwd=None,
461            trusted_certs=None, context=None, tracefile=None):
462        """
463        Send req on to the real destination in dt and return the response
464
465        Req is just the requestType object.  This function re-wraps it.  It
466        also rethrows any faults.
467        """
468
469        tf = tracefile or self.tracefile or None
470
471        if not self.request_body_name:
472            raise service_error(service_error.internal, 
473                    "Call to soap service without a configured request body");
474
475        ok = False
476        retries = 0
477        port = None
478        while not ok and retries < self.max_retries:
479            try:
480                # If this is a retry, close the request and annihilate the port
481                port = self.hammer_port_shut(port)
482
483                # Reconstruct the full request message
484                msg = self.request_message()
485                set_element = getattr(msg, "set_element_%s" % \
486                        self.request_body_name,
487                        None)
488                if not set_element:
489                    raise service_error(service_error.internal,
490                            "Cannot get element setting method for %s" % \
491                                    self.request_body_name)
492                set_element(self.pack_soap(msg, self.request_body_name, req))
493                # If a context is given, use it.  Otherwise construct one from
494                # components.  The construction shouldn't call out for
495                # passwords.
496                if context:
497                    if self.log:
498                        self.log.debug("Context passed in to call_soap")
499                    ctx = context
500                else:
501                    if self.log:
502                        self.log.debug(
503                                "Constructing context in call_soap: %s" % \
504                                        cert_file)
505                    try:
506                        ctx = fedd_ssl_context(cert_file, trusted_certs, 
507                                password=cert_pwd)
508                    except SSL.SSLError, e:
509                        if self.log:
510                            self.log.debug("Certificate error: %s" % e)
511                        raise service_error(service_error.server_config,
512                                "Certificates misconfigured: %s" % e)
513                loc = self.locator()
514                get_port = getattr(loc, self.port_name, None)
515                if not get_port:
516                    raise service_error(service_error.internal, 
517                            "Cannot get port %s from locator" % self.port_name)
518                port = get_port(url,
519                        transport=M2Crypto.httpslib.HTTPSConnection, 
520                        transdict={ 'ssl_context' : ctx },
521                        tracefile=tf)
522                remote_method = getattr(port, self.service_name, None)
523                if not remote_method:
524                    raise service_error(service_error.internal,
525                            "Cannot get service from SOAP port")
526
527                fail_exc = None
528                if self.log:
529                    self.log.debug("Calling %s (retry %d)" % \
530                            (self.service_name, retries))
531                resp = remote_method(msg)
532                ok = True
533            except socket_error, e:
534                self.hammer_port_shut(port)
535                raise service_error(service_error.connect, 
536                        "Cannot connect to %s: %s" % (url, e[1]))
537            except BIOError, e:
538                if self.log:
539                    self.log.warn("BIO error contacting %s: %s" % (url, e))
540                fail_exc = e
541                retries += 1
542            except sslerror, e:
543                if self.log:
544                    self.log.warn("SSL (socket) error contacting %s: %s" % 
545                            (url, e))
546                retries += 1
547            except SSLError, e:
548                if self.log:
549                    self.log.warn("SSL error contacting %s: %s" % (url, e))
550                fail_exc = e
551                retries += 1
552            except httplib.HTTPException, e:
553                if self.log:
554                    self.log.warn("HTTP error contacting %s: %s" % (url, e))
555                fail_exc = e
556                retries +=1
557            except ParseException, e:
558                port = self.hammer_port_shut(port)
559                raise service_error(service_error.protocol,
560                        "Bad format message (XMLRPC??): %s" % e)
561            except FaultException, e:
562                port = self.hammer_port_shut(port)
563                # If the method isn't implemented we get a FaultException
564                # without a detail (which would be a FeddFault).  If that's the
565                # case construct a service_error out of the SOAP fields of the
566                # fault, if they're present.
567                if e.fault.detail:
568                    det = e.fault.detail[0]
569                    ee = self.unpack_soap(det).get('FeddFaultBody', { })
570                else:
571                    ee = { 'code': service_error.internal, 
572                            'desc': e.fault.string or "Something Weird" }
573                if ee:
574                    if 'proof' in ee: 
575                        pl = [ proof.from_dict(p) for p in ee['proof']]
576                    else: 
577                        pl = None
578                    raise service_error(ee.get('code', 'no code'), 
579                            ee.get('desc','no desc'), proof=pl)
580                else:
581                    raise service_error(service_error.internal,
582                            "Unexpected fault body")
583
584        if retries >= self.max_retries and fail_exc and not ok:
585            port = self.hammer_port_shut(port)
586            raise service_error(service_error.connect, 
587                    "Too many failures: %s" % fail_exc)
588
589        # Unpack and convert fedids to objects
590        r = self.apply_to_tags(self.unpack_soap(resp), self.fedid_to_object)
591
592        #  Make sure all strings are unicode
593        r = self.make_unicode(r)
594        return r
595
596    def call_service(self, url, req, cert_file=None, cert_pwd=None, 
597        trusted_certs=None, context=None, tracefile=None):
598        p_fault = None  # Any SOAP failure (sent unless XMLRPC works)
599        resp = None
600        try:
601            # Try the SOAP request
602            resp = self.call_soap_service(url, req, 
603                    cert_file, cert_pwd, trusted_certs, context, tracefile)
604            return resp
605        except service_error, e:
606            if e.code == service_error.protocol: p_fault = None
607            else: raise
608        except FaultException, f:
609            p_fault = f.fault.detail[0]
610               
611
612        # If we could not get a valid SOAP response to the request above,
613        # try the same address using XMLRPC and let any faults flow back
614        # out.
615        if p_fault == None:
616            resp = self.call_xmlrpc_service(url, req, cert_file,
617                    cert_pwd, trusted_certs, context, tracefile)
618            return resp
619        else:
620            # Build the fault
621            ee = unpack_soap(p_fault).get('FeddFaultBody', { })
622            if ee:
623                raise service_error(ee['code'], ee['desc'])
624            else:
625                raise service_error(service_error.internal,
626                        "Unexpected fault body")
Note: See TracBrowser for help on using the repository browser.