source: fedd/federation/util.py @ 86a7bb8

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 86a7bb8 was 632dd59, checked in by Ted Faber <faber@…>, 15 years ago

More unicode intolerance.

  • Property mode set to 100644
File size: 5.5 KB
RevLine 
[6ff0b91]1#!/usr/local/bin/python
2
[05191a6]3import re
4import string
[2c6128f]5import logging
[6ff0b91]6
[51cc9df]7from M2Crypto import SSL
8from fedid import fedid
[6ff0b91]9
[fe157b9]10
11# If this is an old enough version of M2Crypto.SSL that has an
12# ssl_verify_callback that doesn't allow 0-length signed certs, create a
13# version of that callback that does.  This is edited from the original in
14# M2Crypto.SSL.cb.  This version also elides the printing to stderr.
15if not getattr(SSL.cb, 'ssl_verify_callback_allow_unknown_ca', None):
16    from M2Crypto.SSL.Context import map
17    from M2Crypto import m2
18
19    def ssl_verify_callback(ssl_ctx_ptr, x509_ptr, errnum, errdepth, ok):
20        unknown_issuer = [
21            m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
22            m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
23            m2.X509_V_ERR_CERT_UNTRUSTED,
24            m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
25            ]
26        ssl_ctx = map()[ssl_ctx_ptr]
27
28        if errnum in unknown_issuer: 
29            if ssl_ctx.get_allow_unknown_ca():
30                ok = 1
31        # CRL checking goes here...
32        if ok:
33            if ssl_ctx.get_verify_depth() >= errdepth:
34                ok = 1
35            else:
36                ok = 0
37        return ok
38else:
39    def ssl_verify_callback(ssl_ctx_ptr, x509_ptr, errnum, errdepth, ok):
40        raise ValueError("This should never be called")
41
[6ff0b91]42class fedd_ssl_context(SSL.Context):
43    """
44    Simple wrapper around an M2Crypto.SSL.Context to initialize it for fedd.
45    """
46    def __init__(self, my_cert, trusted_certs=None, password=None):
47        """
48        construct a fedd_ssl_context
49
50        @param my_cert: PEM file with my certificate in it
51        @param trusted_certs: PEM file with trusted certs in it (optional)
52        """
53        SSL.Context.__init__(self)
54
55        # load_cert takes a callback to get a password, not a password, so if
56        # the caller provided a password, this creates a nonce callback using a
57        # lambda form.
58        if password != None and not callable(password):
59            # This is cute.  password = lambda *args: password produces a
60            # function object that returns itself rather than one that returns
61            # the object itself.  This is because password is an object
62            # reference and after the assignment it's a lambda.  So we assign
63            # to a temp.
[632dd59]64            pwd = str(password)
[6ff0b91]65            password =lambda *args: pwd
66
[632dd59]67        # The calls to str below (and above) are because the underlying SSL
68        # stuff is intolerant of unicode.
[6ff0b91]69        if password != None:
[632dd59]70            self.load_cert(str(my_cert), callback=password)
[6ff0b91]71        else:
[632dd59]72            self.load_cert(str(my_cert))
[f069052]73
74        # If no trusted certificates are specified, allow unknown CAs.
75        if trusted_certs: 
76            self.load_verify_locations(trusted_certs)
77            self.set_verify(SSL.verify_peer, 10)
78        else:
[12e6ca8]79            # More legacy code.  Recent versions of M2Crypto express the
80            # allow_unknown_ca option through a callback turned to allow it.
81            # Older versions use a standard callback that respects the
82            # attribute.  This should work under both regines.
83            callb = getattr(SSL.cb, 'ssl_verify_callback_allow_unknown_ca', 
[fe157b9]84                    ssl_verify_callback)
[12e6ca8]85            self.set_allow_unknown_ca(True)
86            self.set_verify(SSL.verify_peer, 10, callback=callb)
[6ff0b91]87
[05191a6]88def read_simple_accessdb(fn, auth, mask=[]):
89    """
90    Read a simple access database.  Each line is a fedid (of the form
91    fedid:hexstring) and a comma separated list of atributes to be assigned to
92    it.  This parses out the fedids and adds the attributes to the authorizer.
93    comments (preceded with a #) and blank lines are ignored.  Exceptions (e.g.
94    file exceptions and ValueErrors from badly parsed lines) are propagated.
95    """
96
97    rv = [ ]
98    lineno = 0
99    fedid_line = re.compile("fedid:([" + string.hexdigits + "]+)\s+" +\
100            "(\w+\s*(,\s*\w+)*)")
101
102    # If a single string came in, make it a list
103    if isinstance(mask, basestring): mask = [ mask ]
104
105    f = open(fn, 'r')
106    for line in f:
107        lineno += 1
108        line = line.strip()
109        if line.startswith('#') or len(line) == 0: 
110            continue
111        m = fedid_line.match(line)
112        if m :
113            fid = fedid(hexstr=m.group(1))
114            for a in [ a.strip() for a in m.group(2).split(",") \
115                    if not mask or a.strip() in mask ]:
116                auth.set_attribute(fid, a.strip())
117        else:
118            raise ValueError("Badly formatted line in accessdb: %s line %d" %\
[cc8d8e9]119                    (fn, lineno))
[05191a6]120    f.close()
121    return rv
122       
123
[6ff0b91]124def pack_id(id):
125    """
[3e293e4]126    Return a dictionary with the field name set by the id type.  Handy for
127    creating dictionaries to be converted to messages.
[6ff0b91]128    """
[f8b118e]129    if isinstance(id, fedid): return { 'fedid': id }
[6ff0b91]130    elif id.startswith("http:") or id.startswith("https:"): return { 'uri': id }
[e40c7ee]131    else: return { 'localname': id}
[6ff0b91]132
[2106ed1]133def unpack_id(id):
134    """return id as a type determined by the key"""
[f8b118e]135    for k in ("localname", "fedid", "uri", "kerberosUsername"):
136        if id.has_key(k): return id[k]
[2106ed1]137    return None
138
[2c6128f]139def set_log_level(config, sect, log):
140    """ Set the logging level to the value passed in sect of config."""
141    # The getattr sleight of hand finds the logging level constant
142    # corrersponding to the string.  We're a little paranoid to avoid user
143    # mayhem.
144    if config.has_option(sect, "log_level"):
145        level_str = config.get(sect, "log_level")
146        try:
147            level = int(getattr(logging, level_str.upper(), -1))
148
149            if  logging.DEBUG <= level <= logging.CRITICAL:
150                log.setLevel(level)
151            else:
152                log.error("Bad experiment_log value: %s" % level_str)
153
154        except ValueError:
155            log.error("Bad experiment_log value: %s" % level_str)
156
[40dd8c1]157def copy_file(src, dest, size=1024):
158    """
159    Exceedingly simple file copy.  Throws an IOError if there's a problem.
160    """
161    s = open(src,'r')
162    d = open(dest, 'w')
163
164    buf = s.read(size)
165    while buf != "":
166        d.write(buf)
167        buf = s.read(size)
168    s.close()
169    d.close()
170
Note: See TracBrowser for help on using the repository browser.