source: fedd/fedd.py @ 0ea11af

Branches/tags: axis_example, compt_changes, info-ops, version-1.30, version-2.00, version-3.01, version-3.02
Last change on this file since 0ea11af was 0ea11af, checked in by Ted Faber <faber@…>, 16 years ago

clean up and add some docs

  • Property mode set to 100755
File size: 10.8 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context, fedid
17from fedd_deter_impl import new_feddservice
18from service_error import *
19
20from threading import *
21from signal import signal, pause, SIGINT, SIGTERM
22from select import select
23from time import sleep
24import logging
25
# The SSL server here is based on the implementation described at
# http://www.xml.com/pub/a/ws/2004/01/20/salz.html

# Turn off the matching of hostname to certificate ID.  This is assigned on
# the SSL.Connection class itself, so it is the default for every connection
# object rather than a per-connection setting.
# NOTE(review): presumably hostname matching is not wanted because requests
# are attributed to the fedid built from the peer certificate -- confirm.
SSL.Connection.clientPostConnectionCheck = None
31
class fedd_server(ThreadingSSLServer):
    """
    Threading SSL server that carries the fedd implementation with it

    Besides the implementation itself, the server keeps the SOAP and XMLRPC
    method maps that the request handlers look up through self.server.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Bind an SSL server to ME and attach the fedd implementation to it.

        ME is the (host, port) pair on which to bind, handler is the request
        handler class that implements the transport, ssl_ctx holds the SSL
        credentials and impl is the object implementing the fedd services.
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        # Ask the implementation for its dispatch tables once, at start up.
        soap_map = impl.get_soap_services()
        xmlrpc_map = impl.get_xmlrpc_services()

        self.impl = impl
        self.soap_methods = soap_map
        self.xmlrpc_methods = xmlrpc_map
46
47class fedd_soap_handler(BaseHTTPRequestHandler):
48    """
49    Standard connection between SOAP and the fedd services in impl.
50
51    Much of this is boilerplate from
52    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
53    """
54    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
55
56    def send_xml(self, text, code=200):
57        """Send an XML document as reply"""
58        self.send_response(code)
59        self.send_header('Content-type', 'text/xml; charset="utf-8"')
60        self.send_header('Content-Length', str(len(text)))
61        self.end_headers()
62        self.wfile.write(text)
63        self.wfile.flush()
64
65    def send_fault(self, f, code=500):
66        """Send a SOAP encoded fault as reply"""
67        self.send_xml(f.AsSOAP(processContents="lax"), code)
68
69    def check_headers(self, ps):
70        """Send a fault for any required envelope headers"""
71        for (uri, localname) in ps.WhatMustIUnderstand():
72            self.send_fault(FaultFromNotUnderstood(uri, lname, 'fedd'))
73            return False
74        return  True
75
76    def check_method(self, ps):
77        """Confirm that this class implements the namespace and SOAP method"""
78        root = ps.body_root
79        if root.namespaceURI not in self.server.impl.soap_namespaces:
80            self.send_fault(Fault(Fault.Client, 
81                'Unknown namespace "%s"' % root.namespaceURI))
82            return False
83
84        if getattr(root, 'localName', None) == None:
85            self.send_fault(Fault(Fault.Client, 'No method"'))
86            return False
87        return True
88
89    def do_POST(self):
90        """Treat an HTTP POST request as a SOAP service call"""
91        try:
92            cl = int(self.headers['content-length'])
93            data = self.rfile.read(cl)
94            ps = ParsedSoap(data)
95        except ParseException, e:
96            self.send_fault(Fault(Fault.Client, str(e)))
97            return
98        except Exception, e:
99            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
100            return
101        if not self.check_headers(ps): return
102        if not self.check_method(ps): return
103        try:
104            resp = self.soap_dispatch(ps.body_root.localName, ps,
105                    fedid(cert=self.request.get_peer_cert()))
106        except Fault, f:
107            self.send_fault(f)
108            resp = None
109       
110        if resp != None:
111            sw = SoapWriter()
112            sw.serialize(resp)
113            self.send_xml(str(sw))
114
115    def soap_dispatch(self, method, req, fid):
116        """
117        The connection to the implementation, using the  method maps
118
119        The implementation provides a mapping from SOAP method name to the
120        method in the implementation that provides the service.
121        """
122        if self.server.soap_methods.has_key(method):
123            try:
124                return self.server.soap_methods[method](req, fid)
125            except service_error, e:
126                de = ns0.faultType_Def(
127                        (ns0.faultType_Def.schema,
128                            "FeddFaultBody")).pyclass()
129                de._code=e.code
130                de._errstr=e.code_string()
131                de._desc=e.desc
132                if  e.is_server_error():
133                    raise Fault(Fault.Server, e.code_string(), detail=de)
134                else:
135                    raise Fault(Fault.Client, e.code_string(), detail=de)
136        else:
137            raise Fault(Fault.Client, "Unknown method: %s" % method)
138
139
140class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
141    """
142    Standard connection between XMLRPC and the fedd services in impl.
143
144    Much of this is boilerplate from
145    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
146    """
147    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
148
149    def send_xml(self, text, code=200):
150        """Send an XML document as reply"""
151        self.send_response(code)
152        self.send_header('Content-type', 'text/xml; charset="utf-8"')
153        self.send_header('Content-Length', str(len(text)))
154        self.end_headers()
155        self.wfile.write(text)
156        self.wfile.flush()
157
158    def do_POST(self):
159        """Treat an HTTP POST request as an XMLRPC service call"""
160
161        resp = None
162        data = None
163        method = None
164        cl = int(self.headers['content-length'])
165        data = self.rfile.read(cl)
166
167        try:
168            params, method = xmlrpclib.loads(data)
169        except xmlrpclib.ResponseError:
170            data = xmlrpclib.dumps(xmlrpclib.Fault("Client", 
171                "Malformed request"), methodresponse=True)
172       
173        if method != None:
174            try:
175                resp = self.xmlrpc_dispatch(method, params,
176                            fedid(cert=self.request.get_peer_cert()))
177                data = xmlrpclib.dumps((resp,), encoding='UTF-8', 
178                        methodresponse=True)
179            except xmlrpclib.Fault, f:
180                data = xmlrpclib.dumps(f, methodresponse=True)
181                resp = None
182        self.send_xml(data)
183
184
185    def xmlrpc_dispatch(self, method, req, fid):
186        """
187        The connection to the implementation, using the  method maps
188
189        The implementation provides a mapping from XMLRPC method name to the
190        method in the implementation that provides the service.
191        """
192        if self.server.xmlrpc_methods.has_key(method):
193            try:
194                return self.server.xmlrpc_methods[method](req, fid)
195            except service_error, e:
196                raise xmlrpclib.Fault(e.code_string(), e.desc)
197        else:
198            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
199
class fedd_opts(OptionParser):
    """
    Command line parser for fedd

    Keeps the option definitions in one place so that the main script only
    has to call parse_args().
    """
    def __init__(self):
        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
                version="0.1")

        self.set_defaults(host="localhost", port=23235, transport="soap",
                logfile=None, debug=0)

        # (flags, attributes) pairs, registered in this order so that the
        # --help output lists them the same way.
        option_table = (
            (("-d", "--debug"),
                dict(action="count", dest="debug",
                    help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"),
                dict(action="store", dest="configfile",
                    help="Configuration file (required)")),
            (("-H", "--host"),
                dict(action="store", type="string", dest="host",
                    help="Hostname to listen on (default %default)")),
            (("-l", "--logfile"),
                dict(action="store", dest="logfile",
                    help="File to send log messages to")),
            (("-p", "--port"),
                dict(action="store", type="int", dest="port",
                    help="Port to listen on (default %default)")),
            (("-s", "--service"),
                dict(action="append", type="string", dest="services",
                    help="Service description: host:port:transport")),
            (("-x", "--transport"),
                dict(action="store", type="choice",
                    choices=("xmlrpc", "soap"),
                    help="Transport for request (xmlrpc|soap) " +
                        "(Default: %default)")),
            (("--trace",),
                dict(action="store_const", dest="tracefile",
                    const=sys.stderr, help="Print SOAP exchange to stderr")),
        )

        for flags, attrs in option_table:
            self.add_option(*flags, **attrs)
227
# State shared between the main thread and the sub-server threads
servers_lock = Lock()       # Held while servers is examined or changed
servers = list()            # fedd_server instances built from services
services = list()           # Service descriptions
servers_active = True       # Sub-server loops keep going while this is True
232
def shutdown(sig, frame):
    """
    Signal handler: tell the sub-servers to stop and log the signal.

    sig is the number of the signal received and frame the interrupted stack
    frame, which is not used.  This is installed for SIGINT and SIGTERM
    below.
    """
    global servers_active

    # run_server() loops poll this flag, so clearing it is all that is needed
    servers_active = False
    flog.info("Received signal %d, shutting down" % sig)
242
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    s is the server to run.  It is removed from the servers list when this
    function leaves, whether the loop ended normally or by an exception, so
    the main thread's wait for an empty list always terminates.
    """
    global servers_active   # Not strictly needed: servers_active is only read
    global servers          # List of active servers
    global servers_lock     # Lock to manipulate servers

    # The module only imports the select function, so pull in what is needed
    # to recognize an interrupted call here.
    from select import error as select_error
    from errno import EINTR

    try:
        while servers_active:
            try:
                # Wake up at least every 5 seconds to recheck servers_active
                i, o, e = select((s,), (), (), 5.0)
            except select_error:
                # A signal arriving during select (the shutdown signal, for
                # one) is not a failure: go around and recheck the flag.
                if sys.exc_info()[1].args[0] == EINTR: continue
                raise
            if s in i: s.handle_request()
    finally:
        # Done.  Remove us from the list
        servers_lock.acquire()
        try:
            if s in servers: servers.remove(s)
        finally:
            servers_lock.release()
263
# Parse the command line
opts, args = fedd_opts().parse_args()

# Logging setup: everything goes through the "fedd" logger
flog = logging.getLogger("fedd")

# Log to the named file if there is one, otherwise to standard output
if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

# The logger level decides what gets recorded (any -d turns on debugging
# output); the handler below passes along whatever the logger lets through.
if opts.debug:
    flog.setLevel(logging.DEBUG)
else:
    flog.setLevel(logging.INFO)

ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S')

fh.setLevel(logging.DEBUG)
fh.setFormatter(ffmt)
flog.addHandler(fh)
283
284# Initialize the implementation
285if opts.configfile != None: 
286    try:
287        impl = new_feddservice(opts.configfile)
288    except RuntimeError, e:
289        str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
290                "No message"
291        sys.exit("Error configuring fedd: %s" % str)
292else: 
293    sys.exit("--configfile is required")
294
295if impl.cert_file == None:
296    sys.exit("Must supply certificate file (probably in config)")
297
298# Create the SSL credentials
299ctx = None
300while ctx == None:
301    try:
302        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
303                password=impl.cert_pwd)
304    except SSL.SSLError, e:
305        if str(e) != "bad decrypt" or impl.cert_pwd != None:
306            raise
307
# Walk through the service descriptions and pack them into the services list.
# That list has the form (transport, (host, port)).  Each description is
# host:port:transport; an empty field takes its value from --host, --port or
# --transport.
if opts.services:
    for s in opts.services:
        try:
            h, p, t  = s.split(':')
        except ValueError:
            sys.exit("Bad service description (need host:port:transport): %s"
                    % s)

        if not h: h = opts.host
        if not p: p = opts.port
        if not t: t = opts.transport

        try:
            p = int(p)
        except ValueError:
            sys.exit("Bad port in service description: %s" % s)

        services.append((t, (h, p)))
else:
    services.append((opts.transport, (opts.host, opts.port)))
323
# Create the servers and put them into a list.  The transport named in each
# service description selects the request handler class.
handler_classes = { "soap": fedd_soap_handler, "xmlrpc": fedd_xmlrpc_handler }

for s in services:
    transport, address = s
    if transport in handler_classes:
        servers.append(fedd_server(address, handler_classes[transport],
            ctx, impl))
    else:
        flog.warning("Unknown transport: %s" % transport)

#  Make sure that there are no malformed servers in the list
# NOTE(review): this filters services, not servers, and nothing above adds an
# empty entry to either list, so it looks like a no-op -- confirm the intent.
services = [ s for s in services if s ]
334
# Catch signals: either one starts an orderly shutdown
for signum in (SIGINT, SIGTERM):
    signal(signum, shutdown)

# Start the servers, one thread apiece
for s in servers:
    worker = Thread(target=run_server, args=(s,))
    worker.start()

# Main thread waits for signals
while servers_active:
    pause()

# Once shutdown starts wait for all the servers to terminate.  Each server
# thread takes itself off the servers list as it finishes, so look at the list
# once a second until it is empty.
while True:
    servers_lock.acquire()
    remaining = len(servers)
    servers_lock.release()

    if remaining == 0:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
356
Note: See TracBrowser for help on using the repository browser.