Changeset af75d48
- Timestamp:
- Mar 1, 2010 10:50:00 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- 9c2e4e1
- Parents:
- b78c9ea
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/synch_store.py
rb78c9ea raf75d48 20 20 class BadKeyError(Exception): pass 21 21 22 # Thrown when a variable has been revoked. 23 class RevokedKeyError(Exception): pass 24 22 25 # A value in the dict. Each has its own condition (and lock) for threads 23 26 # to wait on. … … 37 40 self.main_lock = Lock() 38 41 self.values = { } 42 self.revoked = set([]) 39 43 40 44 # Set the value in the dict. If threads are waiting, wake them. … … 44 48 45 49 self.main_lock.acquire() 50 if key in self.revoked: 51 self.main_lock.release() 52 raise synch_store.RevokedKeyError("Synch key %s revoked" % key) 46 53 if self.values.has_key(key): 47 54 v = self.values[key] … … 63 70 def get_value(self, key, wait=True): 64 71 self.main_lock.acquire() 72 if key in self.revoked: 73 self.main_lock.release() 74 raise synch_store.RevokedKeyError("Synch key %s revoked" % key) 65 75 if not self.values.has_key(key): 66 76 self.values[key] = synch_store.synch_value() … … 70 80 71 81 while v.value is None: 72 if wait: v.condition.wait() 73 else: break 82 if wait: 83 v.condition.wait() 84 v.condition.release() 85 self.main_lock.acquire() 86 if key in self.revoked: 87 self.main_lock.release() 88 raise synch_store.RevokedKeyError("Synch key %s revoked" % key) 89 else: 90 self.main_lock.release() 91 v.condition.acquire() 92 else: 93 break 74 94 rv = v.value 75 95 v.condition.release() … … 78 98 # Remove the key from the shared store. If the key is not present or has 79 99 # never been set, fail. Otherwise, wake any waiting threads (there 80 # shouldn't be any) and remove the value and key from the store. 100 # shouldn't be any) and remove the value and key from the store. Deleteing 101 # a revoked key removes its revocation. 81 102 def del_value(self, key): 82 103 self.main_lock.acquire() 104 if key in self.revoked: 105 self.revoked.remove(key) 106 self.main_lock.release() 107 else: 108 if self.values.has_key(key): 109 v = self.values[key] 110 v.condition.acquire() 111 if v.value is not None: 112 del self.values[key] 113 self.main_lock.release() 114 v.condition.notifyAll() 115 v.condition.release() 116 else: 117 self.main_lock.release() 118 v.condition.release() 119 raise synch_store.BadDeletionError("%s has not been set" \ 120 % key) 121 else: 122 self.main_lock.release() 123 raise synch_store.BadDeletionError("%s does not exist yet" \ 124 % key) 125 126 # Invlaidate this synchronization point. Those waiting for it will get 127 # exceptions, and those trying to wait for it in the future will as well. 128 def revoke_key(self, key): 129 self.main_lock.acquire() 130 self.revoked.add(key) 83 131 if self.values.has_key(key): 84 132 v = self.values[key] 133 del self.values[key] 134 self.main_lock.release() 85 135 v.condition.acquire() 86 if v.value is not None: 87 del self.values[key] 88 self.main_lock.release() 89 v.condition.notifyAll() 90 else: 91 self.main_lock.release() 92 raise synch_store.BadDeletionError("%s has not been set" % key) 136 v.condition.notifyAll() 137 v.condition.release() 93 138 else: 94 139 self.main_lock.release() 95 raise synch_store.BadDeletionError("%s does not exist yet" % key) 140 96 141 97 142 def all_keys(self): … … 145 190 def get_val(k, ss, wait=True): 146 191 print "%s: getting %s" % (currentThread(), k) 147 v = ss.get_value(k, wait) 192 try: 193 v = ss.get_value(k, wait) 194 except synch_store.RevokedKeyError, e: 195 print "Error: %s" %e 196 return 148 197 print "%s: %s is %s" % (currentThread(), k, v) 149 198 … … 156 205 print "Error: %s" %e 157 206 return 207 except synch_store.RevokedKeyError, e: 208 print "Error: %s" %e 209 return 158 210 print "Done with set" 159 211 … … 165 217 print "Error: %s" %e 166 218 return 219 except synch_store.RevokedKeyError, e: 220 print "Error: %s" %e 221 return 167 222 print "Done with del" 223 224 def rev_val(k, ss): 225 print "revoking %s" % k 226 try: 227 ss.revoke_key(k) 228 except synch_store.BadDeletionError, e: 229 print "Error: %s" %e 230 return 231 except synch_store.RevokedKeyError, e: 232 print "Error: %s" %e 233 return 234 print "Done with revoking" 168 235 169 236 … … 186 253 print ss 187 254 188 255 for i in range(0,10): 256 t = Thread(target=get_val, args=('c', ss)) 257 t.start() 258 259 260 t = Thread(target=rev_val, args=('c', ss)) 261 t.start() 262
Note: See TracChangeset
for help on using the changeset viewer.