source: fedd/federation/synch_store.py @ 5e3d889

Branches/tags containing this revision: axis_example, compt_changes, info-ops, version-3.01, version-3.02
Last change on this file since 5e3d889 was 5e3d889, checked in by Ted Faber <faber@…>, 14 years ago

Documentation and remove the redundant, unused key attribute from the synch_value

  • Property mode set to 100644
File size: 4.1 KB
Line 
1#!/usr/local/bin/python
2
3from threading import Lock, Condition
4
class synch_store:
    """
    Serialized write-once dictionary.  Threads reading before the value for a
    key is assigned wait for the assignment, though polling is an option.  Keys
    may also be deleted once they have been assigned.

    None doubles as the "not yet assigned" marker, so None cannot be stored as
    a real value: a key whose value is None still looks unassigned to readers.

    Locking: main_lock guards the values dict and each entry has its own
    Condition guarding that entry's value.  Every method takes main_lock
    first and the entry's condition second; no method tries to take main_lock
    while it holds a condition.  All locks are released in finally clauses so
    an exception cannot leave one held.
    """

    # Thrown if a key is assigned more than once
    class CollisionError(Exception): pass

    # Thrown if a non-existent or not-yet-assigned key is deleted
    class BadDeletionError(Exception): pass

    class synch_value:
        """
        A value in the dict.  Each has its own condition (and lock) for
        threads to wait on.  A value of None means "not assigned yet".
        """

        def __init__(self, value=None):
            self.value = value
            self.condition = Condition()

        def __str__(self):
            """Debugging representation."""
            return "value %s cond %s" % (self.value, self.condition)

    def __init__(self):
        """
        Create an empty store.  The store has its own lock for serializing
        additions to and removals from the dict.
        """
        self.main_lock = Lock()
        self.values = { }

    def set_value(self, key, value):
        """
        Assign value to key and wake any threads waiting for it.

        Raises CollisionError if the key already holds a (non-None) value.
        """
        self.main_lock.acquire()
        try:
            entry = self.values.get(key)
            if entry is None:
                # Nobody has asked for this key yet, so there is no one to
                # wake; just record the value.
                self.values[key] = synch_store.synch_value(value)
                return
            # Take the entry's condition before letting go of main_lock so
            # the entry cannot change between the lookup and the update.
            entry.condition.acquire()
        finally:
            self.main_lock.release()

        try:
            if entry.value is not None:
                # (key,) keeps the message formatting safe for tuple keys
                raise synch_store.CollisionError("%s already set" % (key,))
            entry.value = value
            entry.condition.notifyAll()
        finally:
            entry.condition.release()

    def get_value(self, key, wait=True):
        """
        Return the value for key.

        If the key has not been assigned and wait is true, block until it is.
        If wait is false, poll: return the value if assigned, otherwise None.
        Asking for an unknown key creates an unassigned entry for it.
        """
        self.main_lock.acquire()
        try:
            entry = self.values.get(key)
            if entry is None:
                entry = synch_store.synch_value()
                self.values[key] = entry
            entry.condition.acquire()
        finally:
            self.main_lock.release()

        try:
            # wait() releases the condition while blocked and reacquires it
            # before returning, so the value is always read under the lock.
            while wait and entry.value is None:
                entry.condition.wait()
            return entry.value
        finally:
            entry.condition.release()

    def del_value(self, key):
        """
        Remove key and its value from the store, waking any waiting threads
        (there shouldn't be any, since the value has been assigned).

        Raises BadDeletionError if the key is not present or has never been
        assigned.
        """
        self.main_lock.acquire()
        try:
            entry = self.values.get(key)
            if entry is None:
                raise synch_store.BadDeletionError(
                        "%s does not exist yet" % (key,))
            entry.condition.acquire()
            try:
                if entry.value is None:
                    raise synch_store.BadDeletionError(
                            "%s has not been set" % (key,))
                del self.values[key]
                entry.condition.notifyAll()
            finally:
                # The condition must be released on every path: woken
                # readers have to reacquire it before they can return, and
                # a failed deletion leaves the entry in the dict where
                # set_value, get_value and __str__ will need it again.
                entry.condition.release()
        finally:
            self.main_lock.release()

    def __str__(self):
        """A debugging representation: one "key: entry" line per key."""
        lines = []
        self.main_lock.acquire()
        try:
            for k, v in self.values.items():
                v.condition.acquire()
                try:
                    lines.append("%s: %s\n" % (k, v))
                finally:
                    v.condition.release()
        finally:
            self.main_lock.release()
        return "".join(lines)
101
102
103
104
105
106#debugging scaffolding, left here in case we have problems
107if __name__ == '__main__':
108    from threading import currentThread, Thread
109    from time import sleep
110
111    def get_val(k, ss, wait=True):
112        print "%s: getting %s" % (currentThread(), k)
113        v = ss.get_value(k, wait)
114        print "%s: %s is %s" % (currentThread(), k, v)
115
116
117    def set_val(k, v, ss):
118        print "setting %s to %s" % (k, v)
119        try:
120            ss.set_value(k, v)
121        except synch_store.CollisionError, e:
122            print "Error: %s" %e
123            return
124        print "Done with set"
125
126    def del_val(k, ss):
127        print "deleteing %s" % k
128        try:
129            ss.del_value(k)
130        except synch_store.BadDeletionError, e:
131            print "Error: %s" %e
132            return
133        print "Done with del"
134
135
136    ss = synch_store()
137    ss.set_value('b', 'initial')
138    print ss
139
140    for i in range(0,10):
141        t = Thread(target=get_val, args=('a', ss, i % 2 == 1))
142        t.start()
143    t = Thread(target=set_val, args=('a', '1', ss))
144    t.start()
145    for i in range(0,10):
146        t = Thread(target=get_val, args=('a', ss))
147        t.start()
148
149    print ss
150    t = Thread(target=del_val, args=('a', ss))
151    t.start()
152    print ss
153
154
Note: See TracBrowser for help on using the repository browser.