source: fedd/federation/proof.py @ d75005b

Last change on this file since d75005b was 67fa1cf, checked in by Ted Faber <faber@…>, 11 years ago

MOve over to ABAC 0.1.4

  • Property mode set to 100644
File size: 4.5 KB
RevLine 
[e83f2f2]1#/usr/local/bin/python
2
3
4import ABAC
5import sys
6import os, os.path
[df35876]7import re
[e83f2f2]8
9from string import join
10from base64 import b64encode, b64decode
11from xml.parsers.expat import ParserCreate
[cd5b279]12from tempfile import mkdtemp
13from shutil import rmtree
[e83f2f2]14
15class proof:
16    """
17    Class to make manipulating proofs a little simpler.  Keeps track of the
18    target and does some of the credential formatting.
19    """
20    def __init__(self, me, p, a, c):
21        """
22        P is the principal (string), a is the attribute to prove (string)
23        and c is the list of abac certificates that prove (or partially
24        prove) the connection.
25        """
26        self.prover = "%s" % me
27        self.principal = "%s" % p
28        self.attribute = a
29        self.creds = c or []
30
31    def creds_to_certs(self):
32        ids = set([c.issuer_cert() for c in self.creds])
33        attrs = set([c.attribute_cert() for c in self.creds])
[cd5b279]34        out_ids = []
35        d = mkdtemp()
36        for i, c in enumerate(ids):
37            try:
38                der_name = os.path.join(d, 'file%04d.der' % i)
39                pem_name = os.path.join(d, 'file%04d.pem' % i)
40                f = open(der_name, 'w')
41                f.write(c)
42                f.close()
43
[67fa1cf]44                cid = ABAC.ID(der_name)
[cd5b279]45
46                tf = open(pem_name, 'w')
47                cid.write_cert(tf)
48                tf.close()
49                tf = open(pem_name, 'r')
50                out_ids.append(tf.read())
51                tf.close()
52            except EnvironmentError, e:
53                print "Certs to creds Error: %s" % e
54                pass
55        rmtree(d)
56
57        return out_ids + list(attrs)
[e83f2f2]58
59    def __str__(self):
60        """
61        User friendly output format
62        """
63        rv = "prover: %s\nprincipal: %s\nattribute: %s" % \
64                (self.prover, self.principal, self.attribute)
65        if self.creds:
66            rv += '\n'
67            rv += join(["%s <- %s"%(c.head().string(), c.tail().string()) \
68                for c in self.creds], '\n')
69        return rv
70
71    def to_dict(self):
72        """
73        Dict with the credentials encoded as X.509 certificates
74        """
75
76        return { 
77                'prover': self.prover,
78                'principal': self.principal,
79                'attribute': self.attribute,
80                'credential': self.creds_to_certs(),
81                }
82    def to_context(self):
83        """
84        Return an ABAC context containing just the credentials in the proof
85        """
86        ctxt = ABAC.Context()
87        for c in self.creds_to_certs():
88            if ctxt.load_id_chunk(c) != ABAC.ABAC_CERT_SUCCESS:
89                ctxt.load_attribute_chunk(c)
90        return ctxt
91
92    def to_xml(self):
93        """
94        Convert the proof to a simple XML format
95        """
96        rv = "<proof>\n"
97        rv += "<prover>%s</prover>\n" % self.prover
98        rv += "<principal>%s</principal>\n" % self.principal
99        rv += "<attribute>%s</attribute>\n" % self.attribute
100        for c in self.creds_to_certs(): 
101            rv += "<credential>%s</credential>\n" % b64encode(c)
102        rv += "</proof>"
103        return rv
104
105    def save_to_xml(self, fn, mode='w'):
106        """
107        Write an XML reprsentation to fn.  Mode is 'w' or 'a' to overwrite or
108        append to the file.
109        """
110        f = open(fn, mode)
111        print >>f, self.to_xml()
112        f.close()
113
114    @staticmethod
115    def from_xml(filename=None, file=None, string=None):
116        """
117        Create a proof from the XML representation
118        """
119        class parse_proof:
120            def element_end(self, name):
121                if name =='proof':
122                    self.proof = proof(self.me, self.pr, self.attr, 
123                            self.context.credentials())
124                elif name =='prover':
125                    self.me = self.chars.strip()
126                elif name =='principal':
127                    self.pr = self.chars.strip()
128                elif name == 'attribute':
129                    self.attr = self.chars.strip()
130                elif name == 'credential':
131                    c = b64decode(self.chars)
132                    if self.context.load_id_chunk(c) != ABAC.ABAC_CERT_SUCCESS:
133                        self.context.load_attribute_chunk(c)
134                else:
135                    print "Doh!"
136
137            def element_start(self, name, attrs):
138                self.chars = ''
139
140            def cdata(self, data):
141                self.chars += data
142
143
144            def __init__(self):
145                self.p = ParserCreate()
146                self.me = None
147                self.pr = None
148                self.attr = None
149                self.proof = None
150                self.context = ABAC.Context()
151                self.chars = ''
152
153                self.p.CharacterDataHandler = self.cdata
154                self.p.EndElementHandler = self.element_end
155                self.p.StartElementHandler = self.element_start
156
157        pp = parse_proof()
158        if filename: pp.p.ParseFile(open(filename, 'r'))
159        elif file: pp.p.ParseFile(file)
160        elif string: pp.p.Parse(string, True)
161        else: print "Doh1"
162
163        return pp.proof
[df35876]164    @staticmethod
165    def pem_to_der(c):
166        pat = '-----BEGIN CERTIFICATE-----(.*)-----END CERTIFICATE'
167        m = re.match(pat, c, re.DOTALL)
168        if m: return b64decode(m.group(1))
169        else: return c
170
[e83f2f2]171
172    @staticmethod
173    def from_dict(d):
174        """
175        Reverse the to_dict process
176        """
177        if d: 
178            if 'prover' in d and 'principal' in d and 'attribute' in d:
179                ac = ABAC.Context()
180                self = proof(d['prover'], d['principal'], d['attribute'], None)
181                for c in d.get('credential', []):
[df35876]182                    c = self.pem_to_der(c)
183                    if ac.load_id_chunk(c)!= ABAC.ABAC_CERT_SUCCESS:
[e83f2f2]184                        ac.load_attribute_chunk(c)
185                self.creds = ac.credentials()
186                return self
187            else:
188                return None
189        else:
190            return None
Note: See TracBrowser for help on using the repository browser.