Source code for wheezy.caching.lockout

""" ``lockout`` module.
"""

from warnings import warn

from wheezy.caching.utils import total_seconds


[docs]class Locker(object): """Used to define lockout terms.""" def __init__( self, cache, forbid_action, namespace=None, key_prefix="c", **terms ): self.cache = cache self.forbid_action = forbid_action self.namespace = namespace self.key_prefix = key_prefix self.terms = terms
[docs] def define(self, name, **terms): """Defines a new lockout with given `name` and `terms`. The `terms` keys must correspond to `known terms` of locker. """ if not terms: # pragma: nocover warn("Locker: no terms", stacklevel=2) key_prefix = "%s:%s:" % (self.key_prefix, name.replace(" ", "_")) counters = [self.terms[t](**terms[t]) for t in terms] return Lockout( name, counters, self.forbid_action, self.cache, self.namespace, key_prefix, )
[docs]class NullLocker(object): """Null locker implementation.""" def __init__( self, cache, forbid_action, namespace=None, key_prefix="c", **terms ): pass def define(self, name, **terms): if not terms: # pragma: nocover warn("NullLocker: no terms", stacklevel=2) return NullLockout()
[docs]class Counter(object): """A container of various attributes used by lockout.""" def __init__( self, key_func, count, period, duration, reset=True, alert=None ): self.key_func = key_func self.count = count self.period = total_seconds(period) self.duration = total_seconds(duration) self.reset = reset self.alert = alert
[docs]class Lockout(object): """A lockout is used to enforce terms of use policy.""" def __init__( self, name, counters, forbid_action, cache, namespace, key_prefix ): self.name = name self.counters = counters self.cache = cache self.namespace = namespace self.key_prefix = key_prefix self.forbid_action = forbid_action
[docs] def guard(self, func): """A guard decorator is applied to a `func` which returns a boolean indicating success or failure. Each failure is a subject to increase counter. The counters that support `reset` (and related locks) are deleted on success. """ def guard_wrapper(ctx, *args, **kwargs): succeed = func(ctx, *args, **kwargs) if succeed: self.reset(ctx) else: self.incr(ctx) return succeed return guard_wrapper
[docs] def quota(self, func): """A quota decorator is applied to a `func` which returns a boolean indicating success or failure. Each success is a subject to increase counter. """ def quota_wrapper(ctx, *args, **kwargs): succeed = func(ctx, *args, **kwargs) if succeed: self.incr(ctx) return succeed return quota_wrapper
[docs] def forbid_locked(self, wrapped=None, action=None): """A decorator that forbids access (by a call to `forbid_action`) to `func` once the counter threshold is reached (lock is set). You can override default forbid action by `action`. See `test_lockout.py` for an example. """ action = action or self.forbid_action assert action def decorate(func): key_prefix = "lock:" + self.key_prefix def forbid_locked_wrapper(ctx, *args, **kwargs): locks = self.cache.get_multi( [key_prefix + c.key_func(ctx) for c in self.counters], self.namespace, ) if locks: return action(ctx) return func(ctx, *args, **kwargs) return forbid_locked_wrapper if wrapped is None: return decorate else: return decorate(wrapped)
[docs] def reset(self, ctx): """Removes locks for counters that support reset.""" key_prefix = self.key_prefix keys = [key_prefix + c.key_func(ctx) for c in self.counters if c.reset] keys.extend(["lock:" + key for key in keys]) keys and self.cache.delete_multi(keys, 0, self.namespace)
[docs] def force_reset(self, ctx): """Removes locks for all counters.""" key_prefix = self.key_prefix keys = [key_prefix + c.key_func(ctx) for c in self.counters] keys.extend(["lock:" + key for key in keys]) keys and self.cache.delete_multi(keys, 0, self.namespace)
[docs] def incr(self, ctx): """Increments lockout counters for given context.""" key_prefix = self.key_prefix for c in self.counters: key = key_prefix + c.key_func(ctx) max_try = ( self.cache.add(key, 1, c.period, self.namespace) and 1 or self.cache.incr(key, 1, self.namespace) ) # print("%s ~ %d" % (key, max_try)) if max_try >= c.count: self.cache.delete(key, 0, self.namespace) self.cache.add("lock:" + key, 1, c.duration, self.namespace) c.alert and c.alert(ctx, self.name, c)
[docs]class NullLockout(object): """Null lockout implementation.""" def guard(self, func): return func def quota(self, func): return func def forbid_locked(self, wrapped=None, action=None): def decorate(func): return func if wrapped is None: return decorate else: return wrapped def reset(self, ctx): pass def force_reset(self, ctx): pass def incr(self, ctx): pass