source: fedd/fedd.py @ ac54ef3

Branches/tags: axis_example, compt_changes, info-ops, version-1.30, version-2.00, version-3.01, version-3.02
Last change on this file since ac54ef3 was 51cc9df, checked in by Ted Faber <faber@…>, 16 years ago

split fedid out

  • Property mode set to 100755
File size: 12.8 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context
17from fedid import fedid
18from fedd_deter_impl import new_feddservice
19from fedd_services import ns0
20from service_error import *
21
22from threading import *
23from signal import signal, pause, SIGINT, SIGTERM
24from select import select, error
25from time import sleep
26import logging
27from ConfigParser import *
28
29# The SSL server here is based on the implementation described at
30# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
31
32# Turn off the matching of hostname to certificate ID
33SSL.Connection.clientPostConnectionCheck = None
34
35class fedd_config_parser(SafeConfigParser):
36    """
37    A SafeConfig parser with a more forgiving get attribute
38    """
39
40    def safe_get(self, sect, opt, method, default=None):
41        """
42        If the option is present, return it, otherwise the default.
43        """
44        if self.has_option(sect, opt): return method(self, sect, opt)
45        else: return default
46
47    def get(self, sect, opt, default=None):
48        """
49        This returns the requested option or a given default.
50
51        It's more like getattr than get.
52        """
53
54        return self.safe_get(sect, opt, SafeConfigParser.get, default)
55
56    def getint(self, sect, opt, default=0):
57        """
58        Returns the selected option as an int or a default.
59        """
60
61        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
62
63    def getfloat(self, sect, opt, default=0.0):
64        """
65        Returns the selected option as an int or a default.
66        """
67
68        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
69
70    def getboolean(self, sect, opt, default=False):
71        """
72        Returns the selected option as a boolean or a default.
73        """
74
75        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
76
class fedd_server(ThreadingSSLServer):
    """
    Threading SSL server that carries the fedd implementation and its
    SOAP and XMLRPC method tables, so request handlers can reach them through
    self.server.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Bind an SSL server to ME, the (host, port) pair to listen on.

        handler is the request handler class implementing the transport,
        ssl_ctx holds the SSL credentials, and impl is the object providing
        the fedd services (its soap_services and xmlrpc_services tables are
        copied onto the server).
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        # Shared logger used by the handlers' log_request methods
        self.log = logging.getLogger("fedd")
        self.impl = impl
        self.xmlrpc_methods = impl.xmlrpc_services
        self.soap_methods = impl.soap_services
92
93class fedd_soap_handler(BaseHTTPRequestHandler):
94    """
95    Standard connection between SOAP and the fedd services in impl.
96
97    Much of this is boilerplate from
98    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
99    """
100    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
101
102    def send_xml(self, text, code=200):
103        """Send an XML document as reply"""
104        self.send_response(code)
105        self.send_header('Content-type', 'text/xml; charset="utf-8"')
106        self.send_header('Content-Length', str(len(text)))
107        self.end_headers()
108        self.wfile.write(text)
109        self.wfile.flush()
110        self.request.socket.close()
111
112    def send_fault(self, f, code=500):
113        """Send a SOAP encoded fault as reply"""
114        self.send_xml(f.AsSOAP(processContents="lax"), code)
115
116    def check_headers(self, ps):
117        """Send a fault for any required envelope headers"""
118        for (uri, localname) in ps.WhatMustIUnderstand():
119            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
120            return False
121        return  True
122
123    def check_method(self, ps):
124        """Confirm that this class implements the namespace and SOAP method"""
125        root = ps.body_root
126        if root.namespaceURI not in self.server.impl.soap_namespaces:
127            self.send_fault(Fault(Fault.Client, 
128                'Unknown namespace "%s"' % root.namespaceURI))
129            return False
130
131        if getattr(root, 'localName', None) == None:
132            self.send_fault(Fault(Fault.Client, 'No method"'))
133            return False
134        return True
135
136    def do_POST(self):
137        """Treat an HTTP POST request as a SOAP service call"""
138        try:
139            cl = int(self.headers['content-length'])
140            data = self.rfile.read(cl)
141            ps = ParsedSoap(data)
142        except ParseException, e:
143            self.send_fault(Fault(Fault.Client, str(e)))
144            return
145        except Exception, e:
146            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
147            return
148        if not self.check_headers(ps): return
149        if not self.check_method(ps): return
150        try:
151            resp = self.soap_dispatch(ps.body_root.localName, ps,
152                    fedid(cert=self.request.get_peer_cert()))
153        except Fault, f:
154            self.send_fault(f)
155            resp = None
156       
157        if resp != None:
158            sw = SoapWriter()
159            sw.serialize(resp)
160            self.send_xml(str(sw))
161
162    def log_request(self, code=0, size=0):
163        """
164        Log request to the fedd logger
165        """
166        self.server.log.info("Successful SOAP request code %d" % code)
167
168    def soap_dispatch(self, method, req, fid):
169        """
170        The connection to the implementation, using the  method maps
171
172        The implementation provides a mapping from SOAP method name to the
173        method in the implementation that provides the service.
174        """
175        if self.server.soap_methods.has_key(method):
176            try:
177                return self.server.soap_methods[method](req, fid)
178            except service_error, e:
179                de = ns0.faultType_Def(
180                        (ns0.faultType_Def.schema,
181                            "FeddFaultBody")).pyclass()
182                de._code=e.code
183                de._errstr=e.code_string()
184                de._desc=e.desc
185                if  e.is_server_error():
186                    raise Fault(Fault.Server, e.code_string(), detail=de)
187                else:
188                    raise Fault(Fault.Client, e.code_string(), detail=de)
189        else:
190            raise Fault(Fault.Client, "Unknown method: %s" % method)
191
192
193class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
194    """
195    Standard connection between XMLRPC and the fedd services in impl.
196
197    Much of this is boilerplate from
198    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
199    """
200    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
201
202    def send_xml(self, text, code=200):
203        """Send an XML document as reply"""
204        self.send_response(code)
205        self.send_header('Content-type', 'text/xml; charset="utf-8"')
206        self.send_header('Content-Length', str(len(text)))
207        self.end_headers()
208        self.wfile.write(text)
209        self.wfile.flush()
210        # Make sure to close the socket when we're done
211        self.request.socket.close()
212
213    def do_POST(self):
214        """Treat an HTTP POST request as an XMLRPC service call"""
215        # NB: XMLRPC faults are not HTTP errors, so the code is always 200,
216        # unless an HTTP error occurs, which we don't handle.
217
218        resp = None
219        data = None
220        method = None
221        cl = int(self.headers['content-length'])
222        data = self.rfile.read(cl)
223
224        try:
225            params, method = xmlrpclib.loads(data)
226        except xmlrpclib.ResponseError:
227            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
228                "Malformed request"), methodresponse=True)
229
230        if method != None:
231            try:
232                resp = self.xmlrpc_dispatch(method, params,
233                            fedid(cert=self.request.get_peer_cert()))
234                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
235                        methodresponse=True)
236            except xmlrpclib.Fault, f:
237                data = xmlrpclib.dumps(f, methodresponse=True)
238                resp = None
239
240        self.send_xml(data)
241
242    def log_request(self, code=0, size=0):
243        """
244        Log request to the fedd logger
245        """
246        self.server.log.info("Successful XMLRPC request code %d" % code)
247
248
249    def xmlrpc_dispatch(self, method, req, fid):
250        """
251        The connection to the implementation, using the  method maps
252
253        The implementation provides a mapping from XMLRPC method name to the
254        method in the implementation that provides the service.
255        """
256        if self.server.xmlrpc_methods.has_key(method):
257            try:
258                return self.server.xmlrpc_methods[method](req, fid)
259            except service_error, e:
260                raise xmlrpclib.Fault(e.code_string(), e.desc)
261        else:
262            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
263
class fedd_opts(OptionParser):
    """Command line option definitions for fedd, kept out of main"""
    def __init__(self):
        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
                version="0.1")

        self.set_defaults(host="localhost", port=23235, transport="soap",
                logfile=None, debug=0)

        # (flags, add_option keywords), registered in this order so that the
        # --help listing is unchanged.
        option_table = (
            (("-d", "--debug"), dict(action="count", dest="debug",
                help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"), dict(action="store",
                dest="configfile", help="Configuration file (required)")),
            (("-H", "--host"), dict(action="store", type="string",
                dest="host", help="Hostname to listen on (default %default)")),
            (("-l", "--logfile"), dict(action="store", dest="logfile",
                help="File to send log messages to")),
            (("-p", "--port"), dict(action="store", type="int",
                dest="port", help="Port to listen on (default %default)")),
            (("-s", "--service"), dict(action="append", type="string",
                dest="services",
                help="Service description: host:port:transport")),
            (("-x", "--transport"), dict(action="store", type="choice",
                choices=("xmlrpc", "soap"),
                help="Transport for request (xmlrpc|soap) (Default: %default)")),
            (("--trace",), dict(action="store_const", dest="tracefile",
                const=sys.stderr, help="Print SOAP exchange to stderr")),
        )

        for flags, settings in option_table:
            self.add_option(*flags, **settings)
291
# State shared between the main thread and the per-service server threads
servers_lock = Lock()       # Held while servers is inspected or modified
servers = []                # fedd_server instances built from services
services = []               # Service descriptions: (transport, (host, port))
servers_active = True       # Sub-servers keep running while this is True
296
def shutdown(sig, frame):
    """
    Signal handler that tells the sub-servers to stop.

    Clears servers_active, which the server threads and the main thread poll,
    and logs the number of the signal received.  frame is unused.
    """
    global servers_active

    servers_active = False
    # flog is only read here, so it needs no global declaration
    flog.info("Received signal %d, shutting down", sig)
307
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    s is the server to run.  However the loop ends - normally or through an
    unexpected exception - s is removed from the servers list so that the
    main thread's wait for an empty list can complete.
    """
    global servers_active   # Not strictly needed: servers_active is only read
    global servers          # List of active servers
    global servers_lock     # Lock to manipulate servers

    try:
        while servers_active:
            try:
                # Wake at least once a second to re-check servers_active
                readable, writable, exceptional = select((s,), (), (), 1.0)
                if s in readable: s.handle_request()
            except error:
                # The select call seems to get interrupted by signals as
                # well as the main thread.  This essentially ignores signals
                # in this thread.
                pass
    finally:
        # Done.  Remove us from the list, releasing the lock even if the
        # removal itself fails.
        servers_lock.acquire()
        try:
            servers.remove(s)
        finally:
            servers_lock.release()
334
335opts, args = fedd_opts().parse_args()
336
337# Logging setup
338flog = logging.getLogger("fedd")
339ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
340        '%d %b %y %H:%M:%S')
341
342if opts.logfile: fh = logging.FileHandler(opts.logfile)
343else: fh = logging.StreamHandler(sys.stdout)
344
345# The handler will print anything, setting the logger level will affect what
346# gets recorded.
347fh.setLevel(logging.DEBUG)
348
349if opts.debug: flog.setLevel(logging.DEBUG)
350else: flog.setLevel(logging.INFO)
351
352fh.setFormatter(ffmt)
353flog.addHandler(fh)
354
355
356
357# Initialize the implementation
358if opts.configfile != None: 
359    try:
360        config= fedd_config_parser()
361        config.read(opts.configfile)
362    except e:
363        sys.exit("Cannot parse confgi file: %s" % e)
364else: 
365    sys.exit("--configfile is required")
366
367try:
368    impl = new_feddservice(config)
369except RuntimeError, e:
370    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
371            "No message"
372    sys.exit("Error configuring fedd: %s" % str)
373
374if impl.cert_file == None:
375    sys.exit("Must supply certificate file (probably in config)")
376
377# Create the SSL credentials
378ctx = None
379while ctx == None:
380    try:
381        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
382                password=impl.cert_pwd)
383    except SSL.SSLError, e:
384        if str(e) != "bad decrypt" or impl.cert_pwd != None:
385            raise
386
387# Walk through the service descriptions and pack them into the services list.
388# That list has the form (transport (host, port)).
389if opts.services:
390    for s in opts.services:
391        h, p, t  = s.split(':')
392
393        if not h: h = opts.host
394        if not p: p = opts.port
395        if not t: h = opts.transport
396
397        p = int(p)
398
399        services.append((t, (h, p)))
400else:
401    services.append((opts.transport, (opts.host, opts.port)))
402
403# Create the servers and put them into a list
404for s in services:
405    if s[0] == "soap":
406        servers.append(fedd_server(s[1], fedd_soap_handler, ctx, impl))
407    elif s[0] == "xmlrpc":
408        servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl))
409    else: flog.warning("Unknown transport: %s" % s[0])
410
411#  Make sure that there are no malformed servers in the list
412services = [ s for s in services if s ]
413
414# Catch signals
415signal(SIGINT, shutdown)
416signal(SIGTERM, shutdown)
417
418# Start the servers
419for s in servers:
420    Thread(target=run_server, args=(s,)).start()
421
422# Main thread waits for signals
423while servers_active:
424    sleep(1.0)
425
426#Once shutdown starts wait for all the servers to terminate.
427while True:
428    servers_lock.acquire()
429    if len(servers) == 0: 
430        servers_lock.release()
431        flog.info("All servers exited.  Terminating")
432        sys.exit(0)
433    servers_lock.release()
434    sleep(1)
435
Note: See TracBrowser for help on using the repository browser.