#!/usr/local/bin/python

from threading import Lock, Condition


class synch_store:
    """
    Serialized write-once dictionary.

    Each key may be assigned exactly once with set_value().  A thread that
    calls get_value() before the key has been assigned blocks until the
    assignment happens, unless it asks to poll (wait=False), in which case
    it gets None back immediately.

    None is used internally as the "not assigned yet" marker, so None cannot
    be stored as a real value: assigning None leaves the key unassigned.

    An assigned key may be removed with del_value().  Readers that were
    already waiting on the removed entry still receive the value that was
    assigned to it; readers arriving after the removal wait for a new
    assignment.

    Locking: main_lock guards the key -> entry dictionary; every entry has
    its own Condition guarding its value.  The locks are always taken in the
    order main_lock, then entry condition.
    """

    class CollisionError(Exception):
        """Raised by set_value() when the key already has a value."""
        pass

    class BadDeletionError(Exception):
        """Raised by del_value() when the key is missing or unassigned."""
        pass

    class synch_value:
        """One dictionary entry: the key, its value and the Condition that
        readers wait on until the value is assigned."""

        def __init__(self, key, value=None):
            self.key = key
            # None means "not assigned yet".
            self.value = value
            self.condition = Condition()

        def __str__(self):
            return "key %s value %s cond %s" % \
                    (self.key, self.value, self.condition)

    def __init__(self):
        # Guards self.values (the mapping itself, not the entries' values).
        self.main_lock = Lock()
        self.values = {}

    def set_value(self, key, value):
        """
        Assign value to key and wake every thread waiting for it.

        Raises synch_store.CollisionError if key already has a value.
        """
        self.main_lock.acquire()
        try:
            entry = self.values.get(key)
            if entry is None:
                # Nobody has asked for this key yet: just store it.
                self.values[key] = synch_store.synch_value(key, value)
                return
            # Take the entry's lock before dropping main_lock so the entry
            # cannot change between the lookup and the assignment.
            entry.condition.acquire()
        finally:
            self.main_lock.release()

        try:
            if entry.value is not None:
                # (key,) keeps the formatting correct for tuple keys.
                raise synch_store.CollisionError("%s already set" % (key,))
            entry.value = value
            entry.condition.notify_all()
        finally:
            entry.condition.release()

    def get_value(self, key, wait=True):
        """
        Return the value assigned to key.

        If key has not been assigned yet, a placeholder entry is created for
        it.  With wait true (the default) the call then blocks until
        set_value() assigns the key; with wait false it returns None at
        once.
        """
        self.main_lock.acquire()
        try:
            if key not in self.values:
                self.values[key] = synch_store.synch_value(key)
            entry = self.values[key]
            # Hand over from main_lock to the entry's own lock.
            entry.condition.acquire()
        finally:
            self.main_lock.release()

        try:
            # Loop rather than a single wait: re-check the value after
            # every wake-up.
            while wait and entry.value is None:
                entry.condition.wait()
            return entry.value
        finally:
            entry.condition.release()

    def del_value(self, key):
        """
        Remove an assigned key from the store.

        Raises synch_store.BadDeletionError if key is not in the store or is
        only a placeholder that has not been assigned a value yet.
        """
        with self.main_lock:
            entry = self.values.get(key)
            if entry is None:
                raise synch_store.BadDeletionError(
                    "%s does not exist yet" % (key,))
            # The with-block guarantees the entry's condition is released on
            # both the success and the error path; leaving it held would
            # block every other thread that touches this entry.
            with entry.condition:
                if entry.value is None:
                    raise synch_store.BadDeletionError(
                        "%s has not been set" % (key,))
                del self.values[key]
                entry.condition.notify_all()

    def __str__(self):
        """Return the "key: entry" descriptions of all entries, concatenated
        without a separator."""
        parts = []
        with self.main_lock:
            for k, entry in self.values.items():
                with entry.condition:
                    parts.append("%s: %s" % (k, str(entry)))
        return "".join(parts)


#debugging scaffolding, left here in case we have problems
if __name__ == '__main__':
    from threading import current_thread, Thread
    from time import sleep

    def get_val(k, ss, wait=True):
        print("%s: getting %s" % (current_thread(), k))
        v = ss.get_value(k, wait)
        print("%s: %s is %s" % (current_thread(), k, v))

    def set_val(k, v, ss):
        print("setting %s to %s" % (k, v))
        try:
            ss.set_value(k, v)
        except synch_store.CollisionError as e:
            print("Error: %s" % e)
            return
        print("Done with set")

    def del_val(k, ss):
        print("deleteing %s" % k)
        try:
            ss.del_value(k)
        except synch_store.BadDeletionError as e:
            print("Error: %s" % e)
            return
        print("Done with del")

    ss = synch_store()
    ss.set_value('b', 'initial')
    print(ss)

    # Readers started before the value exists; odd ones block, even ones poll.
    for i in range(0, 10):
        t = Thread(target=get_val, args=('a', ss, i % 2 == 1))
        t.start()

    t = Thread(target=set_val, args=('a', '1', ss))
    t.start()

    # Readers started after (or racing with) the assignment.
    for i in range(0, 10):
        t = Thread(target=get_val, args=('a', ss))
        t.start()
    print(ss)

    t = Thread(target=del_val, args=('a', ss))
    t.start()
    print(ss)