source: fedd/fedd.py @ 1b57352

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 1b57352 was 1b57352, checked in by Ted Faber <faber@…>, 15 years ago

Deal with common connection and SSL errors cleanly

  • Property mode set to 100755
File size: 12.9 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from socket import error as socket_error
6from BaseHTTPServer import BaseHTTPRequestHandler
7
8from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
9    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
10
11from M2Crypto import SSL
12from M2Crypto.SSL.SSLServer import ThreadingSSLServer
13import xmlrpclib
14
15from optparse import OptionParser
16
17from fedd_util import fedd_ssl_context
18from fedid import fedid
19from fedd_deter_impl import new_feddservice
20from fedd_services import ns0
21from service_error import *
22
23from threading import *
24from signal import signal, pause, SIGINT, SIGTERM
25from select import select, error
26from time import sleep
27import logging
28from ConfigParser import *
29
30# The SSL server here is based on the implementation described at
31# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
32
# Turn off the matching of hostname to certificate ID.  Clearing this class
# attribute removes M2Crypto's client-side post-connection check.
# NOTE(review): this is set on the class rather than on one connection, so it
# presumably affects every SSL.Connection made by this process -- confirm
# that this is intended.
SSL.Connection.clientPostConnectionCheck = None
35
36class fedd_config_parser(SafeConfigParser):
37    """
38    A SafeConfig parser with a more forgiving get attribute
39    """
40
41    def safe_get(self, sect, opt, method, default=None):
42        """
43        If the option is present, return it, otherwise the default.
44        """
45        if self.has_option(sect, opt): return method(self, sect, opt)
46        else: return default
47
48    def get(self, sect, opt, default=None):
49        """
50        This returns the requested option or a given default.
51
52        It's more like getattr than get.
53        """
54
55        return self.safe_get(sect, opt, SafeConfigParser.get, default)
56
57    def getint(self, sect, opt, default=0):
58        """
59        Returns the selected option as an int or a default.
60        """
61
62        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
63
64    def getfloat(self, sect, opt, default=0.0):
65        """
66        Returns the selected option as an int or a default.
67        """
68
69        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
70
71    def getboolean(self, sect, opt, default=False):
72        """
73        Returns the selected option as a boolean or a default.
74        """
75
76        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
77
class fedd_server(ThreadingSSLServer):
    """
    Threaded SSL server that connects a transport handler (SOAP or XMLRPC) to
    a fedd service implementation.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Bind an SSL server to the (host, port) pair ME.

        handler is the request handler class that implements the transport,
        ssl_ctx carries the SSL credentials, and impl is the fedd
        implementation whose soap_services and xmlrpc_services maps the
        handlers dispatch through.
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        self.log = logging.getLogger("fedd")
        self.impl = impl
        self.xmlrpc_methods = impl.xmlrpc_services
        self.soap_methods = impl.soap_services

    def handle_error(self, request, address):
        """
        Log a one line warning about a failed connection.

        The default SSLServer prints a stack trace here; this is a little
        friendlier.  When neither request nor address is available the
        failure is reported as a likely SSL error.
        """
        if not (request or address):
            self.log.warn("[fedd] Error on incoming connection "
                    "(Likely SSL error)")
            return

        self.log.warn("[fedd] Error on incoming connection: %s %s" % \
                (request, address))
105
106class fedd_soap_handler(BaseHTTPRequestHandler):
107    """
108    Standard connection between SOAP and the fedd services in impl.
109
110    Much of this is boilerplate from
111    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
112    """
113    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
114
115    def send_xml(self, text, code=200):
116        """Send an XML document as reply"""
117        self.send_response(code)
118        self.send_header('Content-type', 'text/xml; charset="utf-8"')
119        self.send_header('Content-Length', str(len(text)))
120        self.end_headers()
121        self.wfile.write(text)
122        self.wfile.flush()
123        self.request.socket.close()
124
125    def send_fault(self, f, code=500):
126        """Send a SOAP encoded fault as reply"""
127        self.send_xml(f.AsSOAP(processContents="lax"), code)
128
129    def check_headers(self, ps):
130        """Send a fault for any required envelope headers"""
131        for (uri, localname) in ps.WhatMustIUnderstand():
132            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
133            return False
134        return  True
135
136    def check_method(self, ps):
137        """Confirm that this class implements the namespace and SOAP method"""
138        root = ps.body_root
139        if root.namespaceURI not in self.server.impl.soap_namespaces:
140            self.send_fault(Fault(Fault.Client, 
141                'Unknown namespace "%s"' % root.namespaceURI))
142            return False
143
144        if getattr(root, 'localName', None) == None:
145            self.send_fault(Fault(Fault.Client, 'No method"'))
146            return False
147        return True
148
149    def do_POST(self):
150        """Treat an HTTP POST request as a SOAP service call"""
151        try:
152            cl = int(self.headers['content-length'])
153            data = self.rfile.read(cl)
154            ps = ParsedSoap(data)
155        except ParseException, e:
156            self.send_fault(Fault(Fault.Client, str(e)))
157            return
158        except Exception, e:
159            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
160            return
161        if not self.check_headers(ps): return
162        if not self.check_method(ps): return
163        try:
164            resp = self.soap_dispatch(ps.body_root.localName, ps,
165                    fedid(cert=self.request.get_peer_cert()))
166        except Fault, f:
167            self.send_fault(f)
168            resp = None
169       
170        if resp != None:
171            sw = SoapWriter()
172            sw.serialize(resp)
173            self.send_xml(str(sw))
174
175    def log_request(self, code=0, size=0):
176        """
177        Log request to the fedd logger
178        """
179        self.server.log.info("Successful SOAP request code %d" % code)
180
181    def soap_dispatch(self, method, req, fid):
182        """
183        The connection to the implementation, using the  method maps
184
185        The implementation provides a mapping from SOAP method name to the
186        method in the implementation that provides the service.
187        """
188        if self.server.soap_methods.has_key(method):
189            try:
190                return self.server.soap_methods[method](req, fid)
191            except service_error, e:
192                de = ns0.faultType_Def(
193                        (ns0.faultType_Def.schema,
194                            "FeddFaultBody")).pyclass()
195                de._code=e.code
196                de._errstr=e.code_string()
197                de._desc=e.desc
198                if  e.is_server_error():
199                    raise Fault(Fault.Server, e.code_string(), detail=de)
200                else:
201                    raise Fault(Fault.Client, e.code_string(), detail=de)
202        else:
203            raise Fault(Fault.Client, "Unknown method: %s" % method)
204
205
206class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
207    """
208    Standard connection between XMLRPC and the fedd services in impl.
209
210    Much of this is boilerplate from
211    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
212    """
213    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
214
215    def send_xml(self, text, code=200):
216        """Send an XML document as reply"""
217        self.send_response(code)
218        self.send_header('Content-type', 'text/xml; charset="utf-8"')
219        self.send_header('Content-Length', str(len(text)))
220        self.end_headers()
221        self.wfile.write(text)
222        self.wfile.flush()
223        # Make sure to close the socket when we're done
224        self.request.socket.close()
225
226    def do_POST(self):
227        """Treat an HTTP POST request as an XMLRPC service call"""
228        # NB: XMLRPC faults are not HTTP errors, so the code is always 200,
229        # unless an HTTP error occurs, which we don't handle.
230
231        resp = None
232        data = None
233        method = None
234        cl = int(self.headers['content-length'])
235        data = self.rfile.read(cl)
236
237        try:
238            params, method = xmlrpclib.loads(data)
239        except xmlrpclib.ResponseError:
240            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
241                "Malformed request"), methodresponse=True)
242
243        if method != None:
244            try:
245                resp = self.xmlrpc_dispatch(method, params,
246                            fedid(cert=self.request.get_peer_cert()))
247                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
248                        methodresponse=True)
249            except xmlrpclib.Fault, f:
250                data = xmlrpclib.dumps(f, methodresponse=True)
251                resp = None
252
253        self.send_xml(data)
254
255    def log_request(self, code=0, size=0):
256        """
257        Log request to the fedd logger
258        """
259        self.server.log.info("Successful XMLRPC request code %d" % code)
260
261
262    def xmlrpc_dispatch(self, method, req, fid):
263        """
264        The connection to the implementation, using the  method maps
265
266        The implementation provides a mapping from XMLRPC method name to the
267        method in the implementation that provides the service.
268        """
269        if self.server.xmlrpc_methods.has_key(method):
270            try:
271                return self.server.xmlrpc_methods[method](req, fid)
272            except service_error, e:
273                raise xmlrpclib.Fault(e.code_string(), e.desc)
274        else:
275            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
276
class fedd_opts(OptionParser):
    """Command line option parser for fedd, kept out of the main script"""
    def __init__(self):
        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
                version="0.1")

        self.set_defaults(logfile=None, debug=0)

        # One entry per option, in the order they appear in the help text:
        # (option strings, keyword arguments for add_option)
        option_table = (
            (("-d", "--debug"),
                dict(action="count", dest="debug",
                    help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"),
                dict(action="store", default="/usr/local/etc/fedd.conf",
                    dest="configfile", help="Configuration file (required)")),
            (("-l", "--logfile"),
                dict(action="store", dest="logfile",
                    help="File to send log messages to")),
            (("--trace",),
                dict(action="store_const", dest="tracefile",
                    const=sys.stderr, help="Print SOAP exchange to stderr")),
        )

        for flags, settings in option_table:
            self.add_option(*flags, **settings)
294
# State shared between the main thread and the sub-server threads
servers_active = True   # Sub-servers keep running while this is True
servers = []            # fedd_server instances instantiated from services
servers_lock = Lock()   # Held while servers is changed from sub-server threads
298
def shutdown(sig, frame):
    """
    Signal handler that tells the running sub-servers to stop.

    Clears servers_active and records the signal number in the fedd log.
    It is connected to signals below.
    """
    global servers_active

    servers_active = False
    flog.info("Received signal %d, shutting down" % sig)
309
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    The server is polled with a one second timeout so that a change to
    servers_active is noticed promptly.  However this function exits,
    normally or through an unexpected exception, s is removed from servers
    so that the main thread's wait for the list to empty can finish.
    """
    global servers_active   # Not strictly needed: servers_active is only read
    global servers          # List of active servers
    global servers_lock     # Lock to manipulate servers

    try:
        while servers_active:
            try:
                readable, _w, _x = select((s,), (), (), 1.0)
                if s in readable: s.handle_request()
            except error:
                # The select call seems to get interrupted by signals as well
                # as the main thread.  This essentially ignores signals in
                # this thread.
                pass
    finally:
        # Done.  Remove us from the list, and never leave the lock held
        servers_lock.acquire()
        try:
            servers.remove(s)
        finally:
            servers_lock.release()
336
opts, args = fedd_opts().parse_args()

# Logging setup: everything is reported through the "fedd" logger, which
# writes to the requested log file or, without one, to standard output.
flog = logging.getLogger("fedd")

if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

# The handler will print anything, setting the logger level will affect what
# gets recorded.
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S'))
flog.addHandler(fh)

if opts.debug:
    flog.setLevel(logging.DEBUG)
else:
    flog.setLevel(logging.INFO)
356
357
358
359# Initialize the implementation
360if opts.configfile != None: 
361    try:
362        config= fedd_config_parser()
363        config.read(opts.configfile)
364    except e:
365        sys.exit("Cannot parse confgi file: %s" % e)
366else: 
367    sys.exit("--configfile is required")
368
369try:
370    impl = new_feddservice(config)
371except RuntimeError, e:
372    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
373            "No message"
374    sys.exit("Error configuring fedd: %s" % str)
375
376if impl.cert_file == None:
377    sys.exit("Must supply certificate file (probably in config)")
378
379# Create the SSL credentials
380ctx = None
381while ctx == None:
382    try:
383        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
384                password=impl.cert_pwd)
385    except SSL.SSLError, e:
386        if str(e) != "bad decrypt" or impl.cert_pwd != None:
387            raise
388
389services = config.get("globals", "services", "23235")
390
391for s in services.split(","):
392    s = s.strip()
393    colons = s.count(":")
394    try:
395        if colons == 0:
396            p = int(s)
397            h = ''
398            t = 'soap'
399        elif colons == 1:
400            p, t  = s.split(":")
401            p = int(p)
402            h = ''
403        elif colons == 2:
404            h, p, t  = s.split(":")
405            p = int(p)
406        else:
407            flog.error("Invalid service specification %s ignored." % s)
408            continue
409    except ValueError:
410        flog.error("Error converting port to integer in %s: spec ignored" % s)
411        continue
412
413    t = t.lower()
414    try:
415        if t == 'soap':
416            servers.append(fedd_server((h, p), fedd_soap_handler, ctx, impl))
417        elif t == 'xmlrpc':
418            servers.append(fedd_server((h, p), fedd_xmlrpc_handler, ctx, impl))
419        else:
420            flog.error("Invalid transport specification (%s) in service %s" % \
421                    (t, s))
422            continue
423    except socket_error, e:
424        flog.error("Cannot create server for %s: %s" % (s, e[1]))
425        continue
426
427#  Make sure that there are no malformed servers in the list
428servers = [ s for s in servers if s ]
429
# Catch signals: both request an orderly shutdown
for signum in (SIGINT, SIGTERM):
    signal(signum, shutdown)

# Start the servers, one thread each
for s in servers:
    Thread(target=run_server, args=(s,)).start()

# Main thread waits for signals
while servers_active:
    sleep(1.0)

# Once shutdown starts wait for all the servers to terminate.  The list is
# only examined while holding the lock.
while True:
    servers_lock.acquire()
    try:
        remaining = len(servers)
    finally:
        servers_lock.release()

    if remaining == 0:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
449    servers_lock.release()
450    sleep(1)
451
Note: See TracBrowser for help on using the repository browser.