source: fedd/fedd.py @ 49e66b4

axis_examplecompt_changesinfo-ops
Last change on this file since 49e66b4 was 50ef6e4, checked in by Ted Faber <faber@…>, 15 years ago

Thread safety at last??

  • Property mode set to 100755
File size: 5.7 KB
RevLine 
[6ff0b91]1#!/usr/local/bin/python
2
[2729e48]3import os,sys
[6ff0b91]4
5from optparse import OptionParser
6
[5d3f239]7from federation import config_parser
8from federation.server import server, xmlrpc_handler, soap_handler
9from federation.util import fedd_ssl_context
10from federation.deter_impl import new_feddservice
[6ff0b91]11
[e087a7a]12from socket import error as socket_error
[6a0c9f4]13from threading import Lock, Thread
[11a08b0]14from signal import signal, pause, SIGINT, SIGTERM
[d199ced]15from select import select, error
[0ea11af]16from time import sleep
[50ef6e4]17
[11a08b0]18import logging
[50ef6e4]19import M2Crypto
[19cc408]20
class fedd_opts(OptionParser):
    """
    Command line parser for the fedd daemon.

    Keeps the option definitions out of the main script.  Options are
    -d/--debug (a counter, defaults to 0), -f/--configfile (defaults to
    /usr/local/etc/fedd.conf), -l/--logfile (defaults to None) and --trace
    (stores sys.stderr in tracefile when given).
    """
    def __init__(self):
        OptionParser.__init__(self, version="0.1",
                usage="%prog [opts] (--help for details)")

        self.set_defaults(logfile=None, debug=0)

        # (flags, add_option keyword arguments), registered in this order so
        # the --help listing is laid out as before.
        option_table = (
            (("-d", "--debug"),
                dict(action="count", dest="debug",
                    help="Set debug.  Repeat for more information")),
            (("-f", "--configfile"),
                dict(action="store", dest="configfile",
                    default="/usr/local/etc/fedd.conf",
                    help="Configuration file (required)")),
            (("-l", "--logfile"),
                dict(action="store", dest="logfile",
                    help="File to send log messages to")),
            (("--trace",),
                dict(action="store_const", dest="tracefile",
                    const=sys.stderr,
                    help="Print SOAP exchange to stderr")),
        )

        for flags, settings in option_table:
            self.add_option(*flags, **settings)
38
# State shared by the main thread, the signal handler and the sub-server
# threads.
servers_active = True       # Sub-servers run while this is True; shutdown() sets it False
servers = [ ]               # server instances instantiated from services
servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
[11a08b0]42
def shutdown(sig, frame):
    """
    Signal handler that asks the sub-servers to stop.

    Clears servers_active, which the run_server loops and the main thread
    poll, and logs the number of the signal received.  frame is part of the
    handler signature and is not used.  Installed for SIGINT and SIGTERM
    further down in this script.
    """
    global servers_active

    servers_active = False
    flog.info("Received signal %d, shutting down", sig)
53
def run_server(s):
    """
    Operate a subserver, shutting down when servers_active is false.

    Each server (that is host/port/transport triple) has a thread running this
    function, so each can handle requests independently.  They all call in to
    the same implementation, which must manage its own synchronization.

    s must be usable with select() and provide handle_request().  However the
    loop ends - normally, or because handle_request() raised - s is removed
    from the global servers list under servers_lock, so the main thread's
    shutdown wait cannot hang on a server whose thread has died.  Exceptions
    other than select's error still propagate out of this function.
    """
    # servers_active, servers and servers_lock are module globals; they are
    # only read here, so no global declaration is needed.
    try:
        while servers_active:
            try:
                # Wake up at least once a second to re-check servers_active
                readable, writable, exceptional = select((s,), (), (), 1.0)
                if s in readable: s.handle_request()
            except error:
                # The select call seems to get interrupted by signals as well
                # as the main thread.  This essentially ignores signals in
                # this thread.
                pass
    finally:
        # Done.  Remove us from the list, always giving the lock back and
        # tolerating a list we are no longer in.
        servers_lock.acquire()
        try:
            if s in servers: servers.remove(s)
        finally:
            servers_lock.release()
[a97394b]80
# Initialize M2Crypto's threading support before any server thread is started
M2Crypto.threading.init()
opts, args = fedd_opts().parse_args()

# Logging setup: everything goes through the "fedd" logger, either to the
# file named with --logfile or to standard output.
flog = logging.getLogger("fedd")
ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
        '%d %b %y %H:%M:%S')

if opts.logfile:
    fh = logging.FileHandler(opts.logfile)
else:
    fh = logging.StreamHandler(sys.stdout)

# The handler passes everything through; the level set on the logger decides
# what actually gets recorded.
fh.setLevel(logging.DEBUG)
fh.setFormatter(ffmt)
flog.addHandler(fh)

if opts.debug:
    flog.setLevel(logging.DEBUG)
else:
    flog.setLevel(logging.INFO)
101
[72ed6e4]102
# Refuse to start without a readable configuration file
if not os.access(opts.configfile, os.R_OK):
    sys.exit("Can't read config file %s" % opts.configfile)

# Initialize the implementation
try:
    config = config_parser()
    config.read(opts.configfile)
except Exception:
    # sys.exc_info() rather than "except ..., e" keeps this parseable by both
    # old and new interpreters.
    e = sys.exc_info()[1]
    sys.exit("Cannot parse config file %s: %s" % (opts.configfile, e))

try:
    impl = new_feddservice(config)
except RuntimeError:
    e = sys.exc_info()[1]
    # Prefer the exception's desc attribute, then its message, then its text.
    # The result is deliberately not stored in a variable called "str": that
    # would hide the builtin, which this script still calls later on.
    reason = getattr(e, 'desc', None) or getattr(e, 'message', None) or \
            str(e) or "No message"
    sys.exit("Error configuring fedd: %s" % reason)
119
# A certificate file is mandatory and must be readable
if not impl.cert_file:
    sys.exit("Must supply certificate file (probably in config)")

if not os.access(impl.cert_file, os.R_OK):
    sys.exit("Cannot read certificate file: %s" % impl.cert_file)

# Trusted certificates are optional, but must be readable when configured
if impl.trusted_certs and not os.access(impl.trusted_certs, os.R_OK):
    sys.exit("Cannot read trusted certificate file: %s" % \
            impl.trusted_certs)
130
# Create the SSL credentials.  Building the context is retried for as long as
# it fails with "bad decrypt" while no certificate password is configured;
# any other failure is passed on.
# NOTE(review): presumably the passphrase is being typed in interactively in
# that case and a retry gives the operator another attempt - confirm against
# fedd_ssl_context.
ctx = None
while ctx is None:
    try:
        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs,
                password=impl.cert_pwd)
    except Exception:
        e = sys.exc_info()[1]
        wrong_passphrase = (str(e) == "bad decrypt")
        if not wrong_passphrase or impl.cert_pwd is not None:
            raise
140
# Comma-separated service specifications, each one of
#   port            (all interfaces, soap)
#   port:transport  (all interfaces)
#   host:port:transport
services = config.get("globals", "services", "23235")

sdebug = (opts.debug > 0)
# Request handler class for each supported transport
handlers = { 'soap': soap_handler, 'xmlrpc': xmlrpc_handler }

for s in services.split(","):
    s = s.strip()
    fields = s.split(":")

    if len(fields) > 3:
        flog.error("Invalid service specification %s ignored." % s)
        continue

    # Fill in the defaults for whatever the specification leaves out
    h, t = '', 'soap'
    if len(fields) == 1:
        p = fields[0]
    elif len(fields) == 2:
        p, t = fields
    else:
        h, p, t = fields

    try:
        p = int(p)
    except ValueError:
        flog.error("Error converting port to integer in %s: spec ignored" % s)
        continue

    t = t.lower()
    handler = handlers.get(t)
    if handler is None:
        flog.error("Invalid transport specification (%s) in service %s" % \
                (t, s))
        continue

    try:
        servers.append(server((h, p), handler, ctx, impl, sdebug))
    except socket_error:
        e = sys.exc_info()[1]
        # Socket errors usually carry (errno, message), but some carry only a
        # single argument; fall back to the whole exception then rather than
        # failing with an IndexError.
        if len(e.args) > 1:
            reason = e.args[1]
        else:
            reason = e
        flog.error("Cannot create server for %s: %s" % (s, reason))
        continue
[11a08b0]179
#  Make sure that there are no malformed servers in the list
servers = [ s for s in servers if s ]

# Catch signals
signal(SIGINT, shutdown)
signal(SIGTERM, shutdown)

# Start the servers.  Walk a snapshot of the list: if a signal arrives while
# the threads are being started, the ones already running remove themselves
# from servers right away, and iterating the live list would then skip a
# server that never gets a thread and never leaves the list.
for s in list(servers):
    Thread(target=run_server, args=(s,)).start()

# Main thread waits for signals
while servers_active:
    sleep(1.0)

#Once shutdown starts wait for all the servers to terminate.
while True:
    servers_lock.acquire()
    try:
        remaining = len(servers)
    finally:
        servers_lock.release()

    if remaining == 0:
        flog.info("All servers exited.  Terminating")
        sys.exit(0)
    sleep(1)
[11a08b0]204
Note: See TracBrowser for help on using the repository browser.