source: fedd/federation/remote_service.py @ a80a4a7

Last change on this file since a80a4a7 was 75605c7, checked in by Ted Faber <faber@…>, 11 years ago

Correct logging on the weird branch

  • Property mode set to 100644
File size: 20.6 KB
Line 
1#!/usr/local/bin/python
2
3import copy
4import traceback
5
6from socket import error as socket_error
7from socket import sslerror
8from socket import SHUT_RDWR
9
10import M2Crypto.httpslib
11from M2Crypto import SSL
12from M2Crypto.m2xmlrpclib import SSL_Transport
13from M2Crypto.SSL import SSLError
14from M2Crypto.BIO import BIOError
15from ZSI import ParseException, FaultException, SoapWriter
16
17# Underlying SOAP comms use this and we need to catch their exceptions
18import httplib
19
20from proof import proof
21from service_error import service_error
22from xmlrpclib import ServerProxy, dumps, loads, Fault, Error, Binary
23try:
24    import fedd_services
25    import fedd_internal_services
26    service_port_name = 'getfeddPortType'
27    internal_service_port_name = 'getfeddInternalPortType'
28except ImportError:
29    import fedd_client
30    import fedd_internal_client
31    fedd_services = fedd_client
32    fedd_internal_services = fedd_internal_client
33    service_port_name = 'getfeddPort'
34    internal_service_port_name = 'getfedd_internal_port'
35from util import fedd_ssl_context
36from deter import fedid
37import parse_detail
38
39# Turn off the matching of hostname to certificate ID
40SSL.Connection.clientPostConnectionCheck = None
41
42# Used by the remote_service_base class.
43def to_binary(o):
44    """
45    A function that converts an object into an xmlrpclib.Binary using
46    either its internal packing method, or the standard Binary constructor.
47    """
48    pack = getattr(o, 'pack_xmlrpc', None)
49    if callable(pack): return Binary(pack())
50    else: return Binary(o)
51
52# Classes that encapsulate the process of making and dealing with requests to
53# WSDL-generated and XMLRPC remote accesses. 
54
55class remote_service_base:
56    """
57    This invisible base class encapsulates the functions used to massage the
58    dictionaries used to pass parameters into and out of the RPC formats.  It's
59    mostly a container for the static methods to do that work, but defines some
60    maps sued by sub classes on apply_to_tags
61    """
62    # A map used to convert fedid fields to fedid objects (when the field is
63    # already a string)
64    fedid_to_object = {'fedid': lambda x: fedid(bits=x)}
65    # A map used by apply_to_tags to convert fedids from xmlrpclib.Binary
66    # objects to fedid objects in one sweep.
67    decap_fedids = {'fedid': lambda x: fedid(bits=x.data), 
68            'credential': lambda x: x.data}
69    # A map used to encapsulate fedids into xmlrpclib.Binary objects
70    encap_fedids = {'fedid': to_binary, 'credential': to_binary}
71
72    # fields that are never unicoded, because they represent non strings.
73    do_not_unicode = set(['credential'])
74
75    @staticmethod
76    def pack_soap(container, name, contents):
77        """
78        Convert the dictionary in contents into a tree of ZSI classes.
79
80        The holder classes are constructed from factories in container and
81        assigned to either the element or attribute name.  This is used to
82        recursively create the SOAP message.
83        """
84        if getattr(contents, "__iter__", None) != None:
85            attr =getattr(container, "new_%s" % name, None)
86            if attr: obj = attr()
87            else:
88                raise TypeError("%s does not have a new_%s attribute" % \
89                        (container, name))
90            for e, v in contents.iteritems():
91                assign = getattr(obj, "set_element_%s" % e, None) or \
92                        getattr(obj, "set_attribute_%s" % e, None)
93                if isinstance(v, type(dict())):
94                    assign(remote_service_base.pack_soap(obj, e, v))
95                elif getattr(v, "__iter__", None) != None:
96                    assign([ remote_service_base.pack_soap(obj, e, val ) \
97                            for val in v])
98                elif getattr(v, "pack_soap", None) != None:
99                    assign(v.pack_soap())
100                else:
101                    assign(v)
102            return obj
103        else: return contents
104
105    @staticmethod
106    def unpack_soap(element):
107        """
108        Convert a tree of ZSI SOAP classes intro a hash.  The inverse of
109        pack_soap
110
111        Elements or elements that are empty are ignored.
112        """
113        methods = [ m for m in dir(element) \
114                if m.startswith("get_element") or m.startswith("get_attribute")]
115        if len(methods) > 0:
116            rv = { }
117            for m in methods:
118                if m.startswith("get_element_"):
119                    n = m.replace("get_element_","",1)
120                else:
121                    n = m.replace("get_attribute_", "", 1)
122                sub = getattr(element, m)()
123                if sub != None:
124                    if isinstance(sub, basestring):
125                        rv[n] = sub
126                    elif getattr(sub, "__iter__", None) != None:
127                        if len(sub) > 0: rv[n] = \
128                                [remote_service_base.unpack_soap(e) \
129                                    for e in sub]
130                    else:
131                        rv[n] = remote_service_base.unpack_soap(sub)
132            return rv
133        else: 
134            return element
135
136    @staticmethod
137    def apply_to_tags(e, map):
138        """
139        Map is an iterable of ordered pairs (tuples) that map a key to a
140        function.
141        This function walks the given message and replaces any object with a
142        key in the map with the result of applying that function to the object.
143        """
144        if isinstance(e, dict):
145            for k in e.keys():
146                if k in map:
147                    fcn = map[k]
148                    if isinstance(e[k], list):
149                        e[k] = [ fcn(b) for b in e[k]]
150                    else:
151                        e[k] = fcn(e[k])
152                elif isinstance(e[k], dict):
153                    remote_service_base.apply_to_tags(e[k], map)
154                elif isinstance(e[k], list):
155                    for ee in e[k]:
156                        remote_service_base.apply_to_tags(ee, map)
157        # Other types end the recursion - they should be leaves
158        return e
159
160    @staticmethod
161    def strip_unicode(obj):
162        """Walk through a message and convert all strings to non-unicode
163        strings"""
164        if isinstance(obj, dict):
165            for k in obj.keys():
166                obj[k] = remote_service_base.strip_unicode(obj[k])
167            return obj
168        elif isinstance(obj, basestring) and not isinstance(obj, str):
169            return str(obj)
170        elif getattr(obj, "__iter__", None):
171            return [ remote_service_base.strip_unicode(x) for x in obj]
172        else:
173            return obj
174
175    @staticmethod
176    def make_unicode(obj):
177        """Walk through a message and convert all strings to unicode"""
178        if isinstance(obj, dict):
179            for k in obj.keys():
180                if k not in remote_service_base.do_not_unicode:
181                    obj[k] = remote_service_base.make_unicode(obj[k])
182            return obj
183        elif isinstance(obj, basestring) and not isinstance(obj, unicode):
184            return unicode(obj)
185        elif getattr(obj, "__iter__", None):
186            return [ remote_service_base.make_unicode(x) for x in obj]
187        else:
188            return obj
189
190
191
192class soap_handler(remote_service_base):
193    """
194    Encapsulate the handler code to unpack and pack SOAP requests and responses
195    and call the given method.
196
197    The code to decapsulate and encapsulate parameters encoded in SOAP is the
198    same modulo a few parameters.  This is a functor that calls a fedd service
199    trhough a soap interface.  The parameters are the typecode of the request
200    parameters, the method to call (usually a bound instance of a method on a
201    fedd service providing class), the constructor of a response packet and the
202    name of the body element of that packet.  The handler takes a ParsedSoap
203    object (the request) and returns an instance of the class created by
204    constructor containing the response.  Failures of the constructor or badly
205    created constructors will result in None being returned.
206    """
207    def __init__(self, service_name, method, typecode=None,
208            constructor=None, body_name=None):
209        self.method = method
210
211        response_class_name = "%sResponseMessage" % service_name
212        request_class_name = "%sRequestMessage" % service_name
213
214        if body_name: self.body_name = body_name
215        else: self.body_name = "%sResponseBody" % service_name
216
217        if constructor: self.constructor = constructor
218        else:
219            self.constructor = self.get_class(response_class_name)
220            if not self.constructor:
221                raise service_error(service_error.internal,
222                        "Cannot find class for %s" % response_class_name)
223
224        if typecode: self.typecode = typecode
225        else: 
226            req = self.get_class(request_class_name)
227            if req:
228                self.typecode = req.typecode
229            else:
230                raise service_error(service_error.internal,
231                        "Cannot find class for %s" % request_class_name)
232
233            if not self.typecode:
234                raise service_error(service_error.internal,
235                        "Cannot get typecode for %s" % class_name)
236
237    def get_class(self, class_name):
238        return getattr(fedd_services, class_name, None) or \
239                getattr(fedd_internal_services, class_name, None)
240
241    def __call__(self, ps, fid):
242        req = ps.Parse(self.typecode)
243        # Convert the message to a dict with the fedid strings converted to
244        # fedid objects
245        req = self.apply_to_tags(self.unpack_soap(req), self.fedid_to_object)
246
247        msg = self.method(req, fid)
248
249        resp = self.constructor()
250        set_element = getattr(resp, "set_element_%s" % self.body_name, None)
251        if set_element and callable(set_element):
252            try:
253                set_element(self.pack_soap(resp, self.body_name, msg))
254                return resp
255            except (NameError, TypeError):
256                return None
257        else:
258            return None
259
260class xmlrpc_handler(remote_service_base):
261    """
262    Generate the handler code to unpack and pack XMLRPC requests and responses
263    and call the given method.
264
265    The code to marshall and unmarshall XMLRPC parameters to and from a fedd
266    service is largely the same.  This helper creates such a handler.  The
267    parameters are the method name, and the name of the body struct that
268    contains the response.  A handler is created that takes the params response
269    from an xmlrpclib.loads on the incoming rpc and a fedid and responds with
270    a hash representing the struct ro be returned to the other side.  On error
271    None is returned.  Fedid fields are decapsulated from binary and converted
272    to fedid objects on input and encapsulated as Binaries on output.
273    """
274    def __init__(self, service_name, method):
275        self.method = method
276        self.body_name = "%sResponseBody" % service_name
277
278    def __call__(self, params, fid):
279        msg = None
280
281        p = self.apply_to_tags(params[0], self.decap_fedids)
282        try:
283            msg = self.method(p, fid)
284        except service_error, e:
285            raise Fault(e.code, "%s: %s" % (e.code_string(), e.desc))
286        if msg != None:
287            return self.make_unicode(self.apply_to_tags(\
288                    { self.body_name: msg }, self.encap_fedids))
289        else:
290            return None
291
292class service_caller(remote_service_base):
293    def __init__(self, service_name, request_message=None, 
294            request_body_name=None, tracefile=None, strict=True,
295            log=None, max_retries=None, fedd_encapsulation=True):
296        self.service_name = service_name
297
298        if getattr(fedd_services.feddBindingSOAP, service_name, None):
299            self.locator = fedd_services.feddServiceLocator
300            self.port_name = service_port_name
301        elif getattr(fedd_internal_services.feddInternalBindingSOAP, 
302                service_name, None):
303            self.locator = fedd_internal_services.feddInternalServiceLocator
304            self.port_name = internal_service_port_name
305
306        if request_message: self.request_message = request_message
307        else:
308            request_message_name = "%sRequestMessage" % service_name
309            self.request_message = \
310                    getattr(fedd_services, request_message_name, None) or \
311                    getattr(fedd_internal_services, request_message_name,
312                            None)
313            if not self.request_message and strict:
314                raise service_error(service_error.internal,
315                        "Cannot find class for %s" % request_message_name)
316
317        if request_body_name is not None:
318            self.request_body_name = request_body_name
319        else: 
320            self.request_body_name = "%sRequestBody" % service_name
321
322        self.tracefile = tracefile
323        self.__call__ = self.call_service
324        if max_retries is not None: self.max_retries = max_retries
325        else: self.max_retries = 5
326        self.log = log
327        if not fedd_encapsulation:
328            self.fedid_to_object = {}
329            self.decap_fedids = {}
330            self.encap_fedids = {}
331            self.do_not_unicode = set()
332
333
334    def serialize_soap(self, req):
335        """
336        Return a string containing the message that would be sent to call this
337        service with the given request.
338        """
339        msg = self.request_message()
340        set_element = getattr(msg, "set_element_%s" % self.request_body_name,
341                None)
342        if not set_element:
343            raise service_error(service_error.internal,
344                    "Cannot get element setting method for %s" % \
345                            self.request_body_name)
346        set_element(self.pack_soap(msg, self.request_body_name, req))
347        sw = SoapWriter()
348        sw.serialize(msg)
349        return unicode(sw)
350
351    def call_xmlrpc_service(self, url, req, cert_file=None, cert_pwd=None, 
352            trusted_certs=None, context=None, tracefile=None):
353        """Send an XMLRPC request.  """
354
355
356        # If a context is given, use it.  Otherwise construct one from
357        # components.  The construction shouldn't call out for passwords.
358        if context:
359            ctx = context
360        else:
361            try:
362                ctx = fedd_ssl_context(cert_file, trusted_certs, 
363                        password=cert_pwd)
364            except SSL.SSLError, e:
365                raise service_error(service_error.server_config,
366                        "Certificates misconfigured: %s" % e)
367
368        # Of all the dumbass things.  The XMLRPC library in use here won't
369        # properly encode unicode strings, so we make a copy of req with
370        # the unicode objects converted.  We also convert the url to a
371        # basic string if it isn't one already.
372        r = self.strip_unicode(copy.deepcopy(req))
373        if self.request_body_name:
374            r  = self.apply_to_tags(\
375                    { self.request_body_name: r}, self.encap_fedids)
376        else:
377            r = self.apply_to_tags(r, self.encap_fedids)
378
379        url = str(url)
380        ok = False
381        retries = 0
382
383        while not ok and retries < self.max_retries:
384            try:
385                transport = SSL_Transport(ctx)
386                port = ServerProxy(url, transport=transport)
387                remote_method = getattr(port, self.service_name, None)
388                resp = remote_method(r)
389                ok = True
390            except socket_error, e:
391                raise service_error(service_error.connect, 
392                        "Cannot connect to %s: %s" % (url, e[1]))
393            except BIOError, e:
394                if self.log:
395                    self.log.warn("BIO error contacting %s: %s" % (url, e))
396                retries += 1
397            except sslerror, e:
398                if self.log:
399                    self.log.warn("SSL (socket) error contacting %s: %s" % 
400                            (url, e))
401                retries += 1
402            except SSLError, e:
403                if self.log:
404                    self.log.warn("SSL error contacting %s: %s" % (url, e))
405                retries += 1
406            except httplib.HTTPException, e:
407                if self.log:
408                    self.log.warn("HTTP error contacting %s: %s" % (url, e))
409                retries +=1
410            except Fault, f:
411                raise service_error(f.faultCode, f.faultString)
412            except Error, e:
413                raise service_error(service_error.protocol, 
414                        "Remote XMLRPC Fault: %s" % e)
415
416        if retries >= self.max_retries :
417            raise service_error(service_error.connect, "Too many SSL failures")
418
419        return self.apply_to_tags(resp, self.decap_fedids) 
420
421    def hammer_port_shut(self, port):
422        """
423        If there is an ssl failure, the connection to the service often stays
424        open and idle, confusing subsequent attempts to contact it (the symptom
425        is a stuck connection).  This routine walks through the open port and
426        hammers everything as closed as it can.  It is called a couple places
427        in call_soap_service.  This is dirty code, walking the internals of a
428        couple data structures, but without it, the daemon can lock up.
429        """
430        try:
431            if port is not None:
432                binding = getattr(port, 'binding', None)
433                if binding is not None: connection = getattr(binding, 'h', None)
434                else: connection = None
435
436                if connection is not None:
437                    csock = getattr(connection, 'sock', None)
438                    if csock is not None:
439                        csock.clear()
440                        csock.close()
441                        cssocket = getattr(csock, 'socket', None)
442                        if cssocket is not None:
443                            cssocket.shutdown(SHUT_RDWR)
444                            cssocket.close()
445                        del csock
446                        connection.sock = None
447                    httplib.HTTPConnection.close(connection)
448                    if self.log:
449                        self.log.debug("Closed connection with prejudice")
450
451                del port
452                port = None
453                if self.log:
454                    self.log.debug("Annihilated port")
455        except Exception, e:
456            port = None
457
458        return port
459
460
461    def call_soap_service(self, url, req, cert_file=None, cert_pwd=None,
462            trusted_certs=None, context=None, tracefile=None):
463        """
464        Send req on to the real destination in dt and return the response
465
466        Req is just the requestType object.  This function re-wraps it.  It
467        also rethrows any faults.
468        """
469
470        tf = tracefile or self.tracefile or None
471
472        if not self.request_body_name:
473            raise service_error(service_error.internal, 
474                    "Call to soap service without a configured request body");
475
476        ok = False
477        retries = 0
478        port = None
479        while not ok and retries < self.max_retries:
480            try:
481                # If this is a retry, close the request and annihilate the port
482                port = self.hammer_port_shut(port)
483
484                # Reconstruct the full request message
485                msg = self.request_message()
486                set_element = getattr(msg, "set_element_%s" % \
487                        self.request_body_name,
488                        None)
489                if not set_element:
490                    raise service_error(service_error.internal,
491                            "Cannot get element setting method for %s" % \
492                                    self.request_body_name)
493                set_element(self.pack_soap(msg, self.request_body_name, req))
494                # If a context is given, use it.  Otherwise construct one from
495                # components.  The construction shouldn't call out for
496                # passwords.
497                if context:
498                    if self.log:
499                        self.log.debug("Context passed in to call_soap")
500                    ctx = context
501                else:
502                    if self.log:
503                        self.log.debug(
504                                "Constructing context in call_soap: %s" % \
505                                        cert_file)
506                    try:
507                        ctx = fedd_ssl_context(cert_file, trusted_certs, 
508                                password=cert_pwd)
509                    except SSL.SSLError, e:
510                        if self.log:
511                            self.log.debug("Certificate error: %s" % e)
512                        raise service_error(service_error.server_config,
513                                "Certificates misconfigured: %s" % e)
514                loc = self.locator()
515                get_port = getattr(loc, self.port_name, None)
516                if not get_port:
517                    raise service_error(service_error.internal, 
518                            "Cannot get port %s from locator" % self.port_name)
519                port = get_port(url,
520                        transport=M2Crypto.httpslib.HTTPSConnection, 
521                        transdict={ 'ssl_context' : ctx },
522                        tracefile=tf)
523                remote_method = getattr(port, self.service_name, None)
524                if not remote_method:
525                    raise service_error(service_error.internal,
526                            "Cannot get service from SOAP port")
527
528                fail_exc = None
529                if self.log:
530                    self.log.debug("Calling %s (retry %d)" % \
531                            (self.service_name, retries))
532                resp = remote_method(msg)
533                ok = True
534            except socket_error, e:
535                self.hammer_port_shut(port)
536                raise service_error(service_error.connect, 
537                        "Cannot connect to %s: %s" % (url, e[1]))
538            except BIOError, e:
539                if self.log:
540                    self.log.warn("BIO error contacting %s: %s" % (url, e))
541                fail_exc = e
542                retries += 1
543            except sslerror, e:
544                if self.log:
545                    self.log.warn("SSL (socket) error contacting %s: %s" % 
546                            (url, e))
547                retries += 1
548            except SSLError, e:
549                if self.log:
550                    self.log.warn("SSL error contacting %s: %s" % (url, e))
551                fail_exc = e
552                retries += 1
553            except httplib.HTTPException, e:
554                if self.log:
555                    self.log.warn("HTTP error contacting %s: %s" % (url, e))
556                fail_exc = e
557                retries +=1
558            except ParseException, e:
559                port = self.hammer_port_shut(port)
560                raise service_error(service_error.protocol,
561                        "Bad format message (XMLRPC??): %s" % e)
562            except FaultException, e:
563                if self.log:
564                    self.log.debug('SOAP Fault')
565                port = self.hammer_port_shut(port)
566                # If the method isn't implemented we get a FaultException
567                # without a detail (which would be a FeddFault).  If that's the
568                # case construct a service_error out of the SOAP fields of the
569                # fault, if they're present.
570                if e.fault.detail:
571                    det = e.fault.detail[0]
572                    ee = self.unpack_soap(det).get('FeddFaultBody', { })
573                else:
574                    ee = { 'code': service_error.internal, 
575                            'desc': e.fault.string or "Something Weird" }
576                if ee:
577                    if 'proof' in ee: 
578                        pl = [ proof.from_dict(p) for p in ee['proof']]
579                    else: 
580                        pl = None
581                    raise service_error(ee.get('code', 'no code'), 
582                            ee.get('desc','no desc'), proof=pl)
583                else:
584                    raise service_error(service_error.internal,
585                            "Unexpected fault body")
586            except:
587                if self.log:
588                    self.log.error('Something weird: %s' % \
589                            traceback.format_exc())
590                raise service_error(service_error.internal, 
591                        'Something weird: %s' % traceback.format_exc())
592
593
594        if retries >= self.max_retries and fail_exc and not ok:
595            port = self.hammer_port_shut(port)
596            raise service_error(service_error.connect, 
597                    "Too many failures: %s" % fail_exc)
598
599        # Unpack and convert fedids to objects
600        r = self.apply_to_tags(self.unpack_soap(resp), self.fedid_to_object)
601
602        #  Make sure all strings are unicode
603        r = self.make_unicode(r)
604        return r
605
606    def call_service(self, url, req, cert_file=None, cert_pwd=None, 
607        trusted_certs=None, context=None, tracefile=None):
608        p_fault = None  # Any SOAP failure (sent unless XMLRPC works)
609        resp = None
610        try:
611            # Try the SOAP request
612            resp = self.call_soap_service(url, req, 
613                    cert_file, cert_pwd, trusted_certs, context, tracefile)
614            return resp
615        except service_error, e:
616            if e.code == service_error.protocol: p_fault = None
617            else: raise
618        except FaultException, f:
619            p_fault = f.fault.detail[0]
620               
621
622        # If we could not get a valid SOAP response to the request above,
623        # try the same address using XMLRPC and let any faults flow back
624        # out.
625        if p_fault == None:
626            resp = self.call_xmlrpc_service(url, req, cert_file,
627                    cert_pwd, trusted_certs, context, tracefile)
628            return resp
629        else:
630            # Build the fault
631            ee = unpack_soap(p_fault).get('FeddFaultBody', { })
632            if ee:
633                raise service_error(ee['code'], ee['desc'])
634            else:
635                raise service_error(service_error.internal,
636                        "Unexpected fault body")
Note: See TracBrowser for help on using the repository browser.