source: fedd/federation/proof.py @ dc49681

Last change on this file since dc49681 was df35876, checked in by Ted Faber <faber@…>, 12 years ago

Somehow recoding the pems got missed here...

  • Property mode set to 100644
File size: 4.6 KB
Line 
1#/usr/local/bin/python
2
3
4import ABAC
5import Creddy
6import sys
7import os, os.path
8import re
9
10from string import join
11from base64 import b64encode, b64decode
12from xml.parsers.expat import ParserCreate
13from tempfile import mkdtemp
14from shutil import rmtree
15
16class proof:
17    """
18    Class to make manipulating proofs a little simpler.  Keeps track of the
19    target and does some of the credential formatting.
20    """
21    def __init__(self, me, p, a, c):
22        """
23        P is the principal (string), a is the attribute to prove (string)
24        and c is the list of abac certificates that prove (or partially
25        prove) the connection.
26        """
27        self.prover = "%s" % me
28        self.principal = "%s" % p
29        self.attribute = a
30        self.creds = c or []
31
32    def creds_to_certs(self):
33        ids = set([c.issuer_cert() for c in self.creds])
34        attrs = set([c.attribute_cert() for c in self.creds])
35        out_ids = []
36        d = mkdtemp()
37        for i, c in enumerate(ids):
38            try:
39                der_name = os.path.join(d, 'file%04d.der' % i)
40                pem_name = os.path.join(d, 'file%04d.pem' % i)
41                f = open(der_name, 'w')
42                f.write(c)
43                f.close()
44
45                cid = Creddy.ID(der_name)
46
47                tf = open(pem_name, 'w')
48                cid.write_cert(tf)
49                tf.close()
50                tf = open(pem_name, 'r')
51                out_ids.append(tf.read())
52                tf.close()
53            except EnvironmentError, e:
54                print "Certs to creds Error: %s" % e
55                pass
56        rmtree(d)
57
58        return out_ids + list(attrs)
59
60    def __str__(self):
61        """
62        User friendly output format
63        """
64        rv = "prover: %s\nprincipal: %s\nattribute: %s" % \
65                (self.prover, self.principal, self.attribute)
66        if self.creds:
67            rv += '\n'
68            rv += join(["%s <- %s"%(c.head().string(), c.tail().string()) \
69                for c in self.creds], '\n')
70        return rv
71
72    def to_dict(self):
73        """
74        Dict with the credentials encoded as X.509 certificates
75        """
76
77        return { 
78                'prover': self.prover,
79                'principal': self.principal,
80                'attribute': self.attribute,
81                'credential': self.creds_to_certs(),
82                }
83    def to_context(self):
84        """
85        Return an ABAC context containing just the credentials in the proof
86        """
87        ctxt = ABAC.Context()
88        for c in self.creds_to_certs():
89            if ctxt.load_id_chunk(c) != ABAC.ABAC_CERT_SUCCESS:
90                ctxt.load_attribute_chunk(c)
91        return ctxt
92
93    def to_xml(self):
94        """
95        Convert the proof to a simple XML format
96        """
97        rv = "<proof>\n"
98        rv += "<prover>%s</prover>\n" % self.prover
99        rv += "<principal>%s</principal>\n" % self.principal
100        rv += "<attribute>%s</attribute>\n" % self.attribute
101        for c in self.creds_to_certs(): 
102            rv += "<credential>%s</credential>\n" % b64encode(c)
103        rv += "</proof>"
104        return rv
105
106    def save_to_xml(self, fn, mode='w'):
107        """
108        Write an XML reprsentation to fn.  Mode is 'w' or 'a' to overwrite or
109        append to the file.
110        """
111        f = open(fn, mode)
112        print >>f, self.to_xml()
113        f.close()
114
115    @staticmethod
116    def from_xml(filename=None, file=None, string=None):
117        """
118        Create a proof from the XML representation
119        """
120        class parse_proof:
121            def element_end(self, name):
122                if name =='proof':
123                    self.proof = proof(self.me, self.pr, self.attr, 
124                            self.context.credentials())
125                elif name =='prover':
126                    self.me = self.chars.strip()
127                elif name =='principal':
128                    self.pr = self.chars.strip()
129                elif name == 'attribute':
130                    self.attr = self.chars.strip()
131                elif name == 'credential':
132                    c = b64decode(self.chars)
133                    if self.context.load_id_chunk(c) != ABAC.ABAC_CERT_SUCCESS:
134                        self.context.load_attribute_chunk(c)
135                else:
136                    print "Doh!"
137
138            def element_start(self, name, attrs):
139                self.chars = ''
140
141            def cdata(self, data):
142                self.chars += data
143
144
145            def __init__(self):
146                self.p = ParserCreate()
147                self.me = None
148                self.pr = None
149                self.attr = None
150                self.proof = None
151                self.context = ABAC.Context()
152                self.chars = ''
153
154                self.p.CharacterDataHandler = self.cdata
155                self.p.EndElementHandler = self.element_end
156                self.p.StartElementHandler = self.element_start
157
158        pp = parse_proof()
159        if filename: pp.p.ParseFile(open(filename, 'r'))
160        elif file: pp.p.ParseFile(file)
161        elif string: pp.p.Parse(string, True)
162        else: print "Doh1"
163
164        return pp.proof
165    @staticmethod
166    def pem_to_der(c):
167        pat = '-----BEGIN CERTIFICATE-----(.*)-----END CERTIFICATE'
168        m = re.match(pat, c, re.DOTALL)
169        if m: return b64decode(m.group(1))
170        else: return c
171
172
173    @staticmethod
174    def from_dict(d):
175        """
176        Reverse the to_dict process
177        """
178        if d: 
179            if 'prover' in d and 'principal' in d and 'attribute' in d:
180                ac = ABAC.Context()
181                self = proof(d['prover'], d['principal'], d['attribute'], None)
182                for c in d.get('credential', []):
183                    c = self.pem_to_der(c)
184                    if ac.load_id_chunk(c)!= ABAC.ABAC_CERT_SUCCESS:
185                        ac.load_attribute_chunk(c)
186                self.creds = ac.credentials()
187                return self
188            else:
189                return None
190        else:
191            return None
Note: See TracBrowser for help on using the repository browser.