source: fedd/fedd.py @ f4f4117

axis_example compt_changes info-ops version-1.30 version-2.00 version-3.01 version-3.02
Last change on this file since f4f4117 was f4f4117, checked in by Ted Faber <faber@…>, 16 years ago

add remote splitter interface

  • Property mode set to 100755
File size: 12.6 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from BaseHTTPServer import BaseHTTPRequestHandler
6
7from ZSI import Fault, ParseException, FaultFromNotUnderstood, \
8    FaultFromZSIException, FaultFromException, ParsedSoap, SoapWriter
9
10from M2Crypto import SSL
11from M2Crypto.SSL.SSLServer import ThreadingSSLServer
12import xmlrpclib
13
14from optparse import OptionParser
15
16from fedd_util import fedd_ssl_context, fedid
17from fedd_deter_impl import new_feddservice
18from fedd_services import ns0
19from service_error import *
20
21from threading import *
22from signal import signal, pause, SIGINT, SIGTERM
23from select import select, error
24from time import sleep
25import logging
26from ConfigParser import *
27
28# The SSL server here is based on the implementation described at
29# http://www.xml.com/pub/a/ws/2004/01/20/salz.html
30
# Turn off the matching of hostname to certificate ID.  This clears the
# class-level hook on M2Crypto's SSL.Connection, so it applies to every
# connection made through that class in this process.
# NOTE(review): presumably peers are identified by the fedid derived from
# their certificate rather than by host name -- confirm.
SSL.Connection.clientPostConnectionCheck = None
33
34class fedd_config_parser(SafeConfigParser):
35    """
36    A SafeConfig parser with a more forgiving get attribute
37    """
38
39    def safe_get(self, sect, opt, method, default=None):
40        """
41        If the option is present, return it, otherwise the default.
42        """
43        if self.has_option(sect, opt): return method(self, sect, opt)
44        else: return default
45
46    def get(self, sect, opt, default=None):
47        """
48        This returns the requested option or a given default.
49
50        It's more like getattr than get.
51        """
52
53        return self.safe_get(sect, opt, SafeConfigParser.get, default)
54
55    def getint(self, sect, opt, default=0):
56        """
57        Returns the selected option as an int or a default.
58        """
59
60        return self.safe_get(sect, opt, SafeConfigParser.getint, default)
61
62    def getfloat(self, sect, opt, default=0.0):
63        """
64        Returns the selected option as an int or a default.
65        """
66
67        return self.safe_get(sect, opt, SafeConfigParser.getfloat, default)
68
69    def getboolean(self, sect, opt, default=False):
70        """
71        Returns the selected option as a boolean or a default.
72        """
73
74        return self.safe_get(sect, opt, SafeConfigParser.getboolean, default)
75
class fedd_server(ThreadingSSLServer):
    """
    SSL server that carries the fedd implementation for its request handlers

    The handler classes reach the implementation, the SOAP and XMLRPC method
    maps and the logger through self.server.
    """
    def __init__(self, ME, handler, ssl_ctx, impl):
        """
        Build the SSL server and attach the fedd implementation to it.

        ME is the (host, port) pair to bind, handler is the request handler
        class that speaks the transport, ssl_ctx holds the SSL credentials and
        impl is the object implementing the fedd services.
        """
        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
        self.log = logging.getLogger("fedd")
        self.impl = impl
        # Method-name -> callable maps published by the implementation
        self.xmlrpc_methods = impl.xmlrpc_services
        self.soap_methods = impl.soap_services
91
class fedd_soap_handler(BaseHTTPRequestHandler):
    """
    Standard connection between SOAP and the fedd services in impl.

    Much of this is boilerplate from
    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
    """
    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version

    def send_xml(self, text, code=200):
        """
        Send text as an XML reply with HTTP status code.
        """
        self.send_response(code)
        self.send_header('Content-type', 'text/xml; charset="utf-8"')
        self.send_header('Content-Length', str(len(text)))
        self.end_headers()
        self.wfile.write(text)
        self.wfile.flush()

    def send_fault(self, f, code=500):
        """
        Send the ZSI Fault f as a SOAP encoded reply with HTTP status code.
        """
        self.send_xml(f.AsSOAP(processContents="lax"), code)

    def check_headers(self, ps):
        """
        Reject requests that carry envelope headers we are required to
        understand.

        A fault is sent for the first such header and False is returned;
        True means the parsed request ps has none.
        """
        for (uri, localname) in ps.WhatMustIUnderstand():
            self.send_fault(FaultFromNotUnderstood(uri, localname, 'fedd'))
            return False
        return True

    def check_method(self, ps):
        """
        Confirm that the request names a method in a namespace we serve.

        Sends a client fault and returns False if the body root's namespace is
        not one of the implementation's soap_namespaces or if it has no local
        name; returns True otherwise.
        """
        root = ps.body_root
        if root.namespaceURI not in self.server.impl.soap_namespaces:
            self.send_fault(Fault(Fault.Client,
                'Unknown namespace "%s"' % root.namespaceURI))
            return False

        if getattr(root, 'localName', None) is None:
            self.send_fault(Fault(Fault.Client, 'No method'))
            return False
        return True

    def do_POST(self):
        """
        Treat an HTTP POST request as a SOAP service call.

        The body is read using the Content-Length header and parsed as SOAP.
        A parse failure is reported as a client fault, any other failure
        while reading or parsing as a fault built from the exception.  After
        the header and method checks the call is dispatched together with the
        fedid made from the peer's certificate.  A Fault raised by the
        dispatch is sent to the caller; a response that is not None is
        serialized and sent.
        """
        try:
            cl = int(self.headers['content-length'])
            data = self.rfile.read(cl)
            ps = ParsedSoap(data)
        except ParseException as e:
            self.send_fault(Fault(Fault.Client, str(e)))
            return
        except Exception as e:
            self.send_fault(FaultFromException(e, 0, sys.exc_info()[2]))
            return

        if not self.check_headers(ps): return
        if not self.check_method(ps): return

        try:
            resp = self.soap_dispatch(ps.body_root.localName, ps,
                    fedid(cert=self.request.get_peer_cert()))
        except Fault as f:
            self.send_fault(f)
            return

        if resp is not None:
            sw = SoapWriter()
            sw.serialize(resp)
            self.send_xml(str(sw))

    def log_request(self, code=0, size=0):
        """
        Log request to the fedd logger

        This overrides BaseHTTPRequestHandler.log_request; size is ignored.
        """
        self.server.log.info("Successful SOAP request code %d" % code)

    def soap_dispatch(self, method, req, fid):
        """
        The connection to the implementation, using the method maps

        The implementation provides a mapping from SOAP method name to the
        method in the implementation that provides the service.  The mapped
        callable is invoked with the parsed request and the caller's fedid
        and its result is returned.

        Raises a client Fault for an unknown method.  A service_error from the
        implementation is converted into a server or client Fault (according
        to is_server_error()) that carries a FeddFaultBody detail.
        """
        if method not in self.server.soap_methods:
            raise Fault(Fault.Client, "Unknown method: %s" % method)

        try:
            return self.server.soap_methods[method](req, fid)
        except service_error as e:
            # Copy the service error into the fault detail element
            de = ns0.faultType_Def(
                    (ns0.faultType_Def.schema,
                        "FeddFaultBody")).pyclass()
            de._code = e.code
            de._errstr = e.code_string()
            de._desc = e.desc
            if e.is_server_error():
                raise Fault(Fault.Server, e.code_string(), detail=de)
            else:
                raise Fault(Fault.Client, e.code_string(), detail=de)
189
190
class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
    """
    Standard connection between XMLRPC and the fedd services in impl.

    Much of this is boilerplate from
    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
    """
    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version

    def send_xml(self, text, code=200):
        """
        Send text as an XML reply with HTTP status code.
        """
        self.send_response(code)
        self.send_header('Content-type', 'text/xml; charset="utf-8"')
        self.send_header('Content-Length', str(len(text)))
        self.end_headers()
        self.wfile.write(text)
        self.wfile.flush()

    def do_POST(self):
        """
        Treat an HTTP POST request as an XMLRPC service call.

        The body is read using the Content-Length header and unmarshalled.
        If that fails with a ResponseError, or the document carries no method
        name, a "Malformed request" fault is returned.  Otherwise the call is
        dispatched together with the fedid made from the peer's certificate
        and the marshalled result, or the xmlrpclib.Fault raised by the
        dispatch, is sent back.
        """
        cl = int(self.headers['content-length'])
        data = self.rfile.read(cl)

        try:
            params, method = xmlrpclib.loads(data)
        except xmlrpclib.ResponseError:
            params, method = None, None

        if method is None:
            # Covers both an unmarshalling failure and a well-formed document
            # with no methodName (e.g. a methodResponse).  Never echo the
            # request body back to the caller.
            reply = xmlrpclib.dumps(xmlrpclib.Fault("Client",
                "Malformed request"), methodresponse=True)
        else:
            try:
                resp = self.xmlrpc_dispatch(method, params,
                            fedid(cert=self.request.get_peer_cert()))
                reply = xmlrpclib.dumps((resp,), encoding='UTF-8',
                        methodresponse=True)
            except xmlrpclib.Fault as f:
                reply = xmlrpclib.dumps(f, methodresponse=True)
        self.send_xml(reply)

    def log_request(self, code=0, size=0):
        """
        Log request to the fedd logger

        This overrides BaseHTTPRequestHandler.log_request; size is ignored.
        """
        self.server.log.info("Successful XMLRPC request code %d" % code)

    def xmlrpc_dispatch(self, method, req, fid):
        """
        The connection to the implementation, using the method maps

        The implementation provides a mapping from XMLRPC method name to the
        method in the implementation that provides the service.  The mapped
        callable is invoked with the request parameters and the caller's fedid
        and its result is returned.

        Raises xmlrpclib.Fault with code 100 for an unknown method.  A
        service_error from the implementation is re-raised as an
        xmlrpclib.Fault built from its code string and description.
        """
        if method not in self.server.xmlrpc_methods:
            raise xmlrpclib.Fault(100, "Unknown method: %s" % method)

        try:
            return self.server.xmlrpc_methods[method](req, fid)
        except service_error as e:
            raise xmlrpclib.Fault(e.code_string(), e.desc)
256
class fedd_opts(OptionParser):
    """Command line option parser for fedd, kept out of the main script"""
    def __init__(self):
        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
                version="0.1")

        self.set_defaults(host="localhost", port=23235, transport="soap",
                logfile=None, debug=0)

        # (flags, add_option keywords), registered in the order listed so the
        # --help output keeps its layout.
        option_table = (
            (("-d", "--debug"),
                dict(action="count", dest="debug",
                    help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"),
                dict(action="store", dest="configfile",
                    help="Configuration file (required)")),
            (("-H", "--host"),
                dict(action="store", type="string", dest="host",
                    help="Hostname to listen on (default %default)")),
            (("-l", "--logfile"),
                dict(action="store", dest="logfile",
                    help="File to send log messages to")),
            (("-p", "--port"),
                dict(action="store", type="int", dest="port",
                    help="Port to listen on (default %default)")),
            (("-s", "--service"),
                dict(action="append", type="string", dest="services",
                    help="Service description: host:port:transport")),
            (("-x", "--transport"),
                dict(action="store", type="choice",
                    choices=("xmlrpc", "soap"),
                    help="Transport for request (xmlrpc|soap) "
                        "(Default: %default)")),
            (("--trace",),
                dict(action="store_const", dest="tracefile",
                    const=sys.stderr,
                    help="Print SOAP exchange to stderr")),
        )
        for flags, settings in option_table:
            self.add_option(*flags, **settings)
284
# State shared between the main thread, the signal handler and the per-server
# threads.
servers_active = True       # Sub-servers run while this is True
services = [ ]              # Service descriptions: (transport, (host, port))
servers = [ ]               # fedd_server instances instantiated from services
servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
289
def shutdown(sig, frame):
    """
    Signal handler: tell the sub-servers to stop and log the signal number.

    sig is the signal number and frame the interrupted stack frame (unused),
    as passed by the signal module.  This is connected to signals below.
    """
    global servers_active

    # Clearing the flag ends the loops that poll servers_active
    servers_active = False
    notice = "Received signal %d, shutting down" % sig
    flog.info(notice)
300
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    s is the server to run.  It is polled with a one second select timeout so
    that a change to servers_active is noticed promptly, and it is removed
    from the servers list under servers_lock before the thread returns.
    """
    while servers_active:
        try:
            readable, _writable, _errored = select((s,), (), (), 1.0)
            if s in readable:
                s.handle_request()
        except error:
            # The select call seems to get interrupted by signals as well as
            # the main thread.  This essentially ignores signals in this
            # thread.
            pass

    # Done.  Remove us from the list.  The finally clause releases the lock
    # even if the removal fails, so the main thread's shutdown loop cannot be
    # left blocked on it.
    servers_lock.acquire()
    try:
        servers.remove(s)
    finally:
        servers_lock.release()
327
opts, args = fedd_opts().parse_args()

# Logging setup: messages go to --logfile if one was given, otherwise to
# standard output.
flog = logging.getLogger("fedd")

if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S')
fh.setFormatter(ffmt)

# The handler passes everything through; what actually gets recorded is
# decided by the logger's level, which --debug lowers to DEBUG.
fh.setLevel(logging.DEBUG)
flog.setLevel(logging.DEBUG if opts.debug else logging.INFO)

flog.addHandler(fh)
347
348
349
# Initialize the implementation.  A readable, parsable configuration file is
# mandatory.
if opts.configfile is None:
    sys.exit("--configfile is required")

try:
    config = fedd_config_parser()
    # read() skips files it cannot open and returns the names it did parse,
    # so an empty result means the file could not be read.
    if not config.read(opts.configfile):
        sys.exit("Cannot read config file: %s" % opts.configfile)
except ParsingError as e:
    # Raised for syntax errors, including a missing section header
    sys.exit("Cannot parse config file: %s" % e)

try:
    impl = new_feddservice(config)
except RuntimeError as e:
    # Use the most specific text the exception offers.  This is not bound to
    # the name str, which is still needed as the builtin further down.
    msg = getattr(e, 'desc', None) or getattr(e, 'message', None) or \
            "No message"
    sys.exit("Error configuring fedd: %s" % msg)

if impl.cert_file is None:
    sys.exit("Must supply certificate file (probably in config)")
369
# Create the SSL credentials.  Keep trying until a context has been built.
# An SSLError whose text is "bad decrypt" is retried, but only when no
# certificate password was configured (impl.cert_pwd is None); any other
# failure is re-raised.
# NOTE(review): presumably fedd_ssl_context prompts for the key passphrase
# when no password is passed, so a retry gives the operator another attempt
# -- confirm against fedd_util.
ctx = None
while ctx == None:
    try:
        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs,
                password=impl.cert_pwd)
    except SSL.SSLError, e:
        if str(e) != "bad decrypt" or impl.cert_pwd != None:
            raise
379
# Walk through the service descriptions and pack them into the services list.
# That list has the form (transport, (host, port)).  An empty field in a
# host:port:transport description falls back to --host, --port or --transport.
if opts.services:
    for s in opts.services:
        try:
            h, p, t = s.split(':')

            if not h:
                h = opts.host
            if not p:
                p = opts.port
            if not t:
                t = opts.transport

            p = int(p)
        except ValueError:
            # Wrong number of fields or a non-numeric port
            sys.exit("Bad service description (expected "
                    "host:port:transport): %s" % s)

        services.append((t, (h, p)))
else:
    services.append((opts.transport, (opts.host, opts.port)))
395
# Build one fedd_server per service description, choosing the request handler
# by transport.  Descriptions with an unknown transport are logged and skipped.
for s in services:
    if s[0] not in ("soap", "xmlrpc"):
        flog.warning("Unknown transport: %s" % s[0])
        continue
    if s[0] == "soap":
        servers.append(fedd_server(s[1], fedd_soap_handler, ctx, impl))
    else:
        servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl))

# Drop any empty entries from the service descriptions
services = [ s for s in services if s ]
406
# Route interrupt and termination signals to the shutdown handler
for signum in (SIGINT, SIGTERM):
    signal(signum, shutdown)

# Give every server its own thread
for s in servers:
    Thread(target=run_server, args=(s,)).start()

# The main thread idles until a signal clears servers_active
while servers_active:
    sleep(1.0)

# Shutdown has begun: poll until every server thread has removed itself from
# the list, then exit.
while True:
    servers_lock.acquire()
    all_gone = len(servers) == 0
    servers_lock.release()
    if all_gone:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
428
Note: See TracBrowser for help on using the repository browser.