Changeset af75d48


Ignore:
Timestamp:
Mar 1, 2010 10:50:00 AM (14 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
9c2e4e1
Parents:
b78c9ea
Message:

Add the ability to revoke a synch key

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/synch_store.py

    rb78c9ea raf75d48  
    2020    class BadKeyError(Exception): pass
    2121
     22    # Thrown when a variable has been revoked.
     23    class RevokedKeyError(Exception): pass
     24
    2225    # A value in the dict.  Each has its own condition (and lock) for threads
    2326    # to wait on.
     
    3740        self.main_lock = Lock()
    3841        self.values = { }
     42        self.revoked = set([])
    3943
    4044    # Set the value in the dict.  If threads are waiting, wake them.
     
    4448
    4549        self.main_lock.acquire()
     50        if key in self.revoked:
     51            self.main_lock.release()
     52            raise synch_store.RevokedKeyError("Synch key %s revoked" % key)
    4653        if self.values.has_key(key):
    4754            v = self.values[key]
     
    6370    def get_value(self, key, wait=True):
    6471        self.main_lock.acquire()
     72        if key in self.revoked:
     73            self.main_lock.release()
     74            raise synch_store.RevokedKeyError("Synch key %s revoked" % key)
    6575        if not self.values.has_key(key):
    6676            self.values[key] = synch_store.synch_value()
     
    7080
    7181        while v.value is None:
    72             if wait: v.condition.wait()
    73             else: break
     82            if wait:
     83                v.condition.wait()
     84                v.condition.release()
     85                self.main_lock.acquire()
     86                if key in self.revoked:
     87                    self.main_lock.release()
     88                    raise synch_store.RevokedKeyError("Synch key %s revoked" % key)
     89                else:
     90                    self.main_lock.release()
     91                v.condition.acquire()
     92            else:
     93                break
    7494        rv = v.value
    7595        v.condition.release()
     
    7898    # Remove the key from the shared store.  If the key is not present or has
    7999    # never been set, fail.  Otherwise, wake any waiting threads (there
    80     # shouldn't be any) and remove the value and key from the store.
     100    # shouldn't be any) and remove the value and key from the store.  Deleteing
     101    # a revoked key removes its revocation.
    81102    def del_value(self, key):
    82103        self.main_lock.acquire()
     104        if key in self.revoked:
     105            self.revoked.remove(key)
     106            self.main_lock.release()
     107        else:
     108            if self.values.has_key(key):
     109                v = self.values[key]
     110                v.condition.acquire()
     111                if v.value is not None:
     112                    del self.values[key]
     113                    self.main_lock.release()
     114                    v.condition.notifyAll()
     115                    v.condition.release()
     116                else:
     117                    self.main_lock.release()
     118                    v.condition.release()
     119                    raise synch_store.BadDeletionError("%s has not been set" \
     120                            % key)
     121            else:
     122                self.main_lock.release()
     123                raise synch_store.BadDeletionError("%s does not exist yet" \
     124                        % key)
     125
     126    # Invlaidate this synchronization point.  Those waiting for it will get
     127    # exceptions, and those trying to wait for it in the future will as well.
     128    def revoke_key(self, key):
     129        self.main_lock.acquire()
     130        self.revoked.add(key)
    83131        if self.values.has_key(key):
    84132            v = self.values[key]
     133            del self.values[key]
     134            self.main_lock.release()
    85135            v.condition.acquire()
    86             if v.value is not None:
    87                 del self.values[key]
    88                 self.main_lock.release()
    89                 v.condition.notifyAll()
    90             else:
    91                 self.main_lock.release()
    92                 raise synch_store.BadDeletionError("%s has not been set" % key)
     136            v.condition.notifyAll()
     137            v.condition.release()
    93138        else:
    94139            self.main_lock.release()
    95             raise synch_store.BadDeletionError("%s does not exist yet" % key)
     140
    96141
    97142    def all_keys(self):
     
    145190    def get_val(k, ss, wait=True):
    146191        print "%s: getting %s" % (currentThread(), k)
    147         v = ss.get_value(k, wait)
     192        try:
     193            v = ss.get_value(k, wait)
     194        except synch_store.RevokedKeyError, e:
     195            print "Error: %s" %e
     196            return
    148197        print "%s: %s is %s" % (currentThread(), k, v)
    149198
     
    156205            print "Error: %s" %e
    157206            return
     207        except synch_store.RevokedKeyError, e:
     208            print "Error: %s" %e
     209            return
    158210        print "Done with set"
    159211
     
    165217            print "Error: %s" %e
    166218            return
     219        except synch_store.RevokedKeyError, e:
     220            print "Error: %s" %e
     221            return
    167222        print "Done with del"
     223
     224    def rev_val(k, ss):
     225        print "revoking %s" % k
     226        try:
     227            ss.revoke_key(k)
     228        except synch_store.BadDeletionError, e:
     229            print "Error: %s" %e
     230            return
     231        except synch_store.RevokedKeyError, e:
     232            print "Error: %s" %e
     233            return
     234        print "Done with revoking"
    168235
    169236
     
    186253    print ss
    187254
    188 
     255    for i in range(0,10):
     256        t = Thread(target=get_val, args=('c', ss))
     257        t.start()
     258
     259   
     260    t = Thread(target=rev_val, args=('c', ss))
     261    t.start()
     262
Note: See TracChangeset for help on using the changeset viewer.