source: fedd/federation/synch_store.py @ ef252e9

Branches/tags: axis_example, compt_changes, info-ops, version-3.01, version-3.02
Last change on this file since ef252e9 was ef252e9, checked in by Ted Faber <faber@…>, 14 years ago

Add support for loading and saving the synch points (pickle didn't work after
all) and enforce space-free keys. That requirement exists because of the very
simple file format.

  • Property mode set to 100644
File size: 5.1 KB
Line 
1#!/usr/local/bin/python
2
3from threading import Lock, Condition
4
class synch_store:
    """
    Serialized write-once dictionary.  Threads reading before the value for a
    key is assigned wait for the assignment, though polling is an option.  Keys
    may also be deleted; a reader that asks for a key after it has been deleted
    is treated like a reader of a never-set key and waits (or polls) until the
    key is assigned again.

    Locking discipline: main_lock guards the dict itself, and each entry has
    its own condition (and lock) guarding its value.  When both are needed
    main_lock is always taken first.

    None is used internally to mean "not assigned yet", so a key whose value
    is None is indistinguishable from an unset key.
    """

    # Thrown if a key is assigned more than once
    class CollisionError(Exception): pass

    # Thrown if a nonexistent (or never assigned) key is deleted
    class BadDeletionError(Exception): pass

    # Thrown if a key is specified with a space in it
    class BadKeyError(Exception): pass

    # A value in the dict.  Each has its own condition (and lock) for threads
    # to wait on.
    class synch_value:
        # The synch_value constructor.  A value of None means "not set yet".
        def __init__(self, value=None):
            self.value = value
            self.condition = Condition()

        # Debugging representation
        def __str__(self):
            return "value %s cond %s" % (self.value, self.condition)

    # The synch_store constructor.  Note that the store has its own lock for
    # serializing additions to the dict.
    def __init__(self):
        self.main_lock = Lock()
        self.values = { }

    # Set the value in the dict.  If threads are waiting, wake them.  Raises
    # BadKeyError if the key contains a space (the save file format separates
    # key and value with a space) and CollisionError if the key already has a
    # value.
    def set_value(self, key, value):
        if ' ' in key:
            raise synch_store.BadKeyError("Whitespace is not allowed in keys")

        with self.main_lock:
            v = self.values.get(key)
            if v is None:
                # Nobody has asked for this key yet, so there is nobody to
                # wake: just create the entry with its value.
                self.values[key] = synch_store.synch_value(value)
                return
            # Take the entry's lock before letting go of the store lock so
            # the entry cannot change between the lookup and the update.
            v.condition.acquire()

        try:
            if v.value is not None:
                raise synch_store.CollisionError("%s already set" % key)
            v.value = value
            v.condition.notify_all()
        finally:
            v.condition.release()

    # Ask for the value.  If wait is true, wait until the key is set, if it is
    # not already.  If wait is false, poll: return the value if it is set and
    # None otherwise.  Asking for an unknown key creates an unset entry for it.
    def get_value(self, key, wait=True):
        with self.main_lock:
            v = self.values.get(key)
            if v is None:
                v = self.values[key] = synch_store.synch_value()
            v.condition.acquire()

        try:
            # wait() releases the entry's lock while blocked and holds it
            # again when it returns, so the re-check is safe.
            while wait and v.value is None:
                v.condition.wait()
            return v.value
        finally:
            v.condition.release()

    # Remove the key from the shared store.  If the key is not present or has
    # never been set, fail with BadDeletionError.  Otherwise, wake any waiting
    # threads (there shouldn't be any) and remove the value and key from the
    # store.
    def del_value(self, key):
        with self.main_lock:
            v = self.values.get(key)
            if v is None:
                raise synch_store.BadDeletionError(
                        "%s does not exist yet" % key)
            # The with statement guarantees the entry's lock is released on
            # every path; leaving it held would block every other thread that
            # touches this entry afterwards.
            with v.condition:
                if v.value is None:
                    raise synch_store.BadDeletionError(
                            "%s has not been set" % key)
                del self.values[key]
                v.condition.notify_all()

    # Return a list of all keys in the store, set or not.
    def all_keys(self):
        with self.main_lock:
            return list(self.values)

    # Save the store contents to a file in a simple way: one line per key,
    # "key value" if the key has been set and just "key" if it has not.
    def save(self, filename):
        with self.main_lock:
            with open(filename, 'w') as f:
                for k, v in self.values.items():
                    with v.condition:
                        if v.value is not None:
                            f.write("%s %s\n" % (k, str(v.value)))
                        else:
                            f.write("%s\n" % k)

    # Load the store contents from a file written by save, replacing whatever
    # the store held.  Values always come back as strings.  The file is parsed
    # completely before the store is touched, so a missing or unreadable file
    # leaves the current contents (and the store lock) alone.
    def load(self, filename):
        loaded = { }
        with open(filename, 'r') as f:
            for line in f:
                # Drop the line terminator only; it is not part of the key or
                # the value.
                line = line.rstrip('\r\n')
                if not line:
                    continue
                key, sep, value = line.partition(' ')
                if sep:
                    loaded[key] = synch_store.synch_value(value)
                else:
                    loaded[key] = synch_store.synch_value()

        with self.main_lock:
            self.values = loaded

    # A debugging representation
    def __str__(self):
        with self.main_lock:
            lines = []
            for k, v in self.values.items():
                with v.condition:
                    lines.append("%s: %s\n" % (k, str(v)))
            return "".join(lines)
139
# Debugging scaffolding, left here in case we have problems.  It starts a mix
# of polling and waiting readers on key 'a', sets the key from another thread,
# starts more readers, and finally deletes the key, printing the store along
# the way.  The threads are deliberately not joined between steps, so the
# output order varies from run to run.
#
# Every print takes a single pre-formatted string and the handlers use
# "except ... as", so the script parses and prints the same text on both
# Python 2.6+ and Python 3.
if __name__ == '__main__':
    from threading import current_thread, Thread
    from time import sleep

    # Reader: fetch key k from store ss, waiting or polling as requested.
    def get_val(k, ss, wait=True):
        print("%s: getting %s" % (current_thread(), k))
        v = ss.get_value(k, wait)
        print("%s: %s is %s" % (current_thread(), k, v))


    # Writer: set key k to v in store ss and report a collision if one occurs.
    def set_val(k, v, ss):
        print("setting %s to %s" % (k, v))
        try:
            ss.set_value(k, v)
        except synch_store.CollisionError as e:
            print("Error: %s" % e)
            return
        print("Done with set")

    # Deleter: remove key k from store ss and report a bad deletion.
    def del_val(k, ss):
        print("deleteing %s" % k)
        try:
            ss.del_value(k)
        except synch_store.BadDeletionError as e:
            print("Error: %s" % e)
            return
        print("Done with del")


    ss = synch_store()
    ss.set_value('b', 'initial')
    print(ss)

    # Odd-numbered readers wait for the value; even-numbered ones poll.
    for i in range(0,10):
        t = Thread(target=get_val, args=('a', ss, i % 2 == 1))
        t.start()
    t = Thread(target=set_val, args=('a', '1', ss))
    t.start()
    for i in range(0,10):
        t = Thread(target=get_val, args=('a', ss))
        t.start()

    print(ss)
    t = Thread(target=del_val, args=('a', ss))
    t.start()
    print(ss)
187
188
Note: See TracBrowser for help on using the repository browser.