source: fedd/federation/synch_store.py @ 7dc855c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 7dc855c was 7dc855c, checked in by Ted Faber <faber@…>, 14 years ago

Synchronized write once hash for storing stitching values

  • Property mode set to 100644
File size: 3.3 KB
Line 
1#!/usr/local/bin/python
2
3from threading import Lock, Condition
4
5class synch_store:
6    """
7    Serialized write-once dictionary.  Threads reading before the value for a
8    key is assigned wait for the assignment, though polling is an option.  Keys
9    may also be deleted, though deleting a value too closely after the set may
10    deadlock threads waiting for the assignment.
11    """
12    class CollisionError(Exception): pass
13    class BadDeletionError(Exception): pass
14
15    class synch_value:
16        def __init__(self, key, value=None):
17            self.key = key
18            self.value = value
19            self.condition = Condition()
20
21        def __str__(self):
22            return "key %s value %s cond %s" % \
23                (self.key, self.value, self.condition)
24
25    def __init__(self):
26        self.main_lock = Lock()
27        self.values = { }
28
29    def set_value(self, key, value):
30        self.main_lock.acquire()
31        if self.values.has_key(key):
32            v = self.values[key]
33            v.condition.acquire()
34            self.main_lock.release()
35            if v.value is None:
36                v.value = value
37                v.condition.notifyAll()
38                v.condition.release()
39            else:
40                v.condition.release()
41                raise synch_store.CollisionError("%s already set" % key)
42        else:
43            self.values[key] = synch_store.synch_value(key, value)
44            self.main_lock.release()
45
46    def get_value(self, key, wait=True):
47        self.main_lock.acquire()
48        if not self.values.has_key(key):
49            self.values[key] = synch_store.synch_value(key)
50        v = self.values[key]
51        v.condition.acquire()
52        self.main_lock.release()
53
54        while v.value is None:
55            if wait: v.condition.wait()
56            else: break
57        rv = v.value
58        v.condition.release()
59        return rv
60
61    def del_value(self, key):
62        self.main_lock.acquire()
63        if self.values.has_key(key):
64            v = self.values[key]
65            v.condition.acquire()
66            if v.value is not None:
67                del self.values[key]
68                self.main_lock.release()
69                v.condition.notifyAll()
70            else:
71                self.main_lock.release()
72                raise synch_store.BadDeletionError("%s has not been set" % key)
73        else:
74            self.main_lock.release()
75            raise synch_store.BadDeletionError("%s does not exist yet" % key)
76
77    def __str__(self):
78        rv = ""
79        self.main_lock.acquire()
80        for k, v in self.values.items():
81            self.values[k].condition.acquire()
82            rv += "%s: %s" % ( k, str(self.values[k]))
83            self.values[k].condition.release()
84        self.main_lock.release()
85        return rv
86
87
88
89
90
91#debugging scaffolding, left here in case we have problems
92if __name__ == '__main__':
93    from threading import currentThread, Thread
94    from time import sleep
95
96    def get_val(k, ss, wait=True):
97        print "%s: getting %s" % (currentThread(), k)
98        v = ss.get_value(k, wait)
99        print "%s: %s is %s" % (currentThread(), k, v)
100
101
102    def set_val(k, v, ss):
103        print "setting %s to %s" % (k, v)
104        try:
105            ss.set_value(k, v)
106        except synch_store.CollisionError, e:
107            print "Error: %s" %e
108            return
109        print "Done with set"
110
111    def del_val(k, ss):
112        print "deleteing %s" % k
113        try:
114            ss.del_value(k)
115        except synch_store.BadDeletionError, e:
116            print "Error: %s" %e
117            return
118        print "Done with del"
119
120
121    ss = synch_store()
122    ss.set_value('b', 'initial')
123    print ss
124
125    for i in range(0,10):
126        t = Thread(target=get_val, args=('a', ss, i % 2 == 1))
127        t.start()
128    t = Thread(target=set_val, args=('a', '1', ss))
129    t.start()
130    for i in range(0,10):
131        t = Thread(target=get_val, args=('a', ss))
132        t.start()
133
134    print ss
135    t = Thread(target=del_val, args=('a', ss))
136    t.start()
137    print ss
138
139
Note: See TracBrowser for help on using the repository browser.