source: fedd/fedd.py @ dab4d56

axis_example, compt_changes, info-ops, version-1.30, version-2.00, version-3.01, version-3.02
Last change on this file since dab4d56 was dab4d56, checked in by Ted Faber <faber@…>, 15 years ago

Some correct response codes and importantly closing the sockets after each
connection, which was confusing the new python (2.5)

  • Property mode set to 100755
File size: 12.8 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context, fedid
17from fedd_deter_impl import new_feddservice
18from fedd_services import ns0
19from service_error import *
20
21from threading import *
22from signal import signal, pause, SIGINT, SIGTERM
23from select import select, error
24from time import sleep
25import logging
26from ConfigParser import *
27
28# The SSL server here is based on the implementation described at
29# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
30
# Turn off the matching of hostname to certificate ID.
# NOTE(review): this assigns to an attribute of the SSL.Connection class
# itself rather than of one connection, so it presumably affects every
# M2Crypto connection created in this process -- confirm that is intended.
SSL.Connection.clientPostConnectionCheck = None
33
34class fedd_config_parser(SafeConfigParser):
35    """
36    A SafeConfig parser with a more forgiving get attribute
37    """
38
39    def safe_get(self, sect, opt, method, default=None):
40        """
41        If the option is present, return it, otherwise the default.
42        """
43        if self.has_option(sect, opt): return method(self, sect, opt)
44        else: return default
45
46    def get(self, sect, opt, default=None):
47        """
48        This returns the requested option or a given default.
49
50        It's more like getattr than get.
51        """
52
53        return self.safe_get(sect, opt, SafeConfigParser.get, default)
54
55    def getint(self, sect, opt, default=0):
56        """
57        Returns the selected option as an int or a default.
58        """
59
60        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
61
62    def getfloat(self, sect, opt, default=0.0):
63        """
64        Returns the selected option as an int or a default.
65        """
66
67        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
68
69    def getboolean(self, sect, opt, default=False):
70        """
71        Returns the selected option as a boolean or a default.
72        """
73
74        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
75
class fedd_server(ThreadingSSLServer):
    """
    SSL server that exposes the fedd implementation to its request handlers.

    Handlers reach the implementation, its method maps and the logger through
    the attributes stored here.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Build the server and remember the implementation it fronts.

        ME is the (host, port) pair to bind, handler is the request handler
        class that speaks the transport, ssl_ctx carries the SSL credentials
        and impl is the object that implements the fedd services.
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        # Everything the handlers need hangs off the server object.
        self.log = logging.getLogger("fedd")
        self.impl = impl
        self.xmlrpc_methods = impl.xmlrpc_services
        self.soap_methods = impl.soap_services
91
92class fedd_soap_handler(BaseHTTPRequestHandler):
93    """
94    Standard connection between SOAP and the fedd services in impl.
95
96    Much of this is boilerplate from
97    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
98    """
99    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
100
101    def send_xml(self, text, code=200):
102        """Send an XML document as reply"""
103        self.send_response(code)
104        self.send_header('Content-type', 'text/xml; charset="utf-8"')
105        self.send_header('Content-Length', str(len(text)))
106        self.end_headers()
107        self.wfile.write(text)
108        self.wfile.flush()
109        self.request.socket.close()
110
111    def send_fault(self, f, code=500):
112        """Send a SOAP encoded fault as reply"""
113        self.send_xml(f.AsSOAP(processContents="lax"), code)
114
115    def check_headers(self, ps):
116        """Send a fault for any required envelope headers"""
117        for (uri, localname) in ps.WhatMustIUnderstand():
118            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
119            return False
120        return  True
121
122    def check_method(self, ps):
123        """Confirm that this class implements the namespace and SOAP method"""
124        root = ps.body_root
125        if root.namespaceURI not in self.server.impl.soap_namespaces:
126            self.send_fault(Fault(Fault.Client, 
127                'Unknown namespace "%s"' % root.namespaceURI))
128            return False
129
130        if getattr(root, 'localName', None) == None:
131            self.send_fault(Fault(Fault.Client, 'No method"'))
132            return False
133        return True
134
135    def do_POST(self):
136        """Treat an HTTP POST request as a SOAP service call"""
137        try:
138            cl = int(self.headers['content-length'])
139            data = self.rfile.read(cl)
140            ps = ParsedSoap(data)
141        except ParseException, e:
142            self.send_fault(Fault(Fault.Client, str(e)))
143            return
144        except Exception, e:
145            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
146            return
147        if not self.check_headers(ps): return
148        if not self.check_method(ps): return
149        try:
150            resp = self.soap_dispatch(ps.body_root.localName, ps,
151                    fedid(cert=self.request.get_peer_cert()))
152        except Fault, f:
153            self.send_fault(f)
154            resp = None
155       
156        if resp != None:
157            sw = SoapWriter()
158            sw.serialize(resp)
159            self.send_xml(str(sw))
160
161    def log_request(self, code=0, size=0):
162        """
163        Log request to the fedd logger
164        """
165        self.server.log.info("Successful SOAP request code %d" % code)
166
167    def soap_dispatch(self, method, req, fid):
168        """
169        The connection to the implementation, using the  method maps
170
171        The implementation provides a mapping from SOAP method name to the
172        method in the implementation that provides the service.
173        """
174        if self.server.soap_methods.has_key(method):
175            try:
176                return self.server.soap_methods[method](req, fid)
177            except service_error, e:
178                de = ns0.faultType_Def(
179                        (ns0.faultType_Def.schema,
180                            "FeddFaultBody")).pyclass()
181                de._code=e.code
182                de._errstr=e.code_string()
183                de._desc=e.desc
184                if  e.is_server_error():
185                    raise Fault(Fault.Server, e.code_string(), detail=de)
186                else:
187                    raise Fault(Fault.Client, e.code_string(), detail=de)
188        else:
189            raise Fault(Fault.Client, "Unknown method: %s" % method)
190
191
192class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
193    """
194    Standard connection between XMLRPC and the fedd services in impl.
195
196    Much of this is boilerplate from
197    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
198    """
199    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
200
201    def send_xml(self, text, code=200):
202        """Send an XML document as reply"""
203        self.send_response(code)
204        self.send_header('Content-type', 'text/xml; charset="utf-8"')
205        self.send_header('Content-Length', str(len(text)))
206        self.end_headers()
207        self.wfile.write(text)
208        self.wfile.flush()
209        # Make sure to close the socket when we're done
210        self.request.socket.close()
211
212    def do_POST(self):
213        """Treat an HTTP POST request as an XMLRPC service call"""
214
215        resp = None
216        data = None
217        method = None
218        code = 200
219        cl = int(self.headers['content-length'])
220        data = self.rfile.read(cl)
221
222        try:
223            params, method = xmlrpclib.loads(data)
224        except xmlrpclib.ResponseError:
225            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
226                "Malformed request"), methodresponse=True)
227            code = 500
228
229        if method != None:
230            try:
231                resp = self.xmlrpc_dispatch(method, params,
232                            fedid(cert=self.request.get_peer_cert()))
233                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
234                        methodresponse=True)
235            except xmlrpclib.Fault, f:
236                data = xmlrpclib.dumps(f, methodresponse=True)
237                resp = None
238                code = 500
239
240        self.send_xml(data, code)
241
242    def log_request(self, code=0, size=0):
243        """
244        Log request to the fedd logger
245        """
246        self.server.log.info("Successful XMLRPC request code %d" % code)
247
248
249    def xmlrpc_dispatch(self, method, req, fid):
250        """
251        The connection to the implementation, using the  method maps
252
253        The implementation provides a mapping from XMLRPC method name to the
254        method in the implementation that provides the service.
255        """
256        if self.server.xmlrpc_methods.has_key(method):
257            try:
258                return self.server.xmlrpc_methods[method](req, fid)
259            except service_error, e:
260                raise xmlrpclib.Fault(e.code_string(), e.desc)
261        else:
262            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
263
class fedd_opts(OptionParser):
    """Command line definition for fedd, kept out of the main script body"""
    def __init__(self):
        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
                version="0.1")

        self.set_defaults(host="localhost", port=23235, transport="soap",
                logfile=None, debug=0)

        # One entry per option, in the order they appear in --help:
        # (option strings, keyword arguments for add_option).
        option_table = (
            (("-d", "--debug"),
                dict(action="count", dest="debug",
                    help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"),
                dict(action="store", dest="configfile",
                    help="Configuration file (required)")),
            (("-H", "--host"),
                dict(action="store", type="string", dest="host",
                    help="Hostname to listen on (default %default)")),
            (("-l", "--logfile"),
                dict(action="store", dest="logfile",
                    help="File to send log messages to")),
            (("-p", "--port"),
                dict(action="store", type="int", dest="port",
                    help="Port to listen on (default %default)")),
            (("-s", "--service"),
                dict(action="append", type="string", dest="services",
                    help="Service description: host:port:transport")),
            (("-x", "--transport"),
                dict(action="store", type="choice",
                    choices=("xmlrpc", "soap"),
                    help="Transport for request (xmlrpc|soap) " +
                        "(Default: %default)")),
            (("--trace",),
                dict(action="store_const", dest="tracefile",
                    const=sys.stderr,
                    help="Print SOAP exchange to stderr")),
        )

        for flags, settings in option_table:
            self.add_option(*flags, **settings)
291
# State shared between the main thread and the sub-server threads.
servers_active = True   # Cleared by the signal handler to stop sub-servers
services = []           # (transport, (host, port)) service descriptions
servers = []            # fedd_server instances built from services
servers_lock = Lock()   # Guards servers once sub-server threads are running
296
def shutdown(sig, frame):
    """
    Signal handler: tell the sub-servers to stop and log the signal number.

    Installed for the signals registered further down in this script.
    """
    # flog is only read here, so only servers_active needs declaring.
    global servers_active

    servers_active = False
    flog.info("Received signal %d, shutting down" % sig)
307
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    The server s is always removed from the servers list when this function
    ends, even if an unexpected exception ends the loop, because the main
    thread waits for that list to empty before exiting.
    """
    global servers_active   # Not strictly needed: servers_active is only read
    global servers          # List of active servers
    global servers_lock     # Lock to manipulate servers

    try:
        while servers_active:
            try:
                # Poll with a timeout so servers_active is rechecked at
                # least once a second.
                i, o, e = select((s,), (), (), 1.0)
                if s in i: s.handle_request()
            except error:
                # The select call seems to get interrupted by signals as well
                # as the main thread.  This essentially ignores signals in
                # this thread.
                pass
    finally:
        # Done.  Remove us from the list, and never leave the lock held.
        servers_lock.acquire()
        try:
            if s in servers: servers.remove(s)
        finally:
            servers_lock.release()
334
opts, args = fedd_opts().parse_args()

# Logging setup: one handler on the "fedd" logger, writing either to the
# requested log file or to standard output.
flog = logging.getLogger("fedd")
ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S')

if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

# The handler itself passes everything through; the level set on the logger
# below decides what actually gets recorded.
fh.setLevel(logging.DEBUG)
fh.setFormatter(ffmt)
flog.addHandler(fh)

if opts.debug:
    flog.setLevel(logging.DEBUG)
else:
    flog.setLevel(logging.INFO)
354
355
356
357# Initialize the implementation
358if opts.configfile != None: 
359    try:
360        config= fedd_config_parser()
361        config.read(opts.configfile)
362    except e:
363        sys.exit("Cannot parse confgi file: %s" % e)
364else: 
365    sys.exit("--configfile is required")
366
367try:
368    impl = new_feddservice(config)
369except RuntimeError, e:
370    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
371            "No message"
372    sys.exit("Error configuring fedd: %s" % str)
373
374if impl.cert_file == None:
375    sys.exit("Must supply certificate file (probably in config)")
376
377# Create the SSL credentials
378ctx = None
379while ctx == None:
380    try:
381        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
382                password=impl.cert_pwd)
383    except SSL.SSLError, e:
384        if str(e) != "bad decrypt" or impl.cert_pwd != None:
385            raise
386
# Walk through the service descriptions and pack them into the services list.
# That list has the form (transport, (host, port)).  Each description is
# host:port:transport; an empty field takes its value from the corresponding
# command line option.
if opts.services:
    for s in opts.services:
        try:
            h, p, t = s.split(':')

            if not h: h = opts.host
            if not p: p = opts.port
            if not t: t = opts.transport

            p = int(p)
        except ValueError:
            # Wrong number of fields or a port that is not a number
            sys.exit("Bad service description " +
                    "(expected host:port:transport): %s" % s)

        services.append((t, (h, p)))
else:
    services.append((opts.transport, (opts.host, opts.port)))
402
# Create the servers and put them into a list.  Descriptions with a transport
# other than soap or xmlrpc are logged and skipped.
for s in services:
    if s[0] == "soap":
        servers.append(fedd_server(s[1], fedd_soap_handler, ctx, impl))
    elif s[0] == "xmlrpc":
        servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl))
    else: flog.warning("Unknown transport: %s" % s[0])

#  Make sure that there are no malformed servers in the list.  This filters
#  the servers list itself, which is what the threads below are started from.
servers = [ s for s in servers if s ]
413
# Catch signals: both are routed to the same shutdown handler
for signum in (SIGINT, SIGTERM):
    signal(signum, shutdown)

# Start the servers, one thread per server
for s in servers:
    Thread(target=run_server, args=(s,)).start()

# Main thread waits for signals
while servers_active:
    sleep(1.0)

# Once shutdown starts wait for all the servers to terminate.  Each server
# thread takes itself off the list as it finishes, so poll the list under
# the lock until it is empty.
while True:
    servers_lock.acquire()
    remaining = len(servers)
    servers_lock.release()

    if remaining == 0:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
435
Note: See TracBrowser for help on using the repository browser.