source: fedd/fedd.py @ a2da110

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since a2da110 was a2da110, checked in by Ted Faber <faber@…>, 15 years ago

Move service configuration into the config file and remove some extraneous options.

  • Property mode set to 100755
File size: 12.5 KB
RevLine 
[6ff0b91]1#!/usr/local/bin/python
2
3import sys
4
[a2da110]5from socket import error as socket_error
[6ff0b91]6from BaseHTTPServer import BaseHTTPRequestHandler
7
8from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
9    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
10
11from M2Crypto import SSL
[8ecfbad]12from M2Crypto.SSL.SSLServer import ThreadingSSLServer
[329f61d]13import xmlrpclib
[6ff0b91]14
15from optparse import OptionParser
16
[51cc9df]17from fedd_util import fedd_ssl_context
18from fedid import fedid
[19cc408]19from fedd_deter_impl import new_feddservice
[f4f4117]20from fedd_services import ns0
[19cc408]21from service_error import *
[6ff0b91]22
[a97394b]23from threading import *
[11a08b0]24from signal import signal, pause, SIGINT, SIGTERM
[d199ced]25from select import select, error
[0ea11af]26from time import sleep
[11a08b0]27import logging
[72ed6e4]28from ConfigParser import *
[a97394b]29
[6ff0b91]30# The SSL server here is based on the implementation described at
31# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
32
33# Turn off the matching of hostname to certificate ID
34SSL.Connection.clientPostConnectionCheck = None
35
[72ed6e4]36class fedd_config_parser(SafeConfigParser):
37    """
38    A SafeConfig parser with a more forgiving get attribute
39    """
[f4f4117]40
41    def safe_get(self, sect, opt, method, default=None):
42        """
43        If the option is present, return it, otherwise the default.
44        """
45        if self.has_option(sect, opt): return method(self, sect, opt)
46        else: return default
47
[72ed6e4]48    def get(self, sect, opt, default=None):
49        """
50        This returns the requested option or a given default.
51
52        It's more like getattr than get.
53        """
[f4f4117]54
55        return self.safe_get(sect, opt, SafeConfigParser.get, default)
56
57    def getint(self, sect, opt, default=0):
58        """
59        Returns the selected option as an int or a default.
60        """
61
62        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
63
64    def getfloat(self, sect, opt, default=0.0):
65        """
66        Returns the selected option as an int or a default.
67        """
68
69        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
70
71    def getboolean(self, sect, opt, default=False):
72        """
73        Returns the selected option as a boolean or a default.
74        """
75
76        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
[72ed6e4]77
[8ecfbad]78class fedd_server(ThreadingSSLServer):
[0ea11af]79    """
80    Interface the fedd services to the XMLRPC and SOAP interfaces
81    """
[6ff0b91]82    def __init__(self, ME, handler, ssl_ctx, impl):
[0ea11af]83        """
84        Create an SSL server that handles the transport in handler using the
85        credentials in ssl_ctx, and interfacing to the implementation of fedd
86        services in fedd.  ME is the host port pair on which to bind.
87        """
[8ecfbad]88        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
[6ff0b91]89        self.impl = impl
[0b466d1]90        self.soap_methods = impl.soap_services
91        self.xmlrpc_methods = impl.xmlrpc_services
92        self.log = logging.getLogger("fedd")
[6ff0b91]93
[329f61d]94class fedd_soap_handler(BaseHTTPRequestHandler):
[0ea11af]95    """
96    Standard connection between SOAP and the fedd services in impl.
97
98    Much of this is boilerplate from
99    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
100    """
[6ff0b91]101    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
102
103    def send_xml(self, text, code=200):
104        """Send an XML document as reply"""
105        self.send_response(code)
106        self.send_header('Content-type', 'text/xml; charset="utf-8"')
107        self.send_header('Content-Length', str(len(text)))
108        self.end_headers()
109        self.wfile.write(text)
110        self.wfile.flush()
[dab4d56]111        self.request.socket.close()
[6ff0b91]112
113    def send_fault(self, f, code=500):
114        """Send a SOAP encoded fault as reply"""
[0a47d52]115        self.send_xml(f.AsSOAP(processContents="lax"), code)
[6ff0b91]116
117    def check_headers(self, ps):
118        """Send a fault for any required envelope headers"""
119        for (uri, localname) in ps.WhatMustIUnderstand():
120            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
121            return False
122        return  True
123
124    def check_method(self, ps):
125        """Confirm that this class implements the namespace and SOAP method"""
126        root = ps.body_root
[7aec37d]127        if root.namespaceURI not in self.server.impl.soap_namespaces:
[6ff0b91]128            self.send_fault(Fault(Fault.Client, 
129                'Unknown namespace "%s"' % root.namespaceURI))
130            return False
[329f61d]131
132        if getattr(root, 'localName', None) == None:
133            self.send_fault(Fault(Fault.Client, 'No method"'))
[6ff0b91]134            return False
135        return True
136
137    def do_POST(self):
138        """Treat an HTTP POST request as a SOAP service call"""
139        try:
140            cl = int(self.headers['content-length'])
141            data = self.rfile.read(cl)
142            ps = ParsedSoap(data)
143        except ParseException, e:
[0a47d52]144            self.send_fault(Fault(Fault.Client, str(e)))
[6ff0b91]145            return
146        except Exception, e:
147            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
148            return
149        if not self.check_headers(ps): return
150        if not self.check_method(ps): return
151        try:
[19cc408]152            resp = self.soap_dispatch(ps.body_root.localName, ps,
[329f61d]153                    fedid(cert=self.request.get_peer_cert()))
[6ff0b91]154        except Fault, f:
155            self.send_fault(f)
156            resp = None
157       
158        if resp != None:
159            sw = SoapWriter()
160            sw.serialize(resp)
161            self.send_xml(str(sw))
162
[0b466d1]163    def log_request(self, code=0, size=0):
164        """
165        Log request to the fedd logger
166        """
167        self.server.log.info("Successful SOAP request code %d" % code)
168
[19cc408]169    def soap_dispatch(self, method, req, fid):
[0ea11af]170        """
171        The connection to the implementation, using the  method maps
172
173        The implementation provides a mapping from SOAP method name to the
174        method in the implementation that provides the service.
175        """
[19cc408]176        if self.server.soap_methods.has_key(method):
177            try:
178                return self.server.soap_methods[method](req, fid)
179            except service_error, e:
180                de = ns0.faultType_Def(
181                        (ns0.faultType_Def.schema,
182                            "FeddFaultBody")).pyclass()
183                de._code=e.code
184                de._errstr=e.code_string()
185                de._desc=e.desc
186                if  e.is_server_error():
187                    raise Fault(Fault.Server, e.code_string(), detail=de)
188                else:
189                    raise Fault(Fault.Client, e.code_string(), detail=de)
190        else:
191            raise Fault(Fault.Client, "Unknown method: %s" % method)
192
193
[329f61d]194class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
[0ea11af]195    """
196    Standard connection between XMLRPC and the fedd services in impl.
197
198    Much of this is boilerplate from
199    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
200    """
[329f61d]201    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
202
203    def send_xml(self, text, code=200):
204        """Send an XML document as reply"""
205        self.send_response(code)
206        self.send_header('Content-type', 'text/xml; charset="utf-8"')
207        self.send_header('Content-Length', str(len(text)))
208        self.end_headers()
209        self.wfile.write(text)
210        self.wfile.flush()
[dab4d56]211        # Make sure to close the socket when we're done
212        self.request.socket.close()
[329f61d]213
214    def do_POST(self):
215        """Treat an HTTP POST request as an XMLRPC service call"""
[058f58e]216        # NB: XMLRPC faults are not HTTP errors, so the code is always 200,
217        # unless an HTTP error occurs, which we don't handle.
[329f61d]218
219        resp = None
220        data = None
[0a47d52]221        method = None
[329f61d]222        cl = int(self.headers['content-length'])
223        data = self.rfile.read(cl)
224
225        try:
[0a47d52]226            params, method = xmlrpclib.loads(data)
227        except xmlrpclib.ResponseError:
228            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
229                "Malformed request"), methodresponse=True)
[dab4d56]230
[0a47d52]231        if method != None:
232            try:
[19cc408]233                resp = self.xmlrpc_dispatch(method, params,
[0a47d52]234                            fedid(cert=self.request.get_peer_cert()))
[bcbf543]235                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
236                        methodresponse=True)
[0a47d52]237            except xmlrpclib.Fault, f:
238                data = xmlrpclib.dumps(f, methodresponse=True)
239                resp = None
[dab4d56]240
[058f58e]241        self.send_xml(data)
[329f61d]242
[0b466d1]243    def log_request(self, code=0, size=0):
244        """
245        Log request to the fedd logger
246        """
247        self.server.log.info("Successful XMLRPC request code %d" % code)
248
[329f61d]249
[19cc408]250    def xmlrpc_dispatch(self, method, req, fid):
[0ea11af]251        """
252        The connection to the implementation, using the  method maps
253
254        The implementation provides a mapping from XMLRPC method name to the
255        method in the implementation that provides the service.
256        """
[19cc408]257        if self.server.xmlrpc_methods.has_key(method):
258            try:
259                return self.server.xmlrpc_methods[method](req, fid)
260            except service_error, e:
261                raise xmlrpclib.Fault(e.code_string(), e.desc)
262        else:
263            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
264
[6ff0b91]265class fedd_opts(OptionParser):
266    """Encapsulate option processing in this class, rather than in main"""
267    def __init__(self):
268        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
269                version="0.1")
270
[a2da110]271        self.set_defaults(logfile=None, debug=0)
[6ff0b91]272
273        self.add_option("-d", "--debug", action="count", dest="debug", 
274                help="Set debug.  Repeat for more information")
275        self.add_option("-f", "--configfile", action="store",
[a2da110]276                default="/usr/local/etc/fedd.conf",
[6ff0b91]277                dest="configfile", help="Configuration file (required)")
[11a08b0]278        self.add_option("-l", "--logfile", action="store", dest="logfile", 
279                help="File to send log messages to")
[6ff0b91]280        self.add_option("--trace", action="store_const", dest="tracefile", 
281                const=sys.stderr, help="Print SOAP exchange to stderr")
282
[0ea11af]283servers_active = True       # Sub-servers run while this is True
284servers = [ ]               # fedd_server instances instantiated from services
285servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
[11a08b0]286
287def shutdown(sig, frame):
[0ea11af]288    """
289    On a signal, stop running sub-servers. 
290   
291    This is connected to signals below
292    """
[11a08b0]293    global servers_active, flog
[d199ced]294
[11a08b0]295    servers_active = False
296    flog.info("Received signal %d, shutting down" % sig);
297
[a97394b]298def run_server(s):
[0ea11af]299    """
300    Operate a subserver, shutting down when servers_active is false.
301
302    Each server (that is host/port/transport triple) has a thread running this
303    function, so each can handle requests independently.  They all call in to
304    the same implementation, which must manage its own synchronization.
305    """
[11a08b0]306    global servers_active   # Not strictly needed: servers_active is only read
[0ea11af]307    global servers          # List of active servers
308    global servers_lock     # Lock to manipulate servers
[11a08b0]309
[0ea11af]310    while servers_active:
[d199ced]311        try:
312            i, o, e = select((s,), (), (), 1.0)
313            if s in i: s.handle_request()
314        except error:
315            # The select call seems to get interrupted by signals as well as
316            # the main thread.  This essentially ignores signals in this
317            # thread.
318            pass
[a97394b]319
[0ea11af]320    # Done.  Remove us from the list
321    servers_lock.acquire()
322    servers.remove(s)
323    servers_lock.release()
[a97394b]324
[6ff0b91]325opts, args = fedd_opts().parse_args()
326
[0ea11af]327# Logging setup
[11a08b0]328flog = logging.getLogger("fedd")
[0ea11af]329ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
330        '%d %b %y %H:%M:%S')
[11a08b0]331
332if opts.logfile: fh = logging.FileHandler(opts.logfile)
333else: fh = logging.StreamHandler(sys.stdout)
334
335# The handler will print anything, setting the logger level will affect what
336# gets recorded.
337fh.setLevel(logging.DEBUG)
338
339if opts.debug: flog.setLevel(logging.DEBUG)
340else: flog.setLevel(logging.INFO)
341
342fh.setFormatter(ffmt)
343flog.addHandler(fh)
344
[72ed6e4]345
346
[0ea11af]347# Initialize the implementation
[6ff0b91]348if opts.configfile != None: 
349    try:
[72ed6e4]350        config= fedd_config_parser()
351        config.read(opts.configfile)
352    except e:
353        sys.exit("Cannot parse confgi file: %s" % e)
[6ff0b91]354else: 
355    sys.exit("--configfile is required")
356
[72ed6e4]357try:
358    impl = new_feddservice(config)
359except RuntimeError, e:
360    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
361            "No message"
362    sys.exit("Error configuring fedd: %s" % str)
363
[6ff0b91]364if impl.cert_file == None:
365    sys.exit("Must supply certificate file (probably in config)")
366
[0ea11af]367# Create the SSL credentials
[2106ed1]368ctx = None
369while ctx == None:
[6ff0b91]370    try:
371        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
372                password=impl.cert_pwd)
373    except SSL.SSLError, e:
[2106ed1]374        if str(e) != "bad decrypt" or impl.cert_pwd != None:
[6ff0b91]375            raise
376
[a2da110]377services = config.get("globals", "services", "23235")
[a97394b]378
[a2da110]379for s in services.split(","):
380    s = s.strip()
381    colons = s.count(":")
382    try:
383        if colons == 0:
384            p = int(s)
385            h = ''
386            t = 'soap'
387        elif colons == 1:
388            p, t  = s.split(":")
389            p = int(p)
390            h = ''
391        elif colons == 2:
392            h, p, t  = s.split(":")
393            p = int(p)
394        else:
395            flog.error("Invalid service specification %s ignored." % s)
396            continue
397    except ValueError:
398        flog.error("Error converting port to integer in %s: spec ignored" % s)
399        continue
[a97394b]400
[a2da110]401    t = t.lower()
402    try:
403        if t == 'soap':
404            servers.append(fedd_server((h, p), fedd_soap_handler, ctx, impl))
405        elif t == 'xmlrpc':
406            servers.append(fedd_server((h, p), fedd_xmlrpc_handler, ctx, impl))
407        else:
408            flog.error("Invalid transport specification (%s) in service %s" % \
409                    (t, s))
410            continue
411    except socket_error, e:
412        flog.error("Cannot create server for %s: %s" % (s, e[1]))
413        continue
[11a08b0]414
[0ea11af]415#  Make sure that there are no malformed servers in the list
[a2da110]416servers = [ s for s in servers if s ]
[0ea11af]417
418# Catch signals
[11a08b0]419signal(SIGINT, shutdown)
420signal(SIGTERM, shutdown)
[a97394b]421
[0ea11af]422# Start the servers
[a97394b]423for s in servers:
[0ea11af]424    Thread(target=run_server, args=(s,)).start()
425
426# Main thread waits for signals
427while servers_active:
[d199ced]428    sleep(1.0)
[0ea11af]429
430#Once shutdown starts wait for all the servers to terminate.
431while True:
432    servers_lock.acquire()
433    if len(servers) == 0: 
434        servers_lock.release()
435        flog.info("All servers exited.  Terminating")
436        sys.exit(0)
437    servers_lock.release()
438    sleep(1)
[11a08b0]439
Note: See TracBrowser for help on using the repository browser.