source: fedd/fedd.py @ a2da110

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since a2da110 was a2da110, checked in by Ted Faber <faber@…>, 16 years ago

Move service configuration into the config file and remove some extraneous options.

  • Property mode set to 100755
File size: 12.5 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from socket import error as socket_error
6from BaseHTTPServer import BaseHTTPRequestHandler
7
8from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
9    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
10
11from M2Crypto import SSL
12from M2Crypto.SSL.SSLServer import ThreadingSSLServer
13import xmlrpclib
14
15from optparse import OptionParser
16
17from fedd_util import fedd_ssl_context
18from fedid import fedid
19from fedd_deter_impl import new_feddservice
20from fedd_services import ns0
21from service_error import *
22
23from threading import *
24from signal import signal, pause, SIGINT, SIGTERM
25from select import select, error
26from time import sleep
27import logging
28from ConfigParser import *
29
30# The SSL server here is based on the implementation described at
31# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
32
33# Turn off the matching of hostname to certificate ID
34SSL.Connection.clientPostConnectionCheck = None
35
36class fedd_config_parser(SafeConfigParser):
37    """
38    A SafeConfig parser with a more forgiving get attribute
39    """
40
41    def safe_get(self, sect, opt, method, default=None):
42        """
43        If the option is present, return it, otherwise the default.
44        """
45        if self.has_option(sect, opt): return method(self, sect, opt)
46        else: return default
47
48    def get(self, sect, opt, default=None):
49        """
50        This returns the requested option or a given default.
51
52        It's more like getattr than get.
53        """
54
55        return self.safe_get(sect, opt, SafeConfigParser.get, default)
56
57    def getint(self, sect, opt, default=0):
58        """
59        Returns the selected option as an int or a default.
60        """
61
62        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
63
64    def getfloat(self, sect, opt, default=0.0):
65        """
66        Returns the selected option as an int or a default.
67        """
68
69        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
70
71    def getboolean(self, sect, opt, default=False):
72        """
73        Returns the selected option as a boolean or a default.
74        """
75
76        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
77
78class fedd_server(ThreadingSSLServer):
79    """
80    Interface the fedd services to the XMLRPC and SOAP interfaces
81    """
82    def __init__(self, ME, handler, ssl_ctx, impl):
83        """
84        Create an SSL server that handles the transport in handler using the
85        credentials in ssl_ctx, and interfacing to the implementation of fedd
86        services in fedd.  ME is the host port pair on which to bind.
87        """
88        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
89        self.impl = impl
90        self.soap_methods = impl.soap_services
91        self.xmlrpc_methods = impl.xmlrpc_services
92        self.log = logging.getLogger("fedd")
93
94class fedd_soap_handler(BaseHTTPRequestHandler):
95    """
96    Standard connection between SOAP and the fedd services in impl.
97
98    Much of this is boilerplate from
99    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
100    """
101    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
102
103    def send_xml(self, text, code=200):
104        """Send an XML document as reply"""
105        self.send_response(code)
106        self.send_header('Content-type', 'text/xml; charset="utf-8"')
107        self.send_header('Content-Length', str(len(text)))
108        self.end_headers()
109        self.wfile.write(text)
110        self.wfile.flush()
111        self.request.socket.close()
112
113    def send_fault(self, f, code=500):
114        """Send a SOAP encoded fault as reply"""
115        self.send_xml(f.AsSOAP(processContents="lax"), code)
116
117    def check_headers(self, ps):
118        """Send a fault for any required envelope headers"""
119        for (uri, localname) in ps.WhatMustIUnderstand():
120            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
121            return False
122        return  True
123
124    def check_method(self, ps):
125        """Confirm that this class implements the namespace and SOAP method"""
126        root = ps.body_root
127        if root.namespaceURI not in self.server.impl.soap_namespaces:
128            self.send_fault(Fault(Fault.Client, 
129                'Unknown namespace "%s"' % root.namespaceURI))
130            return False
131
132        if getattr(root, 'localName', None) == None:
133            self.send_fault(Fault(Fault.Client, 'No method"'))
134            return False
135        return True
136
137    def do_POST(self):
138        """Treat an HTTP POST request as a SOAP service call"""
139        try:
140            cl = int(self.headers['content-length'])
141            data = self.rfile.read(cl)
142            ps = ParsedSoap(data)
143        except ParseException, e:
144            self.send_fault(Fault(Fault.Client, str(e)))
145            return
146        except Exception, e:
147            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
148            return
149        if not self.check_headers(ps): return
150        if not self.check_method(ps): return
151        try:
152            resp = self.soap_dispatch(ps.body_root.localName, ps,
153                    fedid(cert=self.request.get_peer_cert()))
154        except Fault, f:
155            self.send_fault(f)
156            resp = None
157       
158        if resp != None:
159            sw = SoapWriter()
160            sw.serialize(resp)
161            self.send_xml(str(sw))
162
163    def log_request(self, code=0, size=0):
164        """
165        Log request to the fedd logger
166        """
167        self.server.log.info("Successful SOAP request code %d" % code)
168
169    def soap_dispatch(self, method, req, fid):
170        """
171        The connection to the implementation, using the  method maps
172
173        The implementation provides a mapping from SOAP method name to the
174        method in the implementation that provides the service.
175        """
176        if self.server.soap_methods.has_key(method):
177            try:
178                return self.server.soap_methods[method](req, fid)
179            except service_error, e:
180                de = ns0.faultType_Def(
181                        (ns0.faultType_Def.schema,
182                            "FeddFaultBody")).pyclass()
183                de._code=e.code
184                de._errstr=e.code_string()
185                de._desc=e.desc
186                if  e.is_server_error():
187                    raise Fault(Fault.Server, e.code_string(), detail=de)
188                else:
189                    raise Fault(Fault.Client, e.code_string(), detail=de)
190        else:
191            raise Fault(Fault.Client, "Unknown method: %s" % method)
192
193
194class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
195    """
196    Standard connection between XMLRPC and the fedd services in impl.
197
198    Much of this is boilerplate from
199    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
200    """
201    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
202
203    def send_xml(self, text, code=200):
204        """Send an XML document as reply"""
205        self.send_response(code)
206        self.send_header('Content-type', 'text/xml; charset="utf-8"')
207        self.send_header('Content-Length', str(len(text)))
208        self.end_headers()
209        self.wfile.write(text)
210        self.wfile.flush()
211        # Make sure to close the socket when we're done
212        self.request.socket.close()
213
214    def do_POST(self):
215        """Treat an HTTP POST request as an XMLRPC service call"""
216        # NB: XMLRPC faults are not HTTP errors, so the code is always 200,
217        # unless an HTTP error occurs, which we don't handle.
218
219        resp = None
220        data = None
221        method = None
222        cl = int(self.headers['content-length'])
223        data = self.rfile.read(cl)
224
225        try:
226            params, method = xmlrpclib.loads(data)
227        except xmlrpclib.ResponseError:
228            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
229                "Malformed request"), methodresponse=True)
230
231        if method != None:
232            try:
233                resp = self.xmlrpc_dispatch(method, params,
234                            fedid(cert=self.request.get_peer_cert()))
235                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
236                        methodresponse=True)
237            except xmlrpclib.Fault, f:
238                data = xmlrpclib.dumps(f, methodresponse=True)
239                resp = None
240
241        self.send_xml(data)
242
243    def log_request(self, code=0, size=0):
244        """
245        Log request to the fedd logger
246        """
247        self.server.log.info("Successful XMLRPC request code %d" % code)
248
249
250    def xmlrpc_dispatch(self, method, req, fid):
251        """
252        The connection to the implementation, using the  method maps
253
254        The implementation provides a mapping from XMLRPC method name to the
255        method in the implementation that provides the service.
256        """
257        if self.server.xmlrpc_methods.has_key(method):
258            try:
259                return self.server.xmlrpc_methods[method](req, fid)
260            except service_error, e:
261                raise xmlrpclib.Fault(e.code_string(), e.desc)
262        else:
263            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
264
265class fedd_opts(OptionParser):
266    """Encapsulate option processing in this class, rather than in main"""
267    def __init__(self):
268        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
269                version="0.1")
270
271        self.set_defaults(logfile=None, debug=0)
272
273        self.add_option("-d", "--debug", action="count", dest="debug", 
274                help="Set debug.  Repeat for more information")
275        self.add_option("-f", "--configfile", action="store",
276                default="/usr/local/etc/fedd.conf",
277                dest="configfile", help="Configuration file (required)")
278        self.add_option("-l", "--logfile", action="store", dest="logfile", 
279                help="File to send log messages to")
280        self.add_option("--trace", action="store_const", dest="tracefile", 
281                const=sys.stderr, help="Print SOAP exchange to stderr")
282
283servers_active = True       # Sub-servers run while this is True
284servers = [ ]               # fedd_server instances instantiated from services
285servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
286
287def shutdown(sig, frame):
288    """
289    On a signal, stop running sub-servers. 
290   
291    This is connected to signals below
292    """
293    global servers_active, flog
294
295    servers_active = False
296    flog.info("Received signal %d, shutting down" % sig);
297
298def run_server(s):
299    """
300    Operate a subserver, shutting down when servers_active is false.
301
302    Each server (that is host/port/transport triple) has a thread running this
303    function, so each can handle requests independently.  They all call in to
304    the same implementation, which must manage its own synchronization.
305    """
306    global servers_active   # Not strictly needed: servers_active is only read
307    global servers          # List of active servers
308    global servers_lock     # Lock to manipulate servers
309
310    while servers_active:
311        try:
312            i, o, e = select((s,), (), (), 1.0)
313            if s in i: s.handle_request()
314        except error:
315            # The select call seems to get interrupted by signals as well as
316            # the main thread.  This essentially ignores signals in this
317            # thread.
318            pass
319
320    # Done.  Remove us from the list
321    servers_lock.acquire()
322    servers.remove(s)
323    servers_lock.release()
324
325opts, args = fedd_opts().parse_args()
326
327# Logging setup
328flog = logging.getLogger("fedd")
329ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
330        '%d %b %y %H:%M:%S')
331
332if opts.logfile: fh = logging.FileHandler(opts.logfile)
333else: fh = logging.StreamHandler(sys.stdout)
334
335# The handler will print anything, setting the logger level will affect what
336# gets recorded.
337fh.setLevel(logging.DEBUG)
338
339if opts.debug: flog.setLevel(logging.DEBUG)
340else: flog.setLevel(logging.INFO)
341
342fh.setFormatter(ffmt)
343flog.addHandler(fh)
344
345
346
347# Initialize the implementation
348if opts.configfile != None: 
349    try:
350        config= fedd_config_parser()
351        config.read(opts.configfile)
352    except e:
353        sys.exit("Cannot parse confgi file: %s" % e)
354else: 
355    sys.exit("--configfile is required")
356
357try:
358    impl = new_feddservice(config)
359except RuntimeError, e:
360    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
361            "No message"
362    sys.exit("Error configuring fedd: %s" % str)
363
364if impl.cert_file == None:
365    sys.exit("Must supply certificate file (probably in config)")
366
367# Create the SSL credentials
368ctx = None
369while ctx == None:
370    try:
371        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
372                password=impl.cert_pwd)
373    except SSL.SSLError, e:
374        if str(e) != "bad decrypt" or impl.cert_pwd != None:
375            raise
376
377services = config.get("globals", "services", "23235")
378
379for s in services.split(","):
380    s = s.strip()
381    colons = s.count(":")
382    try:
383        if colons == 0:
384            p = int(s)
385            h = ''
386            t = 'soap'
387        elif colons == 1:
388            p, t  = s.split(":")
389            p = int(p)
390            h = ''
391        elif colons == 2:
392            h, p, t  = s.split(":")
393            p = int(p)
394        else:
395            flog.error("Invalid service specification %s ignored." % s)
396            continue
397    except ValueError:
398        flog.error("Error converting port to integer in %s: spec ignored" % s)
399        continue
400
401    t = t.lower()
402    try:
403        if t == 'soap':
404            servers.append(fedd_server((h, p), fedd_soap_handler, ctx, impl))
405        elif t == 'xmlrpc':
406            servers.append(fedd_server((h, p), fedd_xmlrpc_handler, ctx, impl))
407        else:
408            flog.error("Invalid transport specification (%s) in service %s" % \
409                    (t, s))
410            continue
411    except socket_error, e:
412        flog.error("Cannot create server for %s: %s" % (s, e[1]))
413        continue
414
415#  Make sure that there are no malformed servers in the list
416servers = [ s for s in servers if s ]
417
418# Catch signals
419signal(SIGINT, shutdown)
420signal(SIGTERM, shutdown)
421
422# Start the servers
423for s in servers:
424    Thread(target=run_server, args=(s,)).start()
425
426# Main thread waits for signals
427while servers_active:
428    sleep(1.0)
429
430#Once shutdown starts wait for all the servers to terminate.
431while True:
432    servers_lock.acquire()
433    if len(servers) == 0: 
434        servers_lock.release()
435        flog.info("All servers exited.  Terminating")
436        sys.exit(0)
437    servers_lock.release()
438    sleep(1)
439
Note: See TracBrowser for help on using the repository browser.