#!/usr/local/bin/python
"""
Utilities for fedd: an SSL context wrapper, the fedid identifier class and
helpers that convert between dictionaries and ZSI SOAP holder classes.
"""

import sys

from M2Crypto import SSL, X509, EVP
from pyasn1.codec.der import decoder

# The version of M2Crypto on users is pretty old and doesn't have all the
# features that are useful.  The legacy code is somewhat more brittle than the
# main line, but will work.
if "as_der" not in dir(EVP.PKey):
    from asn1_raw import get_key_bits_from_file, get_key_bits_from_cert
    legacy = True
else:
    legacy = False


class fedd_ssl_context(SSL.Context):
    """
    Simple wrapper around an M2Crypto.SSL.Context to initialize it for fedd.
    """
    def __init__(self, my_cert, trusted_certs=None, password=None):
        """
        construct a fedd_ssl_context

        @param my_cert: PEM file with my certificate in it
        @param trusted_certs: PEM file with trusted certs in it (optional)
        @param password: either the password itself or a callable that
                         returns it; used when loading my_cert (optional)
        """
        SSL.Context.__init__(self)

        # load_cert takes a callback to get a password, not a password, so if
        # the caller provided a password, this creates a nonce callback using
        # a lambda form.
        if password is not None and not callable(password):
            # This is cute.  "password = lambda *args: password" produces a
            # function object that returns itself rather than one that
            # returns the original password, because by the time the lambda
            # runs the name password refers to the lambda.  So we assign to a
            # temp and close over that instead.
            pwd = password
            password = lambda *args: pwd

        if password is not None:
            self.load_cert(my_cert, callback=password)
        else:
            self.load_cert(my_cert)

        if trusted_certs is not None:
            self.load_verify_locations(trusted_certs)
        # Always require the peer to present a certificate that verifies.
        self.set_verify(SSL.verify_peer, 10)


class fedid:
    """
    Wrapper around the federated ID from an X509 certificate.

    The ID is held in self.buf as a raw byte string (a SHA1 digest when it is
    derived from a certificate), or None when no ID has been set.
    """
    # Number of bytes in a fedid; set_hexstr reads twice this many hex digits.
    HASHSIZE = 20

    def __init__(self, bits=None, hexstr=None, cert=None, file=None):
        """
        Initialize from the first source given, checked in the order bits,
        hexstr, cert, file.  With no source the fedid is empty (buf is None).

        @param bits: the raw buffer to use as the ID
        @param hexstr: hex representation of the ID (colons are ignored)
        @param cert: certificate object to derive the ID from
        @param file: certificate file to derive the ID from
        """
        if bits is not None:
            self.set_bits(bits)
        elif hexstr is not None:
            self.set_hexstr(hexstr)
        elif cert is not None:
            self.set_cert(cert)
        elif file is not None:
            self.set_file(file)
        else:
            self.buf = None

    def __hash__(self):
        """Hash on the raw buffer so equal fedids hash alike."""
        return hash(self.buf)

    def __eq__(self, other):
        """
        A fedid equals another fedid with the same buffer, or a string equal
        to the buffer.  Anything else compares unequal.
        """
        # Test against the class itself: fedid is an old-style class under
        # Python 2, where type(self) is the generic instance type and would
        # match instances of unrelated classes.
        if isinstance(other, fedid):
            return self.buf == other.buf
        elif isinstance(other, str):
            return self.buf == other
        else:
            return False

    def __ne__(self, other):
        """Inverse of __eq__."""
        return not self.__eq__(other)

    def __str__(self):
        """Return the buffer as lowercase hex, or "" if no ID is set."""
        if self.buf is not None:
            return "".join(["%02x" % ord(x) for x in self.buf])
        else:
            return ""

    def __repr__(self):
        return "fedid(hexstr='%s')" % self.__str__()

    def pack_soap(self):
        """Return the raw buffer, the form pack_soap() puts in messages."""
        return self.buf

    def digest_bits(self, bits):
        """Internal function.  Compute the fedid from bits and store in buf"""
        d = EVP.MessageDigest('sha1')
        d.update(bits)
        self.buf = d.final()

    def set_hexstr(self, hexstr):
        """
        Set the fedid from a hex string; colons in the string are ignored.

        Exactly 2 * HASHSIZE hex digits are consumed: extra digits are
        ignored and a shorter string raises ValueError from int().
        """
        h = hexstr.replace(':', '')
        self.buf = "".join([chr(int(h[i:i + 2], 16))
                            for i in range(0, 2 * fedid.HASHSIZE, 2)])

    def get_hexstr(self):
        """Return the hexstring representation of the fedid"""
        # This used to call a bare __str__(self), which is not a defined
        # name and raised NameError on every call.
        return self.__str__()

    def set_bits(self, bits):
        """Set the fedid to bits(a 160 bit buffer)"""
        self.buf = bits

    def get_bits(self):
        """Get the 160 bit buffer from the fedid"""
        return self.buf

    def set_file(self, file):
        """Get the fedid from a certificate file

        Calculate the SHA1 hash over the bit string of the public key as
        defined in RFC3280.
        """
        self.buf = None
        if legacy:
            self.digest_bits(get_key_bits_from_file(file))
        else:
            self.set_cert(X509.load_cert(file))

    def set_cert(self, cert):
        """Get the fedid from a certificate.

        Calculate the SHA1 hash over the bit string of the public key as
        defined in RFC3280.  If cert is None the fedid is left empty.
        """
        self.buf = None
        if cert is not None:
            if legacy:
                self.digest_bits(get_key_bits_from_cert(cert))
            else:
                b = []
                k = cert.get_pubkey()

                # Getting the key was easy, but getting the bit string of the
                # key requires a side trip through ASN.1
                dec = decoder.decode(k.as_der())

                # kv is a tuple of the bits in the key.  The loop below
                # recombines these into bytes (most significant bit first)
                # and then into a buffer for the SSL digest function.
                # NOTE(review): assumes len(kv) is a multiple of 8; a ragged
                # tail would raise IndexError.
                kv = dec[0].getComponentByPosition(1)
                for i in range(0, len(kv), 8):
                    v = 0
                    for j in range(0, 8):
                        v = (v << 1) + kv[i + j]
                    b.append(v)
                # The comprehension turns b from a list of bytes into a buffer
                # (string) of bytes
                self.digest_bits("".join([chr(x) for x in b]))


def pack_id(id):
    """
    Return a dictionary with the field name set by the id type.  Handy for
    creating dictionaries to be converted to messages.

    A fedid maps to 'fedid', a string starting with "http:" or "https:" maps
    to 'uri', and anything else maps to 'username'.
    """
    # Check against the class directly: comparing with type(fedid()) matched
    # every old-style instance under Python 2 and built a throwaway object
    # on each call.
    if isinstance(id, fedid):
        return {'fedid': id}
    elif id.startswith(("http:", "https:")):
        return {'uri': id}
    else:
        return {'username': id}


def unpack_id(id):
    """
    return id as a type determined by the key

    A "fedid" entry is wrapped in a fedid object; otherwise the first of
    "username", "uri" or "kerberosUsername" that is present is returned
    unchanged.  Returns None if none of the keys is present.
    """
    if "fedid" in id:
        return fedid(id["fedid"])
    for k in ("username", "uri", "kerberosUsername"):
        if k in id:
            return id[k]
    return None


def pack_soap(container, name, contents):
    """
    Convert the dictionary in contents into a tree of ZSI classes.

    The holder classes are constructed from factories in container and
    assigned to either the element or attribute name.  This is used to
    recursively create the SOAP message.

    Contents without an __iter__ attribute are returned unchanged.  A missing
    new_<name> factory on container, or a missing set_element_/set_attribute_
    method on the holder, surfaces as a TypeError from calling None.
    """
    if getattr(contents, "__iter__", None) is not None:
        obj = getattr(container, "new_%s" % name, None)()
        for e, v in contents.iteritems():
            # Prefer the element setter, fall back to the attribute setter
            assign = getattr(obj, "set_element_%s" % e, None) or \
                    getattr(obj, "set_attribute_%s" % e, None)

            if isinstance(v, dict):
                assign(pack_soap(obj, e, v))
            elif getattr(v, "__iter__", None) is not None:
                assign([pack_soap(obj, e, val) for val in v])
            elif getattr(v, "pack_soap", None) is not None:
                # Objects such as fedid know how to serialize themselves
                assign(v.pack_soap())
            else:
                assign(v)
        return obj
    else:
        return contents


def unpack_soap(element):
    """
    Convert a tree of ZSI SOAP classes into a hash.  The inverse of
    pack_soap.

    Elements or attributes that are empty (None or an empty sequence) are
    ignored.  An object with no get_element/get_attribute methods is
    returned unchanged.
    """
    methods = [m for m in dir(element)
               if m.startswith("get_element") or m.startswith("get_attribute")]
    if len(methods) > 0:
        rv = {}
        for m in methods:
            # The dictionary key is the accessor name minus its prefix
            if m.startswith("get_element_"):
                n = m.replace("get_element_", "", 1)
            else:
                n = m.replace("get_attribute_", "", 1)

            sub = getattr(element, m)()
            if sub is not None:
                if isinstance(sub, basestring):
                    rv[n] = sub
                elif getattr(sub, "__iter__", None) is not None:
                    if len(sub) > 0:
                        rv[n] = [unpack_soap(e) for e in sub]
                else:
                    rv[n] = unpack_soap(sub)
        return rv
    else:
        return element