source: fedd/fedd_util.py @ 6ff0b91

Branches/tags: axis_example, compt_changes, info-ops, version-1.30, version-2.00, version-3.01, version-3.02
Last change on this file since 6ff0b91 was 6ff0b91, checked in by Ted Faber <faber@…>, 16 years ago

Reimplementation of fedd as a start in merging the codebases

  • Property mode set to 100644
File size: 6.5 KB
Line 
1#!/usr/local/bin/python
2
3import sys
4
5from M2Crypto import SSL, X509, EVP
6from pyasn1.codec.der import decoder
7
8
# The M2Crypto installed on users is an old release that lacks some useful
# features.  When EVP.PKey has no as_der method we fall back to the legacy
# helpers from asn1_raw; that path is somewhat more brittle than the main
# line, but it works.
legacy = "as_der" not in dir(EVP.PKey)
if legacy:
    from asn1_raw import get_key_bits_from_file, get_key_bits_from_cert
17
class fedd_ssl_context(SSL.Context):
    """
    Simple wrapper around an M2Crypto.SSL.Context to initialize it for fedd.
    """
    def __init__(self, my_cert, trusted_certs=None, password=None):
        """
        construct a fedd_ssl_context

        The certificate is loaded, the trusted certificates (if any) are
        installed as verify locations, and peer verification is turned on
        with a depth of 10.

        @param my_cert: PEM file with my certificate in it
        @param trusted_certs: PEM file with trusted certs in it (optional)
        @param password: either the password protecting my_cert or a callable
                         that returns it (optional)
        """
        SSL.Context.__init__(self)

        if password is None:
            self.load_cert(my_cert)
        else:
            # load_cert takes a callback to get a password, not a password,
            # so a plain password is wrapped in a function that returns it.
            # The lambda closes over the password parameter, which is never
            # rebound, so it always yields the caller's value.
            if callable(password):
                callback = password
            else:
                callback = lambda *args: password
            self.load_cert(my_cert, callback=callback)

        if trusted_certs is not None:
            self.load_verify_locations(trusted_certs)
        self.set_verify(SSL.verify_peer, 10)
49
class fedid:
    """
    Wrapper around the federated ID from an X509 certificate.

    The ID is held in self.buf: a buffer of HASHSIZE bytes (the SHA1 digest
    of the certificate's public key bits) or None if no ID has been set.
    """
    HASHSIZE = 20

    def __init__(self, bits=None, hexstr=None, cert=None, file=None):
        """
        Initialize the fedid from the first parameter that is supplied.

        @param bits: the raw digest buffer
        @param hexstr: the digest as a hex string (colons are allowed)
        @param cert: a certificate object to derive the ID from
        @param file: the name of a certificate file to derive the ID from
        """
        if bits is not None:
            self.set_bits(bits)
        elif hexstr is not None:
            self.set_hexstr(hexstr)
        elif cert is not None:
            self.set_cert(cert)
        elif file is not None:
            self.set_file(file)
        else:
            self.buf = None

    def __hash__(self):
        return hash(self.buf)

    def __eq__(self, other):
        """Compare with another fedid or with a raw digest string"""
        # Test against the class itself: for a classic class type(self) is
        # the generic instance type, which would match unrelated objects.
        if isinstance(other, fedid):
            return self.buf == other.buf
        elif isinstance(other, str):
            return self.buf == other
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        """Return the ID as lower case hex digits ("" if no ID is set)"""
        if self.buf is None:
            return ""
        return "".join(["%02x" % ord(x) for x in self.buf])

    def __repr__(self):
        return "fedid(hexstr='%s')" % self.__str__()

    def pack_soap(self):
        """Return the raw buffer, the form placed into SOAP messages"""
        return self.buf

    def digest_bits(self, bits):
        """Internal function.  Compute the fedid from bits and store in buf"""
        d = EVP.MessageDigest('sha1')
        d.update(bits)
        self.buf = d.final()

    def set_hexstr(self, hexstr):
        """Set the fedid from a hex string

        Colons in the string are ignored.  The first 2 * HASHSIZE hex digits
        are used; a shorter string raises ValueError.
        """
        h = hexstr.replace(':', '')
        self.buf = "".join([chr(int(h[i:i+2], 16))
                            for i in range(0, 2 * fedid.HASHSIZE, 2)])

    def get_hexstr(self):
        """Return the hexstring representation of the fedid"""
        return str(self)

    def set_bits(self, bits):
        """Set the fedid to bits(a 160 bit buffer)"""
        self.buf = bits

    def get_bits(self):
        """Get the 160 bit buffer from the fedid"""
        return self.buf

    def set_file(self, file):
        """Get the fedid from a certificate file

        Calculate the SHA1 hash over the bit string of the public key as
        defined in RFC3280.
        """
        self.buf = None
        if legacy:
            self.digest_bits(get_key_bits_from_file(file))
        else:
            self.set_cert(X509.load_cert(file))

    def set_cert(self, cert):
        """Get the fedid from a certificate.

        Calculate the SHA1 hash over the bit string of the public key as
        defined in RFC3280.  If cert is None the fedid is left unset.
        """
        self.buf = None
        if cert is None:
            return

        if legacy:
            self.digest_bits(get_key_bits_from_cert(cert))
            return

        # Getting the key is easy, but getting the bit string of the key
        # requires a side trip through ASN.1
        k = cert.get_pubkey()
        dec = decoder.decode(k.as_der())

        # kv is a tuple of the bits in the key.  The loop below recombines
        # these into bytes (most significant bit first) for the SSL digest
        # function.
        kv = dec[0].getComponentByPosition(1)
        b = []
        for i in range(0, len(kv), 8):
            v = 0
            for j in range(0, 8):
                v = (v << 1) + kv[i + j]
            b.append(v)
        # Turn b from a list of byte values into a buffer (string) of bytes
        self.digest_bits("".join([chr(x) for x in b]))
156
def print_soap(soap, out=sys.stdout, level=0, show_all=False):
    """Recursively print a soap element to out.

    This is printed as pseudo XML with a <x></x> block around each element and
    the element printed indented.  If show_all is true, even empty elements are
    displayed, at every level of nesting.

    @param soap: the object to print; its get_element_* methods are called to
                 find sub-elements, and an object without any is printed with
                 string formatting
    @param out: an object with a write method that receives the text
    @param level: the nesting depth, one tab of indent per level
    @param show_all: print elements whose getter returns None as well
    """
    indent = level * "\t"

    getters = [name for name in dir(soap)
               if name.startswith("get_element")
               and callable(getattr(soap, name))]
    if not getters:
        # A leaf: no sub-elements to descend into.
        out.write("%s%s\n" % (indent, soap))
        return

    for getter in getters:
        element = getattr(soap, getter)()
        if element is None and not show_all:
            continue
        tag = getter[len("get_element_"):]

        # Element may be a list of repetitions of the same element.  If so
        # iterate over that list, otherwise over a one entry tuple.  Strings
        # are single values even where the str type is iterable.
        if getattr(element, "__iter__", None) is None \
                or isinstance(element, str):
            elements = (element,)
        else:
            elements = element

        for e in elements:
            # attrs are the element attributes for this element and they're
            # printed in the framing XML rather than by the recursive call.
            attrs = [name for name in dir(e)
                     if name.startswith("get_attribute")
                     and callable(getattr(e, name))]
            # The pieces are joined with single spaces, which keeps the
            # layout the original print statements produced (a space before
            # the closing ">" and two before each attribute).
            pieces = ["%s<%s" % (indent, tag)]
            for a in attrs:
                pieces.append(' %s="%s"' %
                              (a[len("get_attribute_"):], getattr(e, a)()))
            pieces.append(">")
            out.write(" ".join(pieces) + "\n")
            # Pass show_all down so nested empty elements are shown too.
            print_soap(e, out, level + 1, show_all)
            out.write("%s</%s>\n" % (indent, tag))
197
def pack_id(id):
    """
    Return a dictionary with the field name set by the id type

    A fedid is returned under the key 'fedid', a string starting with "http:"
    or "https:" under 'uri' and any other string under 'username'.
    """
    # Check against the class directly rather than type(fedid()): that built
    # a throwaway instance on every call and, for a classic class, yields the
    # generic instance type, which matches unrelated objects as well.
    if isinstance(id, fedid):
        return {'fedid': id}
    elif id.startswith("http:") or id.startswith("https:"):
        return {'uri': id}
    else:
        return {'username': id}
205
def _has_members(value):
    """True if value is an iterable container rather than a single value"""
    # Strings count as single values even where the str type is iterable.
    return getattr(value, "__iter__", None) is not None \
            and not isinstance(value, str)


def pack_soap(container, name, contents):
    """
    Convert the dictionary in contents into a tree of holder objects.

    The holder is created by calling container.new_<name>() and each entry
    of contents is assigned through the holder's set_element_<key> or
    set_attribute_<key> method.  Dictionary values are packed recursively,
    other iterables are packed item by item into a list, values with a
    pack_soap method are replaced by the result of calling it, and anything
    else is assigned unchanged.  Contents that is not a container is returned
    as is.

    Raises TypeError if container lacks the new_<name> factory or the holder
    lacks a setter for one of the keys.
    """
    if not _has_members(contents):
        return contents

    factory = getattr(container, "new_%s" % name, None)
    if factory is None:
        raise TypeError("%s does not have a new_%s attribute" %
                        (container, name))
    obj = factory()

    for e, v in contents.items():
        assign = getattr(obj, "set_element_%s" % e, None) or \
                getattr(obj, "set_attribute_%s" % e, None)
        if assign is None:
            raise TypeError(
                "%s does not have a set_element_%s or set_attribute_%s "
                "attribute" % (obj, e, e))
        if isinstance(v, dict):
            assign(pack_soap(obj, e, v))
        elif _has_members(v):
            assign([pack_soap(obj, e, val) for val in v])
        elif getattr(v, "pack_soap", None) is not None:
            assign(v.pack_soap())
        else:
            assign(v)
    return obj
Note: See TracBrowser for help on using the repository browser.