source: fedd/federation/remote_service.py @ 0863dd1

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 0863dd1 was 11867dde, checked in by Ted Faber <faber@…>, 14 years ago

Make the number of retries a parameter

  • Property mode set to 100644
File size: 18.0 KB
Line 
1#!/usr/local/bin/python
2
3import copy
4
5from socket import error as socket_error
6from socket import sslerror
7
8import M2Crypto.httpslib
9from M2Crypto import SSL
10from M2Crypto.m2xmlrpclib import SSL_Transport
11from M2Crypto.SSL import SSLError
12from M2Crypto.BIO import BIOError
13from ZSI import ParseException, FaultException, SoapWriter
14
15# Underlying SOAP comms use this and we need to catch their exceptions
16import httplib
17
18from service_error import service_error
19from xmlrpclib import ServerProxy, dumps, loads, Fault, Error, Binary
20
21import fedd_services
22import fedd_internal_services
23from util import fedd_ssl_context
24from fedid import fedid
25import parse_detail
26
27# Turn off the matching of hostname to certificate ID
28SSL.Connection.clientPostConnectionCheck = None
29
30# Used by the remote_service_base class.
31def to_binary(o):
32    """
33    A function that converts an object into an xmlrpclib.Binary using
34    either its internal packing method, or the standard Binary constructor.
35    """
36    pack = getattr(o, 'pack_xmlrpc', None)
37    if callable(pack): return Binary(pack())
38    else: return Binary(o)
39
40# Classes that encapsulate the process of making and dealing with requests to
41# WSDL-generated and XMLRPC remote accesses. 
42
43class remote_service_base:
44    """
45    This invisible base class encapsulates the functions used to massage the
46    dictionaries used to pass parameters into and out of the RPC formats.  It's
47    mostly a container for the static methods to do that work, but defines some
48    maps sued by sub classes on apply_to_tags
49    """
50    # A map used to convert fedid fields to fedid objects (when the field is
51    # already a string)
52    fedid_to_object = ( ('fedid', lambda x: fedid(bits=x)),)
53    # A map used by apply_to_tags to convert fedids from xmlrpclib.Binary
54    # objects to fedid objects in one sweep.
55    decap_fedids = (('fedid', lambda x: fedid(bits=x.data)),)
56    # A map used to encapsulate fedids into xmlrpclib.Binary objects
57    encap_fedids = (('fedid', to_binary),)
58
59    @staticmethod
60    def pack_soap(container, name, contents):
61        """
62        Convert the dictionary in contents into a tree of ZSI classes.
63
64        The holder classes are constructed from factories in container and
65        assigned to either the element or attribute name.  This is used to
66        recursively create the SOAP message.
67        """
68        if getattr(contents, "__iter__", None) != None:
69            attr =getattr(container, "new_%s" % name, None)
70            if attr: obj = attr()
71            else:
72                raise TypeError("%s does not have a new_%s attribute" % \
73                        (container, name))
74            for e, v in contents.iteritems():
75                assign = getattr(obj, "set_element_%s" % e, None) or \
76                        getattr(obj, "set_attribute_%s" % e, None)
77                if isinstance(v, type(dict())):
78                    assign(remote_service_base.pack_soap(obj, e, v))
79                elif getattr(v, "__iter__", None) != None:
80                    assign([ remote_service_base.pack_soap(obj, e, val ) \
81                            for val in v])
82                elif getattr(v, "pack_soap", None) != None:
83                    assign(v.pack_soap())
84                else:
85                    assign(v)
86            return obj
87        else: return contents
88
89    @staticmethod
90    def unpack_soap(element):
91        """
92        Convert a tree of ZSI SOAP classes intro a hash.  The inverse of
93        pack_soap
94
95        Elements or elements that are empty are ignored.
96        """
97        methods = [ m for m in dir(element) \
98                if m.startswith("get_element") or m.startswith("get_attribute")]
99        if len(methods) > 0:
100            rv = { }
101            for m in methods:
102                if m.startswith("get_element_"):
103                    n = m.replace("get_element_","",1)
104                else:
105                    n = m.replace("get_attribute_", "", 1)
106                sub = getattr(element, m)()
107                if sub != None:
108                    if isinstance(sub, basestring):
109                        rv[n] = sub
110                    elif getattr(sub, "__iter__", None) != None:
111                        if len(sub) > 0: rv[n] = \
112                                [remote_service_base.unpack_soap(e) \
113                                    for e in sub]
114                    else:
115                        rv[n] = remote_service_base.unpack_soap(sub)
116            return rv
117        else: 
118            return element
119
120    @staticmethod
121    def apply_to_tags(e, map):
122        """
123        Map is an iterable of ordered pairs (tuples) that map a key to a
124        function.
125        This function walks the given message and replaces any object with a
126        key in the map with the result of applying that function to the object.
127        """
128        if isinstance(e, dict):
129            for k in e.keys():
130                for tag, fcn in map:
131                    if k == tag:
132                        if isinstance(e[k], list):
133                            e[k] = [ fcn(b) for b in e[k]]
134                        else:
135                            e[k] = fcn(e[k])
136                    elif isinstance(e[k], dict):
137                        remote_service_base.apply_to_tags(e[k], map)
138                    elif isinstance(e[k], list):
139                        for ee in e[k]:
140                            remote_service_base.apply_to_tags(ee, map)
141        # Other types end the recursion - they should be leaves
142        return e
143
144    @staticmethod
145    def strip_unicode(obj):
146        """Walk through a message and convert all strings to non-unicode
147        strings"""
148        if isinstance(obj, dict):
149            for k in obj.keys():
150                obj[k] = remote_service_base.strip_unicode(obj[k])
151            return obj
152        elif isinstance(obj, basestring) and not isinstance(obj, str):
153            return str(obj)
154        elif getattr(obj, "__iter__", None):
155            return [ remote_service_base.strip_unicode(x) for x in obj]
156        else:
157            return obj
158
159    @staticmethod
160    def make_unicode(obj):
161        """Walk through a message and convert all strings to unicode"""
162        if isinstance(obj, dict):
163            for k in obj.keys():
164                obj[k] = remote_service_base.make_unicode(obj[k])
165            return obj
166        elif isinstance(obj, basestring) and not isinstance(obj, unicode):
167            return unicode(obj)
168        elif getattr(obj, "__iter__", None):
169            return [ remote_service_base.make_unicode(x) for x in obj]
170        else:
171            return obj
172
173
174
175class soap_handler(remote_service_base):
176    """
177    Encapsulate the handler code to unpack and pack SOAP requests and responses
178    and call the given method.
179
180    The code to decapsulate and encapsulate parameters encoded in SOAP is the
181    same modulo a few parameters.  This is a functor that calls a fedd service
182    trhough a soap interface.  The parameters are the typecode of the request
183    parameters, the method to call (usually a bound instance of a method on a
184    fedd service providing class), the constructor of a response packet and the
185    name of the body element of that packet.  The handler takes a ParsedSoap
186    object (the request) and returns an instance of the class created by
187    constructor containing the response.  Failures of the constructor or badly
188    created constructors will result in None being returned.
189    """
190    def __init__(self, service_name, method, typecode=None,
191            constructor=None, body_name=None):
192        self.method = method
193
194        response_class_name = "%sResponseMessage" % service_name
195        request_class_name = "%sRequestMessage" % service_name
196
197        if body_name: self.body_name = body_name
198        else: self.body_name = "%sResponseBody" % service_name
199
200        if constructor: self.constructor = constructor
201        else:
202            self.constructor = self.get_class(response_class_name)
203            if not self.constructor:
204                raise service_error(service_error.internal,
205                        "Cannot find class for %s" % response_class_name)
206
207        if typecode: self.typecode = typecode
208        else: 
209            req = self.get_class(request_class_name)
210            if req:
211                self.typecode = req.typecode
212            else:
213                raise service_error(service_error.internal,
214                        "Cannot find class for %s" % request_class_name)
215
216            if not self.typecode:
217                raise service_error(service_error.internal,
218                        "Cannot get typecode for %s" % class_name)
219
220    def get_class(self, class_name):
221        return getattr(fedd_services, class_name, None) or \
222                getattr(fedd_internal_services, class_name, None)
223
224    def __call__(self, ps, fid):
225        req = ps.Parse(self.typecode)
226        # Convert the message to a dict with the fedid strings converted to
227        # fedid objects
228        req = self.apply_to_tags(self.unpack_soap(req), self.fedid_to_object)
229
230        msg = self.method(req, fid)
231
232        resp = self.constructor()
233        set_element = getattr(resp, "set_element_%s" % self.body_name, None)
234        if set_element and callable(set_element):
235            try:
236                set_element(self.pack_soap(resp, self.body_name, msg))
237                return resp
238            except (NameError, TypeError):
239                return None
240        else:
241            return None
242
243class xmlrpc_handler(remote_service_base):
244    """
245    Generate the handler code to unpack and pack XMLRPC requests and responses
246    and call the given method.
247
248    The code to marshall and unmarshall XMLRPC parameters to and from a fedd
249    service is largely the same.  This helper creates such a handler.  The
250    parameters are the method name, and the name of the body struct that
251    contains the response.  A handler is created that takes the params response
252    from an xmlrpclib.loads on the incoming rpc and a fedid and responds with
253    a hash representing the struct ro be returned to the other side.  On error
254    None is returned.  Fedid fields are decapsulated from binary and converted
255    to fedid objects on input and encapsulated as Binaries on output.
256    """
257    def __init__(self, service_name, method):
258        self.method = method
259        self.body_name = "%sResponseBody" % service_name
260
261    def __call__(self, params, fid):
262        msg = None
263
264        p = self.apply_to_tags(params[0], self.decap_fedids)
265        try:
266            msg = self.method(p, fid)
267        except service_error, e:
268            raise Fault(e.code, "%s: %s" % (e.code_string(), e.desc))
269        if msg != None:
270            return self.make_unicode(self.apply_to_tags(\
271                    { self.body_name: msg }, self.encap_fedids))
272        else:
273            return None
274
275class service_caller(remote_service_base):
276    def __init__(self, service_name, request_message=None, 
277            request_body_name=None, tracefile=None, strict=True,
278            log=None, max_retries=None):
279        self.service_name = service_name
280
281        if getattr(fedd_services.feddBindingSOAP, service_name, None):
282            self.locator = fedd_services.feddServiceLocator
283            self.port_name = 'getfeddPortType'
284        elif getattr(fedd_internal_services.feddInternalBindingSOAP, 
285                service_name, None):
286            self.locator = fedd_internal_services.feddInternalServiceLocator
287            self.port_name = 'getfeddInternalPortType'
288
289        if request_message: self.request_message = request_message
290        else:
291            request_message_name = "%sRequestMessage" % service_name
292            self.request_message = \
293                    getattr(fedd_services, request_message_name, None) or \
294                    getattr(fedd_internal_services, request_message_name, None)
295            if not self.request_message and strict:
296                raise service_error(service_error.internal,
297                        "Cannot find class for %s" % request_message_name)
298
299        if request_body_name is not None:
300            self.request_body_name = request_body_name
301        else: 
302            self.request_body_name = "%sRequestBody" % service_name
303
304        self.tracefile = tracefile
305        self.__call__ = self.call_service
306        if max_retries is not None: self.max_retries = max_retries
307        else: self.max_retries = 5
308        self.log = log
309
310    def serialize_soap(self, req):
311        """
312        Return a string containing the message that would be sent to call this
313        service with the given request.
314        """
315        msg = self.request_message()
316        set_element = getattr(msg, "set_element_%s" % self.request_body_name,
317                None)
318        if not set_element:
319            raise service_error(service_error.internal,
320                    "Cannot get element setting method for %s" % \
321                            self.request_body_name)
322        set_element(self.pack_soap(msg, self.request_body_name, req))
323        sw = SoapWriter()
324        sw.serialize(msg)
325        return unicode(sw)
326
327    def call_xmlrpc_service(self, url, req, cert_file=None, cert_pwd=None, 
328            trusted_certs=None, context=None, tracefile=None):
329        """Send an XMLRPC request.  """
330
331
332        # If a context is given, use it.  Otherwise construct one from
333        # components.  The construction shouldn't call out for passwords.
334        if context:
335            ctx = context
336        else:
337            try:
338                ctx = fedd_ssl_context(cert_file, trusted_certs, 
339                        password=cert_pwd)
340            except SSL.SSLError:
341                raise service_error(service_error.server_config,
342                        "Certificates misconfigured")
343
344        # Of all the dumbass things.  The XMLRPC library in use here won't
345        # properly encode unicode strings, so we make a copy of req with
346        # the unicode objects converted.  We also convert the url to a
347        # basic string if it isn't one already.
348        r = self.strip_unicode(copy.deepcopy(req))
349        if self.request_body_name:
350            r  = self.apply_to_tags(\
351                    { self.request_body_name: r}, self.encap_fedids)
352        else:
353            r = self.apply_to_tags(r, self.encap_fedids)
354
355        url = str(url)
356        ok = False
357        retries = 0
358
359        while not ok and retries < self.max_retries:
360            try:
361                transport = SSL_Transport(ctx)
362                port = ServerProxy(url, transport=transport)
363                remote_method = getattr(port, self.service_name, None)
364                resp = remote_method(r)
365                ok = True
366            except socket_error, e:
367                raise service_error(service_error.connect, 
368                        "Cannot connect to %s: %s" % (url, e[1]))
369            except BIOError, e:
370                if self.log:
371                    self.log.warn("BIO error contacting %s: %s" % (url, e))
372                retries += 1
373            except sslerror, e:
374                if self.log:
375                    self.log.warn("SSL (socket) error contacting %s: %s" % 
376                            (url, e))
377                retries += 1
378            except SSLError, e:
379                if self.log:
380                    self.log.warn("SSL error contacting %s: %s" % (url, e))
381                retries += 1
382            except httplib.HTTPException, e:
383                if self.log:
384                    self.log.warn("HTTP error contacting %s: %s" % (url, e))
385                retries +=1
386            except Fault, f:
387                raise service_error(f.faultCode, f.faultString)
388            except Error, e:
389                raise service_error(service_error.protocol, 
390                        "Remote XMLRPC Fault: %s" % e)
391
392        if retries >= self.max_retries :
393            raise service_error(service_error.connect, "Too many SSL failures")
394
395        return self.apply_to_tags(resp, self.decap_fedids) 
396
397    def call_soap_service(self, url, req, cert_file=None, cert_pwd=None,
398            trusted_certs=None, context=None, tracefile=None):
399        """
400        Send req on to the real destination in dt and return the response
401
402        Req is just the requestType object.  This function re-wraps it.  It
403        also rethrows any faults.
404        """
405
406        tf = tracefile or self.tracefile or None
407
408        if not self.request_body_name:
409            raise service_error(service_error.internal, 
410                    "Call to soap service without a configured request body");
411
412        ok = False
413        retries = 0
414        while not ok and retries < self.max_retries:
415            try:
416                # Reconstruct the full request message
417                msg = self.request_message()
418                set_element = getattr(msg, "set_element_%s" % \
419                        self.request_body_name,
420                        None)
421                if not set_element:
422                    raise service_error(service_error.internal,
423                            "Cannot get element setting method for %s" % \
424                                    self.request_body_name)
425                set_element(self.pack_soap(msg, self.request_body_name, req))
426                # If a context is given, use it.  Otherwise construct one from
427                # components.  The construction shouldn't call out for
428                # passwords.
429                if context:
430                    if self.log:
431                        self.log.debug("Context passed in to call_soap")
432                    ctx = context
433                else:
434                    if self.log:
435                        self.log.debug(
436                                "Constructing context in call_soap: %s" % \
437                                        cert_file)
438                    try:
439                        ctx = fedd_ssl_context(cert_file, trusted_certs, 
440                                password=cert_pwd)
441                    except SSL.SSLError, e:
442                        if self.log:
443                            self.log.debug("Certificate error: %s" % e)
444                        raise service_error(service_error.server_config,
445                                "Certificates misconfigured")
446                loc = self.locator()
447                get_port = getattr(loc, self.port_name, None)
448                if not get_port:
449                    raise service_error(service_error.internal, 
450                            "Cannot get port %s from locator" % self.port_name)
451                port = get_port(url,
452                        transport=M2Crypto.httpslib.HTTPSConnection, 
453                        transdict={ 'ssl_context' : ctx },
454                        tracefile=tf)
455                remote_method = getattr(port, self.service_name, None)
456                if not remote_method:
457                    raise service_error(service_error.internal,
458                            "Cannot get service from SOAP port")
459
460                fail_exc = None
461                if self.log:
462                    self.log.debug("Calling %s (retry %d)" % \
463                            (self.service_name, retries))
464                resp = remote_method(msg)
465                ok = True
466            except socket_error, e:
467                raise service_error(service_error.connect, 
468                        "Cannot connect to %s: %s" % (url, e[1]))
469            except BIOError, e:
470                if self.log:
471                    self.log.warn("BIO error contacting %s: %s" % (url, e))
472                fail_exc = e
473                retries += 1
474            except sslerror, e:
475                if self.log:
476                    self.log.warn("SSL (socket) error contacting %s: %s" % 
477                            (url, e))
478                retries += 1
479            except SSLError, e:
480                if self.log:
481                    self.log.warn("SSL error contacting %s: %s" % (url, e))
482                fail_exc = e
483                retries += 1
484            except httplib.HTTPException, e:
485                if self.log:
486                    self.log.warn("HTTP error contacting %s: %s" % (url, e))
487                fail_exc = e
488                retries +=1
489            except ParseException, e:
490                raise service_error(service_error.protocol,
491                        "Bad format message (XMLRPC??): %s" % e)
492            except FaultException, e:
493                # If the method isn't implemented we get a FaultException
494                # without a detail (which would be a FeddFault).  If that's the
495                # case construct a service_error out of the SOAP fields of the
496                # fault, if they're present.
497                if e.fault.detail:
498                    det = e.fault.detail[0]
499                    ee = self.unpack_soap(det).get('FeddFaultBody', { })
500                else:
501                    ee = { 'code': service_error.internal, 
502                            'desc': e.fault.string or "Something Weird" }
503                if ee:
504                    raise service_error(ee['code'], ee['desc'])
505                else:
506                    raise service_error(service_error.internal,
507                            "Unexpected fault body")
508
509        if retries >= self.max_retries and fail_exc and not ok:
510            raise service_error(service_error.connect, 
511                    "Too many failures: %s" % fail_exc)
512
513        # Unpack and convert fedids to objects
514        r = self.apply_to_tags(self.unpack_soap(resp), self.fedid_to_object)
515
516        #  Make sure all strings are unicode
517        r = self.make_unicode(r)
518        return r
519
520    def call_service(self, url, req, cert_file=None, cert_pwd=None, 
521        trusted_certs=None, context=None, tracefile=None):
522        p_fault = None  # Any SOAP failure (sent unless XMLRPC works)
523        resp = None
524        try:
525            # Try the SOAP request
526            resp = self.call_soap_service(url, req, 
527                    cert_file, cert_pwd, trusted_certs, context, tracefile)
528            return resp
529        except service_error, e:
530            if e.code == service_error.protocol: p_fault = None
531            else: raise
532        except FaultException, f:
533            p_fault = f.fault.detail[0]
534               
535
536        # If we could not get a valid SOAP response to the request above,
537        # try the same address using XMLRPC and let any faults flow back
538        # out.
539        if p_fault == None:
540            resp = self.call_xmlrpc_service(url, req, cert_file,
541                    cert_pwd, trusted_certs, context, tracefile)
542            return resp
543        else:
544            # Build the fault
545            ee = unpack_soap(p_fault).get('FeddFaultBody', { })
546            if ee:
547                raise service_error(ee['code'], ee['desc'])
548            else:
549                raise service_error(service_error.internal,
550                        "Unexpected fault body")
Note: See TracBrowser for help on using the repository browser.