source: fedd/federation/synch_store.py @ 444790d

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 444790d was af75d48, checked in by Ted Faber <faber@…>, 15 years ago

Add the ability to revoke a synch key

  • Property mode set to 100644
File size: 7.0 KB
RevLine 
[7dc855c]1#!/usr/local/bin/python
2
3from threading import Lock, Condition
4
5class synch_store:
6    """
7    Serialized write-once dictionary.  Threads reading before the value for a
8    key is assigned wait for the assignment, though polling is an option.  Keys
9    may also be deleted, though deleting a value too closely after the set may
10    deadlock threads waiting for the assignment.
11    """
[5e3d889]12
13    # Thrown if a key is assigned more than once
[7dc855c]14    class CollisionError(Exception): pass
[5e3d889]15
16    # Thrown if a non-existant key is deleted
[7dc855c]17    class BadDeletionError(Exception): pass
18
[ef252e9]19    # Thrown if a key is specified with a white space
20    class BadKeyError(Exception): pass
21
[af75d48]22    # Thrown when a variable has been revoked.
23    class RevokedKeyError(Exception): pass
24
[5e3d889]25    # A value in the dict.  Each has its own condition (and lock) for threads
26    # to wait on.
[7dc855c]27    class synch_value:
[5e3d889]28        # The synch_value constructor.
29        def __init__(self, value=None):
[7dc855c]30            self.value = value
31            self.condition = Condition()
32
[5e3d889]33        # Debugging representation
[7dc855c]34        def __str__(self):
[5e3d889]35            return "value %s cond %s" % (self.value, self.condition)
[7dc855c]36
[5e3d889]37    # The synch_store constructor.  Note that the store has its own lock for
38    # serializing additions to the dict.
[7dc855c]39    def __init__(self):
40        self.main_lock = Lock()
41        self.values = { }
[af75d48]42        self.revoked = set([])
[7dc855c]43
[5e3d889]44    # Set the value in the dict.  If threads are waiting, wake them.
[7dc855c]45    def set_value(self, key, value):
[ef252e9]46        if key.find(' ') != -1:
47            raise self.BadKeyError("Whitespace is not allowed in keys")
48
[7dc855c]49        self.main_lock.acquire()
[af75d48]50        if key in self.revoked:
51            self.main_lock.release()
52            raise synch_store.RevokedKeyError("Synch key %s revoked" % key)
[7dc855c]53        if self.values.has_key(key):
54            v = self.values[key]
55            v.condition.acquire()
56            self.main_lock.release()
57            if v.value is None:
58                v.value = value
59                v.condition.notifyAll()
60                v.condition.release()
61            else:
62                v.condition.release()
63                raise synch_store.CollisionError("%s already set" % key)
64        else:
[5e3d889]65            self.values[key] = synch_store.synch_value(value)
[7dc855c]66            self.main_lock.release()
67
[5e3d889]68    # Ask for the value.  If wait is true, wait until the key is set, if it is
69    # not already.  If wait is false, poll.
[7dc855c]70    def get_value(self, key, wait=True):
71        self.main_lock.acquire()
[af75d48]72        if key in self.revoked:
73            self.main_lock.release()
74            raise synch_store.RevokedKeyError("Synch key %s revoked" % key)
[7dc855c]75        if not self.values.has_key(key):
[5e3d889]76            self.values[key] = synch_store.synch_value()
[7dc855c]77        v = self.values[key]
78        v.condition.acquire()
79        self.main_lock.release()
80
81        while v.value is None:
[af75d48]82            if wait: 
83                v.condition.wait()
84                v.condition.release()
85                self.main_lock.acquire()
86                if key in self.revoked:
87                    self.main_lock.release()
88                    raise synch_store.RevokedKeyError("Synch key %s revoked" % key)
89                else:
90                    self.main_lock.release()
91                v.condition.acquire()
92            else:
93                break
[7dc855c]94        rv = v.value
95        v.condition.release()
96        return rv
97
[5e3d889]98    # Remove the key from the shared store.  If the key is not present or has
99    # never been set, fail.  Otherwise, wake any waiting threads (there
[af75d48]100    # shouldn't be any) and remove the value and key from the store.  Deleteing
101    # a revoked key removes its revocation.
[7dc855c]102    def del_value(self, key):
103        self.main_lock.acquire()
[af75d48]104        if key in self.revoked:
105            self.revoked.remove(key)
106            self.main_lock.release()
107        else:
108            if self.values.has_key(key):
109                v = self.values[key]
110                v.condition.acquire()
111                if v.value is not None:
112                    del self.values[key]
113                    self.main_lock.release()
114                    v.condition.notifyAll()
115                    v.condition.release()
116                else:
117                    self.main_lock.release()
118                    v.condition.release()
119                    raise synch_store.BadDeletionError("%s has not been set" \
120                            % key)
121            else:
122                self.main_lock.release()
123                raise synch_store.BadDeletionError("%s does not exist yet" \
124                        % key)
125
126    # Invlaidate this synchronization point.  Those waiting for it will get
127    # exceptions, and those trying to wait for it in the future will as well.
128    def revoke_key(self, key):
129        self.main_lock.acquire()
130        self.revoked.add(key)
[7dc855c]131        if self.values.has_key(key):
132            v = self.values[key]
[af75d48]133            del self.values[key]
134            self.main_lock.release()
[7dc855c]135            v.condition.acquire()
[af75d48]136            v.condition.notifyAll()
137            v.condition.release()
[7dc855c]138        else:
139            self.main_lock.release()
[af75d48]140
[7dc855c]141
[ef252e9]142    def all_keys(self):
143        self.main_lock.acquire()
144        rv = [ k for k in self.values.keys()]
145        self.main_lock.release()
146        return rv
147
148    # Save the store contents to a file in a simple way
149    def save(self, filename):
150        self.main_lock.acquire()
151        f = open(filename, 'w')
152        for k, v in self.values.items():
153            self.values[k].condition.acquire()
154            if self.values[k].value is not None:
155                print >> f, "%s %s" % ( k, str(self.values[k].value))
156            else:
157                print >> f, "%s" % k
158            self.values[k].condition.release()
159        self.main_lock.release()
160        f.close()
161
162    # Load the store contents from a file
163    def load(self, filename):
164        self.main_lock.acquire()
165        self.values = { }
166        f = open(filename, 'r')
167        for l in f:
168            i = l.find(' ')
169            if i != -1: self.values[l[:i]] = synch_store.synch_value(l[i+1:])
170            else: self.values[l] = synch_store.synch_value()
171        f.close()
172        self.main_lock.release()
173
[5e3d889]174    # A debugging representation
[7dc855c]175    def __str__(self):
176        rv = ""
177        self.main_lock.acquire()
178        for k, v in self.values.items():
179            self.values[k].condition.acquire()
[5e3d889]180            rv += "%s: %s\n" % ( k, str(self.values[k]))
[7dc855c]181            self.values[k].condition.release()
182        self.main_lock.release()
183        return rv
184
185#debugging scaffolding, left here in case we have problems
186if __name__ == '__main__':
187    from threading import currentThread, Thread
188    from time import sleep
189
190    def get_val(k, ss, wait=True):
191        print "%s: getting %s" % (currentThread(), k)
[af75d48]192        try:
193            v = ss.get_value(k, wait)
194        except synch_store.RevokedKeyError, e:
195            print "Error: %s" %e
196            return
[7dc855c]197        print "%s: %s is %s" % (currentThread(), k, v)
198
199
200    def set_val(k, v, ss):
201        print "setting %s to %s" % (k, v)
202        try:
203            ss.set_value(k, v)
204        except synch_store.CollisionError, e:
205            print "Error: %s" %e
206            return
[af75d48]207        except synch_store.RevokedKeyError, e:
208            print "Error: %s" %e
209            return
[7dc855c]210        print "Done with set"
211
212    def del_val(k, ss):
213        print "deleteing %s" % k
214        try:
215            ss.del_value(k)
216        except synch_store.BadDeletionError, e:
217            print "Error: %s" %e
218            return
[af75d48]219        except synch_store.RevokedKeyError, e:
220            print "Error: %s" %e
221            return
[7dc855c]222        print "Done with del"
223
[af75d48]224    def rev_val(k, ss):
225        print "revoking %s" % k
226        try:
227            ss.revoke_key(k)
228        except synch_store.BadDeletionError, e:
229            print "Error: %s" %e
230            return
231        except synch_store.RevokedKeyError, e:
232            print "Error: %s" %e
233            return
234        print "Done with revoking"
235
[7dc855c]236
237    ss = synch_store()
238    ss.set_value('b', 'initial')
239    print ss
240
241    for i in range(0,10):
242        t = Thread(target=get_val, args=('a', ss, i % 2 == 1))
243        t.start()
244    t = Thread(target=set_val, args=('a', '1', ss))
245    t.start()
246    for i in range(0,10):
247        t = Thread(target=get_val, args=('a', ss))
248        t.start()
249
250    print ss
251    t = Thread(target=del_val, args=('a', ss))
252    t.start()
253    print ss
254
[af75d48]255    for i in range(0,10):
256        t = Thread(target=get_val, args=('c', ss))
257        t.start()
258
259   
260    t = Thread(target=rev_val, args=('c', ss))
261    t.start()
[7dc855c]262
Note: See TracBrowser for help on using the repository browser.