source: fedd/fedd.py @ 72ed6e4

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 72ed6e4 was 72ed6e4, checked in by Ted Faber <faber@…>, 15 years ago

refactor configuration parsing to make code extensions more modular

  • Property mode set to 100755
File size: 11.9 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context, fedid
17from fedd_deter_impl import new_feddservice
18from service_error import *
19
20from threading import *
21from signal import signal, pause, SIGINT, SIGTERM
22from select import select, error
23from time import sleep
24import logging
25from ConfigParser import *
26
27# The SSL server here is based on the implementation described at
28# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
29
# Turn off the matching of hostname to certificate ID.  This assigns None to
# a class attribute, so it affects every M2Crypto SSL.Connection created by
# this process, not just one connection.
# NOTE(review): presumably peers are identified by the fedid built from their
# certificate in the request handlers rather than by hostname -- confirm.
SSL.Connection.clientPostConnectionCheck = None
32
33class fedd_config_parser(SafeConfigParser):
34    """
35    A SafeConfig parser with a more forgiving get attribute
36    """
37    def get(self, sect, opt, default=None):
38        """
39        This returns the requested option or a given default.
40
41        It's more like getattr than get.
42        """
43        if self.has_option(sect, opt):
44            return SafeConfigParser.get(self, sect, opt)
45        else:
46            return default
47
class fedd_server(ThreadingSSLServer):
    """
    Threading SSL server that connects the fedd implementation to one
    transport (XMLRPC or SOAP) request handler.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Set up the SSL server and remember the implementation it fronts.

        ME is the (host, port) pair on which to bind, handler is the request
        handler class that speaks the transport, ssl_ctx holds the SSL
        credentials, and impl is the object implementing the fedd services.
        The method maps published by impl and the "fedd" logger are stored on
        the server so that request handlers can reach them via self.server.
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        self.log = logging.getLogger("fedd")
        self.impl = impl
        self.xmlrpc_methods = impl.xmlrpc_services
        self.soap_methods = impl.soap_services
63
64class fedd_soap_handler(BaseHTTPRequestHandler):
65    """
66    Standard connection between SOAP and the fedd services in impl.
67
68    Much of this is boilerplate from
69    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
70    """
71    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
72
73    def send_xml(self, text, code=200):
74        """Send an XML document as reply"""
75        self.send_response(code)
76        self.send_header('Content-type', 'text/xml; charset="utf-8"')
77        self.send_header('Content-Length', str(len(text)))
78        self.end_headers()
79        self.wfile.write(text)
80        self.wfile.flush()
81
82    def send_fault(self, f, code=500):
83        """Send a SOAP encoded fault as reply"""
84        self.send_xml(f.AsSOAP(processContents="lax"), code)
85
86    def check_headers(self, ps):
87        """Send a fault for any required envelope headers"""
88        for (uri, localname) in ps.WhatMustIUnderstand():
89            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
90            return False
91        return  True
92
93    def check_method(self, ps):
94        """Confirm that this class implements the namespace and SOAP method"""
95        root = ps.body_root
96        if root.namespaceURI not in self.server.impl.soap_namespaces:
97            self.send_fault(Fault(Fault.Client, 
98                'Unknown namespace "%s"' % root.namespaceURI))
99            return False
100
101        if getattr(root, 'localName', None) == None:
102            self.send_fault(Fault(Fault.Client, 'No method"'))
103            return False
104        return True
105
106    def do_POST(self):
107        """Treat an HTTP POST request as a SOAP service call"""
108        try:
109            cl = int(self.headers['content-length'])
110            data = self.rfile.read(cl)
111            ps = ParsedSoap(data)
112        except ParseException, e:
113            self.send_fault(Fault(Fault.Client, str(e)))
114            return
115        except Exception, e:
116            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
117            return
118        if not self.check_headers(ps): return
119        if not self.check_method(ps): return
120        try:
121            resp = self.soap_dispatch(ps.body_root.localName, ps,
122                    fedid(cert=self.request.get_peer_cert()))
123        except Fault, f:
124            self.send_fault(f)
125            resp = None
126       
127        if resp != None:
128            sw = SoapWriter()
129            sw.serialize(resp)
130            self.send_xml(str(sw))
131
132    def log_request(self, code=0, size=0):
133        """
134        Log request to the fedd logger
135        """
136        self.server.log.info("Successful SOAP request code %d" % code)
137
138    def soap_dispatch(self, method, req, fid):
139        """
140        The connection to the implementation, using the  method maps
141
142        The implementation provides a mapping from SOAP method name to the
143        method in the implementation that provides the service.
144        """
145        if self.server.soap_methods.has_key(method):
146            try:
147                return self.server.soap_methods[method](req, fid)
148            except service_error, e:
149                de = ns0.faultType_Def(
150                        (ns0.faultType_Def.schema,
151                            "FeddFaultBody")).pyclass()
152                de._code=e.code
153                de._errstr=e.code_string()
154                de._desc=e.desc
155                if  e.is_server_error():
156                    raise Fault(Fault.Server, e.code_string(), detail=de)
157                else:
158                    raise Fault(Fault.Client, e.code_string(), detail=de)
159        else:
160            raise Fault(Fault.Client, "Unknown method: %s" % method)
161
162
163class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
164    """
165    Standard connection between XMLRPC and the fedd services in impl.
166
167    Much of this is boilerplate from
168    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
169    """
170    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
171
172    def send_xml(self, text, code=200):
173        """Send an XML document as reply"""
174        self.send_response(code)
175        self.send_header('Content-type', 'text/xml; charset="utf-8"')
176        self.send_header('Content-Length', str(len(text)))
177        self.end_headers()
178        self.wfile.write(text)
179        self.wfile.flush()
180
181    def do_POST(self):
182        """Treat an HTTP POST request as an XMLRPC service call"""
183
184        resp = None
185        data = None
186        method = None
187        cl = int(self.headers['content-length'])
188        data = self.rfile.read(cl)
189
190        try:
191            params, method = xmlrpclib.loads(data)
192        except xmlrpclib.ResponseError:
193            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
194                "Malformed request"), methodresponse=True)
195       
196        if method != None:
197            try:
198                resp = self.xmlrpc_dispatch(method, params,
199                            fedid(cert=self.request.get_peer_cert()))
200                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
201                        methodresponse=True)
202            except xmlrpclib.Fault, f:
203                data = xmlrpclib.dumps(f, methodresponse=True)
204                resp = None
205        self.send_xml(data)
206
207    def log_request(self, code=0, size=0):
208        """
209        Log request to the fedd logger
210        """
211        self.server.log.info("Successful XMLRPC request code %d" % code)
212
213
214    def xmlrpc_dispatch(self, method, req, fid):
215        """
216        The connection to the implementation, using the  method maps
217
218        The implementation provides a mapping from XMLRPC method name to the
219        method in the implementation that provides the service.
220        """
221        if self.server.xmlrpc_methods.has_key(method):
222            try:
223                return self.server.xmlrpc_methods[method](req, fid)
224            except service_error, e:
225                raise xmlrpclib.Fault(e.code_string(), e.desc)
226        else:
227            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
228
class fedd_opts(OptionParser):
    """Command line option processing for fedd, kept out of the main script"""
    def __init__(self):
        OptionParser.__init__(self, version="0.1",
                usage="%prog [opts] (--help for details)")

        self.set_defaults(debug=0, logfile=None, transport="soap",
                port=23235, host="localhost")

        # One (flags, keyword settings) entry per option, in the order the
        # options appear in the --help output.
        option_table = (
            (("-d", "--debug"), dict(action="count", dest="debug",
                help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"), dict(action="store",
                dest="configfile", help="Configuration file (required)")),
            (("-H", "--host"), dict(action="store", type="string",
                dest="host",
                help="Hostname to listen on (default %default)")),
            (("-l", "--logfile"), dict(action="store", dest="logfile",
                help="File to send log messages to")),
            (("-p", "--port"), dict(action="store", type="int",
                dest="port", help="Port to listen on (default %default)")),
            (("-s", "--service"), dict(action="append", type="string",
                dest="services",
                help="Service description: host:port:transport")),
            (("-x", "--transport"), dict(action="store", type="choice",
                choices=("xmlrpc", "soap"),
                help="Transport for request (xmlrpc|soap) "
                    "(Default: %default)")),
            (("--trace",), dict(action="store_const", dest="tracefile",
                const=sys.stderr, help="Print SOAP exchange to stderr")),
        )
        for flags, settings in option_table:
            self.add_option(*flags, **settings)
256
# State shared between the main thread and the per-service server threads
servers_active = True   # Sub-server loops keep running while this is True
services = []           # Service descriptions
servers = []            # fedd_server instances instantiated from services
servers_lock = Lock()   # Held while sub-server threads manipulate servers
261
def shutdown(sig, frame):
    """
    Signal handler: ask the sub-servers to stop and log the signal number.

    Clearing servers_active ends the loops that poll it.  This is installed
    for signals below.
    """
    global servers_active

    servers_active = False
    message = "Received signal %d, shutting down" % sig
    flog.info(message)
272
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    s is polled with a one second select timeout so that a change to
    servers_active is noticed promptly.  However the loop ends, including by
    an unexpected exception (which is still propagated), s is removed from
    the servers list under servers_lock so that code waiting for the list to
    empty is not left waiting on a dead thread.
    """
    global servers_active   # Not strictly needed: servers_active is only read
    global servers          # List of active servers
    global servers_lock     # Lock to manipulate servers

    try:
        while servers_active:
            try:
                i, o, e = select((s,), (), (), 1.0)
                if s in i: s.handle_request()
            except error:
                # The select call seems to get interrupted by signals as well
                # as the main thread.  This essentially ignores signals in
                # this thread.
                pass
    finally:
        # Done.  Remove us from the list, releasing the lock even if the
        # removal itself fails.
        servers_lock.acquire()
        try:
            servers.remove(s)
        finally:
            servers_lock.release()
299
opts, args = fedd_opts().parse_args()

# Logging setup: everything goes through the "fedd" logger, to the file named
# by --logfile when one is given and to standard output otherwise.
flog = logging.getLogger("fedd")
ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S')

if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

# The handler passes everything it is given; the level set on the logger
# decides what actually gets recorded.
fh.setLevel(logging.DEBUG)
fh.setFormatter(ffmt)

if opts.debug:
    flog.setLevel(logging.DEBUG)
else:
    flog.setLevel(logging.INFO)

flog.addHandler(fh)
319
320
321
322# Initialize the implementation
323if opts.configfile != None: 
324    try:
325        config= fedd_config_parser()
326        config.read(opts.configfile)
327    except e:
328        sys.exit("Cannot parse confgi file: %s" % e)
329else: 
330    sys.exit("--configfile is required")
331
332try:
333    impl = new_feddservice(config)
334except RuntimeError, e:
335    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
336            "No message"
337    sys.exit("Error configuring fedd: %s" % str)
338
339if impl.cert_file == None:
340    sys.exit("Must supply certificate file (probably in config)")
341
342# Create the SSL credentials
343ctx = None
344while ctx == None:
345    try:
346        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
347                password=impl.cert_pwd)
348    except SSL.SSLError, e:
349        if str(e) != "bad decrypt" or impl.cert_pwd != None:
350            raise
351
# Walk through the service descriptions and pack them into the services list.
# Each entry in that list has the form (transport, (host, port)).
if opts.services:
    for s in opts.services:
        try:
            h, p, t = s.split(':')

            # An empty field falls back to --host, --port or --transport
            if not h: h = opts.host
            if not p: p = opts.port
            if not t: t = opts.transport

            p = int(p)
        except ValueError:
            # Wrong number of fields or a non-numeric port
            sys.exit("Bad service description (need host:port:transport): %s"
                    % s)

        services.append((t, (h, p)))
else:
    services.append((opts.transport, (opts.host, opts.port)))
367
# Create one fedd_server per service description, choosing the request
# handler class by transport name, and collect them in the servers list
handler_for = { "soap": fedd_soap_handler, "xmlrpc": fedd_xmlrpc_handler }
for s in services:
    kind, where = s
    if kind in handler_for:
        servers.append(fedd_server(where, handler_for[kind], ctx, impl))
    else:
        flog.warning("Unknown transport: %s" % kind)

# Make sure that there are no malformed entries in the list
# NOTE(review): the original comment speaks of servers, but it is services
# that gets filtered here -- confirm which list was intended.
services = [ s for s in services if s ]
378
# Both SIGINT and SIGTERM trigger an orderly shutdown
for signum in (SIGINT, SIGTERM):
    signal(signum, shutdown)

# Start the servers, one thread each
for s in servers:
    Thread(target=run_server, args=(s,)).start()

# Main thread waits for signals
while servers_active:
    sleep(1.0)

# Once shutdown starts wait for all the servers to terminate, checking the
# list once a second.
while True:
    servers_lock.acquire()
    remaining = len(servers)
    servers_lock.release()
    if remaining == 0:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
400
Note: See TracBrowser for help on using the repository browser.