#!/usr/local/bin/python import sys from M2Crypto import SSL, X509, EVP from pyasn1.codec.der import decoder # The version of M2Crypto on users is pretty old and doesn't have all the # features that are useful. The legacy code is somewhat more brittle than the # main line, but will work. if "as_der" not in dir(EVP.PKey): from asn1_raw import get_key_bits_from_file, get_key_bits_from_cert legacy = True else: legacy = False class fedd_ssl_context(SSL.Context): """ Simple wrapper around an M2Crypto.SSL.Context to initialize it for fedd. """ def __init__(self, my_cert, trusted_certs=None, password=None): """ construct a fedd_ssl_context @param my_cert: PEM file with my certificate in it @param trusted_certs: PEM file with trusted certs in it (optional) """ SSL.Context.__init__(self) # load_cert takes a callback to get a password, not a password, so if # the caller provided a password, this creates a nonce callback using a # lambda form. if password != None and not callable(password): # This is cute. password = lambda *args: password produces a # function object that returns itself rather than one that returns # the object itself. This is because password is an object # reference and after the assignment it's a lambda. So we assign # to a temp. pwd = password password =lambda *args: pwd if password != None: self.load_cert(my_cert, callback=password) else: self.load_cert(my_cert) if trusted_certs != None: self.load_verify_locations(trusted_certs) self.set_verify(SSL.verify_peer, 10) class fedid: """ Wrapper around the federated ID from an X509 certificate. """ HASHSIZE=20 def __init__(self, bits=None, hexstr=None, cert=None, file=None): if bits != None: self.set_bits(bits) elif hexstr != None: self.set_hexstr(hexstr) elif cert != None: self.set_cert(cert) elif file != None: self.set_file(file) else: self.buf = None def __hash__(self): return hash(self.buf) def __eq__(self, other): if isinstance(other, type(self)): return self.buf == other.buf elif isinstance(other, type(str())): return self.buf == other; else: return False def __ne__(self, other): return not self.__eq__(other) def __str__(self): if self.buf != None: return str().join([ "%02x" % ord(x) for x in self.buf]) else: return "" def __repr__(self): return "fedid(hexstr='%s')" % self.__str__() def pack_soap(self): return self.buf def digest_bits(self, bits): """Internal function. Compute the fedid from bits and store in buf""" d = EVP.MessageDigest('sha1') d.update(bits) self.buf = d.final() def set_hexstr(self, hexstr): h = hexstr.replace(':','') self.buf= str().join([chr(int(h[i:i+2],16)) \ for i in range(0,2*fedid.HASHSIZE,2)]) def get_hexstr(self): """Return the hexstring representation of the fedid""" return __str__(self) def set_bits(self, bits): """Set the fedid to bits(a 160 bit buffer)""" self.buf = bits def get_bits(self): """Get the 160 bit buffer from the fedid""" return self.buf def set_file(self, file): """Get the fedid from a certificate file Calculate the SHA1 hash over the bit string of the public key as defined in RFC3280. """ self.buf = None if legacy: self.digest_bits(get_key_bits_from_file(file)) else: self.set_cert(X509.load_cert(file)) def set_cert(self, cert): """Get the fedid from a certificate. Calculate the SHA1 hash over the bit string of the public key as defined in RFC3280. """ self.buf = None if (cert != None): if legacy: self.digest_bits(get_key_bits_from_cert(cert)) else: b = [] k = cert.get_pubkey() # Getting the key was easy, but getting the bit string of the # key requires a side trip through ASN.1 dec = decoder.decode(k.as_der()) # kv is a tuple of the bits in the key. The loop below # recombines these into bytes and then into a buffer for the # SSL digest function. kv = dec[0].getComponentByPosition(1) for i in range(0, len(kv), 8): v = 0 for j in range(0, 8): v = (v << 1) + kv[i+j] b.append(v) # The comprehension turns b from a list of bytes into a buffer # (string) of bytes self.digest_bits(str().join([chr(x) for x in b])) def print_soap(soap, out=sys.stdout, level=0, show_all=False): """Recursively print a soap element to out. This is printed as pseudo XML with a block around each element and the element printed indented. If show_all is true, even empty elements are displayed. """ indent = level* "\t" methods = [ method for method in dir(soap) \ if method.startswith("get_element") and \ callable(getattr(soap, method))] if len(methods) != 0: for m in methods: element = getattr(soap, m)(); if element != None or show_all: # Element may be a list of repetitions of the same element. If # so set elements to be that list, otherwise create a one entry # tuple. In either case we're going to iterate over elements. if getattr(element,"__iter__", None) == None: elements = (element,) else: elements = element for e in elements: # attrs are the element attributes for this element and # they're printed in the framing XML rather than by the # recursive call. attrs = [ attr for attr in dir(e)\ if attr.startswith("get_attribute") and \ callable(getattr(e,attr))] print >> out, "%s<%s" % (indent, m[len("get_element_"):]), for a in attrs: print >>out, ' %s="%s"' % \ (a[len("get_attribute_"):], getattr(e,a)()), print >>out, ">" print_soap(e, out, level+1) print >>out, "%s" % (indent, m[len("get_element_"):]) else: print >> out, "%s%s" % (indent, soap) def pack_id(id): """ Return a dictionary with the field name set by the id type """ if isinstance(id, type(fedid())): return { 'fedid': id } elif id.startswith("http:") or id.startswith("https:"): return { 'uri': id } else: return { 'username': id} def pack_soap(container, name, contents): if getattr(contents, "__iter__", None) != None: obj = getattr(container, "new_%s" % name, None)() for e, v in contents.iteritems(): assign = getattr(obj, "set_element_%s" % e, None) or \ getattr(obj, "set_attribute_%s" % e, None) if isinstance(v, type(dict())): assign(pack_soap(obj, e, v)) elif getattr(v, "__iter__", None) != None: assign([ pack_soap(obj, e, val ) for val in v]) elif getattr(v, "pack_soap", None) != None: assign(v.pack_soap()) else: assign(v) return obj else: return contents