source: fedd/fedd.py @ 4700b3b

axis_example compt_changes info-ops version-1.30 version-2.00 version-3.01 version-3.02
Last change on this file since 4700b3b was d199ced, checked in by Ted Faber <faber@…>, 16 years ago

upgrade

  • Property mode set to 100755
File size: 11.4 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context, fedid
17from fedd_deter_impl import new_feddservice
18from service_error import *
19
20from threading import *
21from signal import signal, pause, SIGINT, SIGTERM
22from select import select, error
23from time import sleep
24import logging
25
# The SSL server here is based on the implementation described at
# http://www.xml.com/pub/a/ws/2004/01/20/salz.html

# Turn off the matching of hostname to certificate ID.  This rebinds an
# attribute on the SSL.Connection class itself, so it applies to the whole
# process rather than to a single connection.
# NOTE(review): presumably peers are identified by the certificate they
# present (the handlers below build a fedid from the peer certificate) rather
# than by hostname -- confirm that this is the intended trust model.
SSL.Connection.clientPostConnectionCheck = None
31
class fedd_server(ThreadingSSLServer):
    """
    Threaded SSL server carrying the fedd service tables for its handlers.

    Request handlers reach this object as self.server and use the attributes
    set in the constructor to find the implementation, its SOAP and XMLRPC
    method maps, and the logger.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Bind to ME, a (host, port) pair, and answer requests with handler
        over SSL using the credentials in ssl_ctx.  impl is the fedd
        implementation; its soap_services and xmlrpc_services maps are
        exported to the handlers as soap_methods and xmlrpc_methods.
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        self.log = logging.getLogger("fedd")
        self.impl = impl
        self.xmlrpc_methods = impl.xmlrpc_services
        self.soap_methods = impl.soap_services
47
class fedd_soap_handler(BaseHTTPRequestHandler):
    """
    Standard connection between SOAP and the fedd services in impl.

    Each POST is parsed as a SOAP envelope, checked for headers and a known
    namespace, and dispatched through the server's soap_methods map.

    Much of this is boilerplate from
    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
    """
    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version

    def send_xml(self, text, code=200):
        """Send the XML document text as the reply, with HTTP status code"""
        self.send_response(code)
        self.send_header('Content-type', 'text/xml; charset="utf-8"')
        self.send_header('Content-Length', str(len(text)))
        self.end_headers()
        self.wfile.write(text)
        self.wfile.flush()

    def send_fault(self, f, code=500):
        """Send the fault f, SOAP encoded, as the reply (HTTP 500 by default)"""
        self.send_xml(f.AsSOAP(processContents="lax"), code)

    def check_headers(self, ps):
        """
        Send a fault for any required envelope headers

        No envelope headers are understood here, so the first entry reported
        by ps.WhatMustIUnderstand() produces a fault and a False return.
        Returns True when there is nothing that must be understood.
        """
        for (uri, localname) in ps.WhatMustIUnderstand():
            # Only the first offending header is reported; one fault reply is
            # all that can be sent for a request.
            self.send_fault(FaultFromNotUnderstood(uri, localname, 'fedd'))
            return False
        return True

    def check_method(self, ps):
        """
        Confirm that this class implements the namespace and SOAP method

        Sends a client fault and returns False if the body root is in a
        namespace the implementation does not list in soap_namespaces or if
        it has no local name; returns True otherwise.
        """
        root = ps.body_root
        if root.namespaceURI not in self.server.impl.soap_namespaces:
            self.send_fault(Fault(Fault.Client,
                'Unknown namespace "%s"' % root.namespaceURI))
            return False

        if getattr(root, 'localName', None) is None:
            self.send_fault(Fault(Fault.Client, 'No method'))
            return False
        return True

    def do_POST(self):
        """
        Treat an HTTP POST request as a SOAP service call

        Parse failures are answered with a client fault, any other failure
        while reading the request with a fault built from the exception.  A
        Fault raised by the dispatch is sent back as is; otherwise the
        response object is serialized and returned.
        """
        try:
            cl = int(self.headers['content-length'])
            data = self.rfile.read(cl)
            ps = ParsedSoap(data)
        except ParseException as e:
            self.send_fault(Fault(Fault.Client, str(e)))
            return
        except Exception as e:
            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
            return
        if not self.check_headers(ps): return
        if not self.check_method(ps): return
        try:
            # The caller is identified by the certificate it presented on
            # the SSL connection.
            resp = self.soap_dispatch(ps.body_root.localName, ps,
                    fedid(cert=self.request.get_peer_cert()))
        except Fault as f:
            self.send_fault(f)
            return

        if resp is not None:
            sw = SoapWriter()
            sw.serialize(resp)
            self.send_xml(str(sw))

    def log_request(self, code=0, size=0):
        """
        Log request to the fedd logger

        This replaces the request logging of BaseHTTPRequestHandler, which
        is invoked from send_response; the same message text is therefore
        logged for fault replies too, with code giving the HTTP status.
        """
        self.server.log.info("Successful SOAP request code %d" % code)

    def soap_dispatch(self, method, req, fid):
        """
        The connection to the implementation, using the  method maps

        The implementation provides a mapping from SOAP method name to the
        method in the implementation that provides the service.  The mapped
        callable is invoked with the parsed request req and the caller's
        identity fid and its result is returned.

        Raises a client Fault for an unknown method.  A service_error from
        the implementation is converted to a server or client Fault,
        according to its is_server_error(), with the error code, code string
        and description carried in the fault detail.
        """
        if method in self.server.soap_methods:
            try:
                return self.server.soap_methods[method](req, fid)
            except service_error as e:
                # NOTE(review): ns0 is not imported by name in this file;
                # presumably one of the wildcard imports provides it --
                # confirm, otherwise this path raises NameError.
                de = ns0.faultType_Def(
                        (ns0.faultType_Def.schema,
                            "FeddFaultBody")).pyclass()
                de._code=e.code
                de._errstr=e.code_string()
                de._desc=e.desc
                if  e.is_server_error():
                    raise Fault(Fault.Server, e.code_string(), detail=de)
                else:
                    raise Fault(Fault.Client, e.code_string(), detail=de)
        else:
            raise Fault(Fault.Client, "Unknown method: %s" % method)
145
146
class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
    """
    Standard connection between XMLRPC and the fedd services in impl.

    Each POST is decoded as an XMLRPC method call and dispatched through the
    server's xmlrpc_methods map; the result or fault is sent back as an
    XMLRPC method response.

    Much of this is boilerplate from
    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
    """
    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version

    def send_xml(self, text, code=200):
        """Send the XML document text as the reply, with HTTP status code"""
        self.send_response(code)
        self.send_header('Content-type', 'text/xml; charset="utf-8"')
        self.send_header('Content-Length', str(len(text)))
        self.end_headers()
        self.wfile.write(text)
        self.wfile.flush()

    def _send_malformed(self):
        """Reply with the XMLRPC fault used for requests that cannot be read"""
        self.send_xml(xmlrpclib.dumps(xmlrpclib.Fault("Client",
            "Malformed request"), methodresponse=True))

    def do_POST(self):
        """
        Treat an HTTP POST request as an XMLRPC service call

        A request with an unusable Content-Length, a body that does not
        parse, or a body that names no method is answered with a "Malformed
        request" fault.  Otherwise the method is dispatched and its result,
        or the xmlrpclib.Fault it raised, is returned to the caller.
        """
        try:
            cl = int(self.headers['content-length'])
        except (KeyError, TypeError, ValueError):
            # Header missing or not a number: the body cannot be read.
            self._send_malformed()
            return
        data = self.rfile.read(cl)

        try:
            params, method = xmlrpclib.loads(data)
        except Exception:
            # loads reports bad input in several ways: the XML parser's own
            # error for text that is not well formed, ResponseError for an
            # incomplete document, and Fault when the document is itself a
            # fault response.  None of them is a usable call.
            method = None

        if method is None:
            # Either the parse failed or the document parsed but carried no
            # method name (a response rather than a call).
            self._send_malformed()
            return

        try:
            # The caller is identified by the certificate it presented on
            # the SSL connection.
            resp = self.xmlrpc_dispatch(method, params,
                        fedid(cert=self.request.get_peer_cert()))
            data = xmlrpclib.dumps((resp,), encoding='UTF-8',
                    methodresponse=True)
        except xmlrpclib.Fault as f:
            data = xmlrpclib.dumps(f, methodresponse=True)
        self.send_xml(data)

    def log_request(self, code=0, size=0):
        """
        Log request to the fedd logger

        This replaces the request logging of BaseHTTPRequestHandler, which
        is invoked from send_response, with code giving the HTTP status.
        """
        self.server.log.info("Successful XMLRPC request code %d" % code)


    def xmlrpc_dispatch(self, method, req, fid):
        """
        The connection to the implementation, using the  method maps

        The implementation provides a mapping from XMLRPC method name to the
        method in the implementation that provides the service.  The mapped
        callable is invoked with the decoded parameters req and the caller's
        identity fid and its result is returned.

        Raises xmlrpclib.Fault with code 100 for an unknown method, and
        converts a service_error from the implementation into a Fault
        carrying its code string and description.
        """
        if method in self.server.xmlrpc_methods:
            try:
                return self.server.xmlrpc_methods[method](req, fid)
            except service_error as e:
                # NOTE(review): the fault code here is the code string, not
                # the numeric e.code used by the SOAP handler -- confirm
                # that clients expect a string fault code.
                raise xmlrpclib.Fault(e.code_string(), e.desc)
        else:
            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)
212
class fedd_opts(OptionParser):
    """Command line parser for fedd, keeping option definitions out of main"""
    def __init__(self):
        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
                version="0.1")

        # One entry per option: the flag spellings and the keyword arguments
        # handed to add_option.  Order here is the order shown by --help.
        option_table = (
            (("-d", "--debug"), dict(action="count", dest="debug",
                help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"), dict(action="store",
                dest="configfile", help="Configuration file (required)")),
            (("-H", "--host"), dict(action="store", type="string",
                dest="host",
                help="Hostname to listen on (default %default)")),
            (("-l", "--logfile"), dict(action="store", dest="logfile",
                help="File to send log messages to")),
            (("-p", "--port"), dict(action="store", type="int",
                dest="port", help="Port to listen on (default %default)")),
            (("-s", "--service"), dict(action="append", type="string",
                dest="services",
                help="Service description: host:port:transport")),
            (("-x", "--transport"), dict(action="store", type="choice",
                choices=("xmlrpc", "soap"),
                help="Transport for request (xmlrpc|soap) "
                    "(Default: %default)")),
            (("--trace",), dict(action="store_const", dest="tracefile",
                const=sys.stderr, help="Print SOAP exchange to stderr")),
        )
        for flags, settings in option_table:
            self.add_option(*flags, **settings)

        # Options without an entry here default to None.
        self.set_defaults(host="localhost", port=23235, transport="soap",
                logfile=None, debug=0)
240
# State shared between the main thread, the signal handler and the
# sub-server threads.
servers_active = True       # Sub-servers run while this is True; the signal
                            # handler shutdown() sets it to False
services = [ ]              # Service descriptions, each of the form
                            # (transport, (host, port))
servers = [ ]               # fedd_server instances instantiated from services;
                            # each run_server thread removes its own on exit
servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
245
def shutdown(sig, frame):
    """
    Signal handler that asks the sub-servers to stop.

    Clears servers_active, the flag polled by the run_server loops and by
    the main thread, and logs the number of the signal received.  frame is
    accepted because the signal module passes it; it is not used.

    This is connected to signals below
    """
    global servers_active
    global flog

    servers_active = False
    flog.info("Received signal %d, shutting down" % sig)
256
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    s is the server to run.  It is polled with a one second timeout so that
    a change to servers_active is noticed promptly.  However the loop ends,
    including by an unexpected exception, s is removed from the servers list
    so that the main thread's wait for an empty list can complete.
    """
    global servers_active   # Not strictly needed: servers_active is only read
    global servers          # List of active servers
    global servers_lock     # Lock to manipulate servers

    try:
        while servers_active:
            try:
                readable, _w, _x = select((s,), (), (), 1.0)
                if s in readable: s.handle_request()
            except error:
                # The select call seems to get interrupted by signals as well
                # as the main thread.  This essentially ignores signals in
                # this thread.
                pass
    finally:
        # Done.  Remove us from the list, releasing the lock even if the
        # removal itself fails.
        servers_lock.acquire()
        try:
            if s in servers:
                servers.remove(s)
        finally:
            servers_lock.release()
283
opts, args = fedd_opts().parse_args()

# Logging setup: a single handler on the "fedd" logger, writing to the log
# file when one was given and to standard output otherwise.
if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S')
fh.setFormatter(ffmt)

# The handler will print anything, setting the logger level will affect what
# gets recorded.
fh.setLevel(logging.DEBUG)

flog = logging.getLogger("fedd")
flog.setLevel(logging.DEBUG if opts.debug else logging.INFO)
flog.addHandler(fh)
303
# Initialize the implementation from the required configuration file.  A
# RuntimeError from the constructor ends the program with its description.
if opts.configfile is None:
    sys.exit("--configfile is required")

try:
    impl = new_feddservice(opts.configfile)
except RuntimeError as e:
    # Prefer the error's desc attribute, then its message, for the report.
    # (The text is kept in msg so that the builtin str, which is called
    # further down in this script, is not rebound.)
    msg = getattr(e, 'desc', None) or getattr(e, 'message', None) or \
            "No message"
    sys.exit("Error configuring fedd: %s" % msg)

if impl.cert_file is None:
    sys.exit("Must supply certificate file (probably in config)")
317
# Create the SSL credentials.  Building the context is retried for as long as
# it fails with "bad decrypt" and no password was configured; any other SSL
# error, or a bad decrypt with a configured password, is fatal.
# NOTE(review): presumably the retry exists because the password is prompted
# for when none is configured, giving the user another attempt -- confirm
# against fedd_ssl_context.
ctx = None
while ctx is None:
    try:
        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs,
                password=impl.cert_pwd)
    except SSL.SSLError as e:
        retry = str(e) == "bad decrypt" and impl.cert_pwd is None
        if not retry:
            raise
327
# Walk through the service descriptions and pack them into the services list.
# That list has the form (transport (host, port)).  Each description is
# host:port:transport; an empty field takes its value from the --host, --port
# or --transport option.  With no descriptions, a single service is built
# from those three options.
if opts.services:
    for s in opts.services:
        fields = s.split(':')
        if len(fields) != 3:
            sys.exit("Bad service description (need host:port:transport): %s"
                    % s)
        h, p, t = fields

        if not h: h = opts.host
        if not p: p = opts.port
        if not t: t = opts.transport

        try:
            p = int(p)
        except ValueError:
            sys.exit("Bad port in service description: %s" % s)

        services.append((t, (h, p)))
else:
    services.append((opts.transport, (opts.host, opts.port)))
343
# Create the servers and put them into a list.  Each service description
# selects its request handler by transport name; descriptions naming any
# other transport are logged and skipped.
transport_handlers = {
    "soap": fedd_soap_handler,
    "xmlrpc": fedd_xmlrpc_handler,
}
for s in services:
    handler = transport_handlers.get(s[0])
    if handler is None:
        flog.warning("Unknown transport: %s" % s[0])
        continue
    servers.append(fedd_server(s[1], handler, ctx, impl))

#  Make sure that there are no malformed servers in the list
# NOTE(review): this filters services rather than servers, and every entry
# added above is a non-empty tuple, so it appears to have no effect --
# confirm the intent.
services = [ s for s in services if s ]
354
# Catch signals: both interrupt and terminate request an orderly shutdown
for signum in (SIGINT, SIGTERM):
    signal(signum, shutdown)

# Start the servers, one thread each
for s in servers:
    worker = Thread(target=run_server, args=(s,))
    worker.start()

# Main thread waits for signals; the handler clears servers_active
while servers_active:
    sleep(1.0)

# Once shutdown starts wait for all the servers to terminate.  Each server
# thread removes its server from the list as it exits, so an empty list
# means that all of them are done.
while True:
    servers_lock.acquire()
    remaining = len(servers)
    servers_lock.release()
    if remaining == 0:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
376
Note: See TracBrowser for help on using the repository browser.