#!/usr/local/bin/python

from threading import Lock, Condition


class synch_store:
    """
    Serialized write-once dictionary.

    Threads reading before the value for a key is assigned wait for the
    assignment, though polling is an option.  Keys may also be deleted,
    though deleting a value too closely after the set may deadlock threads
    waiting for the assignment.

    None is used internally as the "not yet assigned" marker, so a key
    whose value is None is indistinguishable from a key that was never set.

    Lock ordering: main_lock is always taken before any per-value
    condition, and no thread tries to take main_lock while holding a
    condition.
    """

    class CollisionError(Exception):
        """Raised if a key is assigned more than once."""

    class BadDeletionError(Exception):
        """Raised if a non-existent or never-assigned key is deleted."""

    class BadKeyError(Exception):
        """Raised if a key containing a space is assigned."""

    class RevokedKeyError(Exception):
        """Raised when a revoked key is read or assigned."""

    class synch_value:
        """
        A value in the dict.  Each has its own condition (and lock) for
        threads to wait on.
        """

        def __init__(self, value=None):
            """Store value (None means "not yet assigned")."""
            self.value = value
            self.condition = Condition()

        def __str__(self):
            """Debugging representation."""
            return "value %s cond %s" % (self.value, self.condition)

    def __init__(self):
        """
        Create an empty store.  The store has its own lock (main_lock) for
        serializing changes to the dict and to the set of revoked keys.
        """
        self.main_lock = Lock()
        self.values = {}
        self.revoked = set()

    def _lock_unrevoked(self, key, entry=None):
        """
        Return the entry for key with its condition acquired.

        If entry is None the entry is looked up in the store, and an unset
        placeholder is created when the key is absent; otherwise the given
        entry is used as-is.  Raises RevokedKeyError, holding no locks, if
        the key is revoked.

        The condition is acquired before main_lock is released.  A
        revoke_key call that starts afterwards therefore cannot notify
        until the caller has either released the condition or is blocked
        in wait(), so the notification is not missed.
        """
        with self.main_lock:
            if key in self.revoked:
                raise synch_store.RevokedKeyError(
                    "Synch key %s revoked" % key)
            if entry is None:
                entry = self.values.get(key)
                if entry is None:
                    entry = self.values[key] = synch_store.synch_value()
            entry.condition.acquire()
        return entry

    def set_value(self, key, value):
        """
        Set the value for key.  If threads are waiting, wake them.

        Raises BadKeyError if key contains a space, RevokedKeyError if the
        key has been revoked and CollisionError if the key already has a
        value.
        """
        if key.find(' ') != -1:
            raise self.BadKeyError("Whitespace is not allowed in keys")

        with self.main_lock:
            if key in self.revoked:
                raise synch_store.RevokedKeyError(
                    "Synch key %s revoked" % key)
            entry = self.values.get(key)
            if entry is None:
                # Nobody can be waiting on a key that is not in the dict.
                self.values[key] = synch_store.synch_value(value)
                return
            # Take the condition before main_lock is dropped.
            entry.condition.acquire()

        try:
            if entry.value is not None:
                raise synch_store.CollisionError("%s already set" % key)
            entry.value = value
            entry.condition.notify_all()
        finally:
            entry.condition.release()

    def get_value(self, key, wait=True):
        """
        Return the value for key.

        If wait is true, block until the key is set, if it is not already.
        If wait is false, poll: return None when the key has no value yet.
        In both cases an unset placeholder is created for an unknown key.

        Raises RevokedKeyError if the key is revoked, either on entry or
        while waiting.
        """
        entry = self._lock_unrevoked(key)
        held = True
        try:
            while wait and entry.value is None:
                entry.condition.wait()
                # Re-check for revocation.  The condition has to be dropped
                # first so that main_lock is never requested while a
                # condition is held.
                entry.condition.release()
                held = False
                self._lock_unrevoked(key, entry)
                held = True
            return entry.value
        finally:
            if held:
                entry.condition.release()

    def del_value(self, key):
        """
        Remove the key from the shared store.

        If the key is not present or has never been set, fail with
        BadDeletionError.  Otherwise, wake any waiting threads (there
        shouldn't be any) and remove the value and key from the store.
        Deleting a revoked key removes its revocation.
        """
        with self.main_lock:
            if key in self.revoked:
                self.revoked.remove(key)
                return
            entry = self.values.get(key)
            if entry is None:
                raise synch_store.BadDeletionError(
                    "%s does not exist yet" % key)
            with entry.condition:
                if entry.value is None:
                    raise synch_store.BadDeletionError(
                        "%s has not been set" % key)
                del self.values[key]
                entry.condition.notify_all()

    def revoke_key(self, key):
        """
        Invalidate this synchronization point.  Those waiting for it will
        get exceptions, and those trying to wait for it in the future will
        as well.
        """
        with self.main_lock:
            self.revoked.add(key)
            entry = self.values.pop(key, None)
        if entry is not None:
            with entry.condition:
                entry.condition.notify_all()

    def all_keys(self):
        """Return a list of the keys currently in the store."""
        with self.main_lock:
            return list(self.values)

    def save(self, filename):
        """
        Save the store contents to a file in a simple way: one line per
        key, "key value" for assigned keys and "key" alone for unset ones.
        Values are written with str().  The set of revoked keys is not
        saved.
        """
        with self.main_lock:
            with open(filename, 'w') as f:
                for key, entry in self.values.items():
                    with entry.condition:
                        if entry.value is not None:
                            f.write("%s %s\n" % (key, str(entry.value)))
                        else:
                            f.write("%s\n" % key)

    def load(self, filename):
        """
        Replace the store contents with those read from a file written by
        save().  All loaded values are strings.  The set of revoked keys is
        left as it is.

        The file is parsed completely before the store is touched, so a
        failure to open or read it leaves the current contents in place.
        """
        loaded = {}
        with open(filename, 'r') as f:
            for line in f:
                # Drop the line terminator so it does not end up in the
                # key or the value.
                key, sep, value = line.rstrip('\n').partition(' ')
                if sep:
                    loaded[key] = synch_store.synch_value(value)
                else:
                    loaded[key] = synch_store.synch_value()
        with self.main_lock:
            self.values = loaded

    def __str__(self):
        """A debugging representation: one "key: entry" line per key."""
        parts = []
        with self.main_lock:
            for key, entry in self.values.items():
                with entry.condition:
                    parts.append("%s: %s\n" % (key, str(entry)))
        return "".join(parts)


# debugging scaffolding, left here in case we have problems
if __name__ == '__main__':
    from threading import current_thread, Thread

    def get_val(k, ss, wait=True):
        print("%s: getting %s" % (current_thread(), k))
        try:
            v = ss.get_value(k, wait)
        except synch_store.RevokedKeyError as e:
            print("Error: %s" % e)
            return
        print("%s: %s is %s" % (current_thread(), k, v))

    def set_val(k, v, ss):
        print("setting %s to %s" % (k, v))
        try:
            ss.set_value(k, v)
        except synch_store.CollisionError as e:
            print("Error: %s" % e)
            return
        except synch_store.RevokedKeyError as e:
            print("Error: %s" % e)
            return
        print("Done with set")

    def del_val(k, ss):
        print("deleteing %s" % k)
        try:
            ss.del_value(k)
        except synch_store.BadDeletionError as e:
            print("Error: %s" % e)
            return
        except synch_store.RevokedKeyError as e:
            print("Error: %s" % e)
            return
        print("Done with del")

    def rev_val(k, ss):
        print("revoking %s" % k)
        try:
            ss.revoke_key(k)
        except synch_store.BadDeletionError as e:
            print("Error: %s" % e)
            return
        except synch_store.RevokedKeyError as e:
            print("Error: %s" % e)
            return
        print("Done with revoking")

    ss = synch_store()
    ss.set_value('b', 'initial')
    print(ss)

    # Half of these poll, half of them wait for 'a'.
    for i in range(0, 10):
        t = Thread(target=get_val, args=('a', ss, i % 2 == 1))
        t.start()
    t = Thread(target=set_val, args=('a', '1', ss))
    t.start()
    for i in range(0, 10):
        t = Thread(target=get_val, args=('a', ss))
        t.start()
    print(ss)
    t = Thread(target=del_val, args=('a', ss))
    t.start()
    print(ss)

    # Waiters on a key that gets revoked should all see RevokedKeyError.
    for i in range(0, 10):
        t = Thread(target=get_val, args=('c', ss))
        t.start()
    t = Thread(target=rev_val, args=('c', ss))
    t.start()