source: fedd/fedd.py @ 058f58e

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 058f58e was 058f58e, checked in by Ted Faber <faber@…>, 15 years ago

Unify the code for calling SOAP and XMLRPC services into a couple classes.
Before there were slightly different semantics everywhere.

Also make the handlers classes rather than the output of stub compiling
functions.

  • Property mode set to 100755
File size: 12.8 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context, fedid
17from fedd_deter_impl import new_feddservice
18from fedd_services import ns0
19from service_error import *
20
21from threading import *
22from signal import signal, pause, SIGINT, SIGTERM
23from select import select, error
24from time import sleep
25import logging
26from ConfigParser import *
27
28# The SSL server here is based on the implementation described at
29# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
30
31# Turn off the matching of hostname to certificate ID
32SSL.Connection.clientPostConnectionCheck = None
33
34class fedd_config_parser(SafeConfigParser):
35    """
36    A SafeConfig parser with a more forgiving get attribute
37    """
38
39    def safe_get(self, sect, opt, method, default=None):
40        """
41        If the option is present, return it, otherwise the default.
42        """
43        if self.has_option(sect, opt): return method(self, sect, opt)
44        else: return default
45
46    def get(self, sect, opt, default=None):
47        """
48        This returns the requested option or a given default.
49
50        It's more like getattr than get.
51        """
52
53        return self.safe_get(sect, opt, SafeConfigParser.get, default)
54
55    def getint(self, sect, opt, default=0):
56        """
57        Returns the selected option as an int or a default.
58        """
59
60        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
61
62    def getfloat(self, sect, opt, default=0.0):
63        """
64        Returns the selected option as an int or a default.
65        """
66
67        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
68
69    def getboolean(self, sect, opt, default=False):
70        """
71        Returns the selected option as a boolean or a default.
72        """
73
74        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
75
76class fedd_server(ThreadingSSLServer):
77    """
78    Interface the fedd services to the XMLRPC and SOAP interfaces
79    """
80    def __init__(self, ME, handler, ssl_ctx, impl):
81        """
82        Create an SSL server that handles the transport in handler using the
83        credentials in ssl_ctx, and interfacing to the implementation of fedd
84        services in fedd.  ME is the host port pair on which to bind.
85        """
86        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
87        self.impl = impl
88        self.soap_methods = impl.soap_services
89        self.xmlrpc_methods = impl.xmlrpc_services
90        self.log = logging.getLogger("fedd")
91
92class fedd_soap_handler(BaseHTTPRequestHandler):
93    """
94    Standard connection between SOAP and the fedd services in impl.
95
96    Much of this is boilerplate from
97    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
98    """
99    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
100
101    def send_xml(self, text, code=200):
102        """Send an XML document as reply"""
103        self.send_response(code)
104        self.send_header('Content-type', 'text/xml; charset="utf-8"')
105        self.send_header('Content-Length', str(len(text)))
106        self.end_headers()
107        self.wfile.write(text)
108        self.wfile.flush()
109        self.request.socket.close()
110
111    def send_fault(self, f, code=500):
112        """Send a SOAP encoded fault as reply"""
113        self.send_xml(f.AsSOAP(processContents="lax"), code)
114
115    def check_headers(self, ps):
116        """Send a fault for any required envelope headers"""
117        for (uri, localname) in ps.WhatMustIUnderstand():
118            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
119            return False
120        return  True
121
122    def check_method(self, ps):
123        """Confirm that this class implements the namespace and SOAP method"""
124        root = ps.body_root
125        if root.namespaceURI not in self.server.impl.soap_namespaces:
126            self.send_fault(Fault(Fault.Client, 
127                'Unknown namespace "%s"' % root.namespaceURI))
128            return False
129
130        if getattr(root, 'localName', None) == None:
131            self.send_fault(Fault(Fault.Client, 'No method"'))
132            return False
133        return True
134
135    def do_POST(self):
136        """Treat an HTTP POST request as a SOAP service call"""
137        try:
138            cl = int(self.headers['content-length'])
139            data = self.rfile.read(cl)
140            ps = ParsedSoap(data)
141        except ParseException, e:
142            self.send_fault(Fault(Fault.Client, str(e)))
143            return
144        except Exception, e:
145            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
146            return
147        if not self.check_headers(ps): return
148        if not self.check_method(ps): return
149        try:
150            resp = self.soap_dispatch(ps.body_root.localName, ps,
151                    fedid(cert=self.request.get_peer_cert()))
152        except Fault, f:
153            self.send_fault(f)
154            resp = None
155       
156        if resp != None:
157            sw = SoapWriter()
158            sw.serialize(resp)
159            self.send_xml(str(sw))
160
161    def log_request(self, code=0, size=0):
162        """
163        Log request to the fedd logger
164        """
165        self.server.log.info("Successful SOAP request code %d" % code)
166
167    def soap_dispatch(self, method, req, fid):
168        """
169        The connection to the implementation, using the  method maps
170
171        The implementation provides a mapping from SOAP method name to the
172        method in the implementation that provides the service.
173        """
174        if self.server.soap_methods.has_key(method):
175            try:
176                return self.server.soap_methods[method](req, fid)
177            except service_error, e:
178                de = ns0.faultType_Def(
179                        (ns0.faultType_Def.schema,
180                            "FeddFaultBody")).pyclass()
181                de._code=e.code
182                de._errstr=e.code_string()
183                de._desc=e.desc
184                if  e.is_server_error():
185                    raise Fault(Fault.Server, e.code_string(), detail=de)
186                else:
187                    raise Fault(Fault.Client, e.code_string(), detail=de)
188        else:
189            raise Fault(Fault.Client, "Unknown method: %s" % method)
190
191
192class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
193    """
194    Standard connection between XMLRPC and the fedd services in impl.
195
196    Much of this is boilerplate from
197    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
198    """
199    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
200
201    def send_xml(self, text, code=200):
202        """Send an XML document as reply"""
203        self.send_response(code)
204        self.send_header('Content-type', 'text/xml; charset="utf-8"')
205        self.send_header('Content-Length', str(len(text)))
206        self.end_headers()
207        self.wfile.write(text)
208        self.wfile.flush()
209        # Make sure to close the socket when we're done
210        self.request.socket.close()
211
212    def do_POST(self):
213        """Treat an HTTP POST request as an XMLRPC service call"""
214        # NB: XMLRPC faults are not HTTP errors, so the code is always 200,
215        # unless an HTTP error occurs, which we don't handle.
216
217        resp = None
218        data = None
219        method = None
220        cl = int(self.headers['content-length'])
221        data = self.rfile.read(cl)
222
223        try:
224            params, method = xmlrpclib.loads(data)
225        except xmlrpclib.ResponseError:
226            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
227                "Malformed request"), methodresponse=True)
228
229        if method != None:
230            try:
231                resp = self.xmlrpc_dispatch(method, params,
232                            fedid(cert=self.request.get_peer_cert()))
233                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
234                        methodresponse=True)
235            except xmlrpclib.Fault, f:
236                data = xmlrpclib.dumps(f, methodresponse=True)
237                resp = None
238
239        self.send_xml(data)
240
241    def log_request(self, code=0, size=0):
242        """
243        Log request to the fedd logger
244        """
245        self.server.log.info("Successful XMLRPC request code %d" % code)
246
247
248    def xmlrpc_dispatch(self, method, req, fid):
249        """
250        The connection to the implementation, using the  method maps
251
252        The implementation provides a mapping from XMLRPC method name to the
253        method in the implementation that provides the service.
254        """
255        if self.server.xmlrpc_methods.has_key(method):
256            try:
257                return self.server.xmlrpc_methods[method](req, fid)
258            except service_error, e:
259                raise xmlrpclib.Fault(e.code_string(), e.desc)
260        else:
261            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
262
263class fedd_opts(OptionParser):
264    """Encapsulate option processing in this class, rather than in main"""
265    def __init__(self):
266        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
267                version="0.1")
268
269        self.set_defaults(host="localhost", port=23235, transport="soap",
270                logfile=None, debug=0)
271
272        self.add_option("-d", "--debug", action="count", dest="debug", 
273                help="Set debug.  Repeat for more information")
274        self.add_option("-f", "--configfile", action="store",
275                dest="configfile", help="Configuration file (required)")
276        self.add_option("-H", "--host", action="store", type="string",
277                dest="host", help="Hostname to listen on (default %default)")
278        self.add_option("-l", "--logfile", action="store", dest="logfile", 
279                help="File to send log messages to")
280        self.add_option("-p", "--port", action="store", type="int",
281                dest="port", help="Port to listen on (default %default)")
282        self.add_option("-s", "--service", action="append", type="string",
283                dest="services",
284                help="Service description: host:port:transport")
285        self.add_option("-x","--transport", action="store", type="choice",
286                choices=("xmlrpc", "soap"),
287                help="Transport for request (xmlrpc|soap) (Default: %default)")
288        self.add_option("--trace", action="store_const", dest="tracefile", 
289                const=sys.stderr, help="Print SOAP exchange to stderr")
290
291servers_active = True       # Sub-servers run while this is True
292services = [ ]              # Service descriptions
293servers = [ ]               # fedd_server instances instantiated from services
294servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
295
296def shutdown(sig, frame):
297    """
298    On a signal, stop running sub-servers. 
299   
300    This is connected to signals below
301    """
302    global servers_active, flog
303
304    servers_active = False
305    flog.info("Received signal %d, shutting down" % sig);
306
307def run_server(s):
308    """
309    Operate a subserver, shutting down when servers_active is false.
310
311    Each server (that is host/port/transport triple) has a thread running this
312    function, so each can handle requests independently.  They all call in to
313    the same implementation, which must manage its own synchronization.
314    """
315    global servers_active   # Not strictly needed: servers_active is only read
316    global servers          # List of active servers
317    global servers_lock     # Lock to manipulate servers
318
319    while servers_active:
320        try:
321            i, o, e = select((s,), (), (), 1.0)
322            if s in i: s.handle_request()
323        except error:
324            # The select call seems to get interrupted by signals as well as
325            # the main thread.  This essentially ignores signals in this
326            # thread.
327            pass
328
329    # Done.  Remove us from the list
330    servers_lock.acquire()
331    servers.remove(s)
332    servers_lock.release()
333
334opts, args = fedd_opts().parse_args()
335
336# Logging setup
337flog = logging.getLogger("fedd")
338ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
339        '%d %b %y %H:%M:%S')
340
341if opts.logfile: fh = logging.FileHandler(opts.logfile)
342else: fh = logging.StreamHandler(sys.stdout)
343
344# The handler will print anything, setting the logger level will affect what
345# gets recorded.
346fh.setLevel(logging.DEBUG)
347
348if opts.debug: flog.setLevel(logging.DEBUG)
349else: flog.setLevel(logging.INFO)
350
351fh.setFormatter(ffmt)
352flog.addHandler(fh)
353
354
355
356# Initialize the implementation
357if opts.configfile != None: 
358    try:
359        config= fedd_config_parser()
360        config.read(opts.configfile)
361    except e:
362        sys.exit("Cannot parse confgi file: %s" % e)
363else: 
364    sys.exit("--configfile is required")
365
366try:
367    impl = new_feddservice(config)
368except RuntimeError, e:
369    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
370            "No message"
371    sys.exit("Error configuring fedd: %s" % str)
372
373if impl.cert_file == None:
374    sys.exit("Must supply certificate file (probably in config)")
375
376# Create the SSL credentials
377ctx = None
378while ctx == None:
379    try:
380        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
381                password=impl.cert_pwd)
382    except SSL.SSLError, e:
383        if str(e) != "bad decrypt" or impl.cert_pwd != None:
384            raise
385
386# Walk through the service descriptions and pack them into the services list.
387# That list has the form (transport (host, port)).
388if opts.services:
389    for s in opts.services:
390        h, p, t  = s.split(':')
391
392        if not h: h = opts.host
393        if not p: p = opts.port
394        if not t: h = opts.transport
395
396        p = int(p)
397
398        services.append((t, (h, p)))
399else:
400    services.append((opts.transport, (opts.host, opts.port)))
401
402# Create the servers and put them into a list
403for s in services:
404    if s[0] == "soap":
405        servers.append(fedd_server(s[1], fedd_soap_handler, ctx, impl))
406    elif s[0] == "xmlrpc":
407        servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl))
408    else: flog.warning("Unknown transport: %s" % s[0])
409
410#  Make sure that there are no malformed servers in the list
411services = [ s for s in services if s ]
412
413# Catch signals
414signal(SIGINT, shutdown)
415signal(SIGTERM, shutdown)
416
417# Start the servers
418for s in servers:
419    Thread(target=run_server, args=(s,)).start()
420
421# Main thread waits for signals
422while servers_active:
423    sleep(1.0)
424
425#Once shutdown starts wait for all the servers to terminate.
426while True:
427    servers_lock.acquire()
428    if len(servers) == 0: 
429        servers_lock.release()
430        flog.info("All servers exited.  Terminating")
431        sys.exit(0)
432    servers_lock.release()
433    sleep(1)
434
Note: See TracBrowser for help on using the repository browser.