source: fedd/fedd.py @ 9ecda47

axis_examplecompt_changesinfo-ops
Last change on this file since 9ecda47 was 50ef6e4, checked in by Ted Faber <faber@…>, 14 years ago

Thread safety at last??

  • Property mode set to 100755
File size: 5.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from optparse import OptionParser
6
7from federation import config_parser
8from federation.server import server, xmlrpc_handler, soap_handler
9from federation.util import fedd_ssl_context
10from federation.deter_impl import new_feddservice
11
12from socket import error as socket_error
13from threading import Lock, Thread
14from signal import signal, pause, SIGINT, SIGTERM
15from select import select, error
16from time import sleep
17
18import logging
19import M2Crypto
20
21class fedd_opts(OptionParser):
22    """Encapsulate option processing in this class, rather than in main"""
23    def __init__(self):
24        OptionParser.__init__(self, usage="%prog [opts] (--help for details)",
25                version="0.1")
26
27        self.set_defaults(logfile=None, debug=0)
28
29        self.add_option("-d", "--debug", action="count", dest="debug", 
30                help="Set debug.  Repeat for more information")
31        self.add_option("-f", "--configfile", action="store",
32                default="/usr/local/etc/fedd.conf",
33                dest="configfile", help="Configuration file (required)")
34        self.add_option("-l", "--logfile", action="store", dest="logfile", 
35                help="File to send log messages to")
36        self.add_option("--trace", action="store_const", dest="tracefile", 
37                const=sys.stderr, help="Print SOAP exchange to stderr")
38
39servers_active = True       # Sub-servers run while this is True
40servers = [ ]               # server instances instantiated from services
41servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
42
43def shutdown(sig, frame):
44    """
45    On a signal, stop running sub-servers. 
46   
47    This is connected to signals below
48    """
49    global servers_active, flog
50
51    servers_active = False
52    flog.info("Received signal %d, shutting down" % sig);
53
54def run_server(s):
55    """
56    Operate a subserver, shutting down when servers_active is false.
57
58    Each server (that is host/port/transport triple) has a thread running this
59    function, so each can handle requests independently.  They all call in to
60    the same implementation, which must manage its own synchronization.
61    """
62    global servers_active   # Not strictly needed: servers_active is only read
63    global servers          # List of active servers
64    global servers_lock     # Lock to manipulate servers
65
66    while servers_active:
67        try:
68            i, o, e = select((s,), (), (), 1.0)
69            if s in i: s.handle_request()
70        except error:
71            # The select call seems to get interrupted by signals as well as
72            # the main thread.  This essentially ignores signals in this
73            # thread.
74            pass
75
76    # Done.  Remove us from the list
77    servers_lock.acquire()
78    servers.remove(s)
79    servers_lock.release()
80
81M2Crypto.threading.init()
82opts, args = fedd_opts().parse_args()
83
84# Logging setup
85flog = logging.getLogger("fedd")
86ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
87        '%d %b %y %H:%M:%S')
88
89if opts.logfile: fh = logging.FileHandler(opts.logfile)
90else: fh = logging.StreamHandler(sys.stdout)
91
92# The handler will print anything, setting the logger level will affect what
93# gets recorded.
94fh.setLevel(logging.DEBUG)
95
96if opts.debug: flog.setLevel(logging.DEBUG)
97else: flog.setLevel(logging.INFO)
98
99fh.setFormatter(ffmt)
100flog.addHandler(fh)
101
102
103if not os.access(opts.configfile, os.R_OK):
104    sys.exit("Can't read config file %s" % opts.configfile)
105
106# Initialize the implementation
107try:
108    config= config_parser()
109    config.read(opts.configfile)
110except Exception, e:
111    sys.exit("Cannot parse config file %s: %s" % (opts.configfile, e))
112
113try:
114    impl = new_feddservice(config)
115except RuntimeError, e:
116    str = getattr(e, 'desc', None) or getattr(e,'message', None) or \
117            "No message"
118    sys.exit("Error configuring fedd: %s" % str)
119
120if impl.cert_file:
121    if not os.access(impl.cert_file, os.R_OK):
122        sys.exit("Cannot read certificate file: %s" % impl.cert_file)
123else:
124    sys.exit("Must supply certificate file (probably in config)")
125
126if impl.trusted_certs:
127    if not os.access(impl.trusted_certs, os.R_OK):
128        sys.exit("Cannot read trusted certificate file: %s" % \
129                impl.trusted_certs)
130
131# Create the SSL credentials
132ctx = None
133while ctx == None:
134    try:
135        ctx = fedd_ssl_context(impl.cert_file, impl.trusted_certs, 
136                password=impl.cert_pwd)
137    except Exception, e:
138        if str(e) != "bad decrypt" or impl.cert_pwd != None:
139            raise
140
141services = config.get("globals", "services", "23235")
142
143for s in services.split(","):
144    s = s.strip()
145    colons = s.count(":")
146    try:
147        if colons == 0:
148            p = int(s)
149            h = ''
150            t = 'soap'
151        elif colons == 1:
152            p, t  = s.split(":")
153            p = int(p)
154            h = ''
155        elif colons == 2:
156            h, p, t  = s.split(":")
157            p = int(p)
158        else:
159            flog.error("Invalid service specification %s ignored." % s)
160            continue
161    except ValueError:
162        flog.error("Error converting port to integer in %s: spec ignored" % s)
163        continue
164
165    t = t.lower()
166    sdebug = (opts.debug > 0)
167    try:
168        if t == 'soap':
169            servers.append(server((h, p), soap_handler, ctx, impl, sdebug))
170        elif t == 'xmlrpc':
171            servers.append(server((h, p), xmlrpc_handler, ctx, impl, sdebug))
172        else:
173            flog.error("Invalid transport specification (%s) in service %s" % \
174                    (t, s))
175            continue
176    except socket_error, e:
177        flog.error("Cannot create server for %s: %s" % (s, e[1]))
178        continue
179
180#  Make sure that there are no malformed servers in the list
181servers = [ s for s in servers if s ]
182
183# Catch signals
184signal(SIGINT, shutdown)
185signal(SIGTERM, shutdown)
186
187# Start the servers
188for s in servers:
189    Thread(target=run_server, args=(s,)).start()
190
191# Main thread waits for signals
192while servers_active:
193    sleep(1.0)
194
195#Once shutdown starts wait for all the servers to terminate.
196while True:
197    servers_lock.acquire()
198    if len(servers) == 0: 
199        servers_lock.release()
200        flog.info("All servers exited.  Terminating")
201        sys.exit(0)
202    servers_lock.release()
203    sleep(1)
204
Note: See TracBrowser for help on using the repository browser.