Merge dogpile.core into dogpile.cache.

The dogpile.core package itself is EOLed.   The features
within it are now moved underneath the "dogpile"
and "dogpile.util" namespaces, and the namespace packaging
directives are removed.   dogpile.cache has no
dependencies on "dogpile.core" as a namespace any longer,
though it does provide dogpile/core.py for backwards
compatibility.

fixes #91

Co-authored-by: Mike Bayer <mike_mp@zzzcomputing.com>
Change-Id: Ia1ca428616073755aec74c2ac4780cd634092ca8
Pull-request: https://bitbucket.org/zzzeek/dogpile.cache/pull-requests/48
Morgan Fainberg 2016-05-26 10:33:44 -04:00 committed by Mike Bayer
parent a0ad86064b
commit 761dc0a9e4
40 changed files with 1538 additions and 503 deletions


@ -1,4 +1,4 @@
Copyright (c) 2011-2015 Mike Bayer
Copyright (c) 2011-2016 Mike Bayer
All rights reserved.


@ -1,7 +1,7 @@
recursive-include docs *.html *.css *.txt *.js *.jpg *.png *.py Makefile *.rst *.sty
recursive-include tests *.py *.dat
include README* LICENSE distribute_setup.py CHANGES* test.cfg
include README* LICENSE CHANGES*
prune docs/build/output


@ -1,18 +1,31 @@
dogpile.cache
=============
dogpile
=======
A caching API built around the concept of a "dogpile lock", which allows
continued access to an expiring data value while a single thread generates a
new value.
Dogpile consists of two subsystems, one building on top of the other.
dogpile.cache builds on the `dogpile.core <http://pypi.python.org/pypi/dogpile.core>`_
locking system, which implements the idea of "allow one creator to write while
others read" in the abstract. Overall, dogpile.cache is intended as a
replacement to the `Beaker <http://beaker.groovie.org>`_ caching system, the internals
of which are written by the same author. All the ideas of Beaker which "work"
are re-implemented in dogpile.cache in a more efficient and succinct manner,
and all the cruft (Beaker's internals were first written in 2005) relegated
to the trash heap.
``dogpile`` provides the concept of a "dogpile lock", a control structure
which allows a single thread of execution to be selected as the "creator" of
some resource, while allowing other threads of execution to refer to the previous
version of this resource as the creation proceeds; if there is no previous
version, then those threads block until the object is available.
``dogpile.cache`` is a caching API which provides a generic interface to
caching backends of any variety, and additionally provides API hooks which
integrate these cache backends with the locking mechanism of ``dogpile``.
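As a brief illustration of the ``dogpile.cache`` layer (a minimal sketch; the
in-memory backend and the ``load_user_info()`` / ``lookup_user()`` names are
illustrative only)::

    from dogpile.cache import make_region

    region = make_region().configure("dogpile.cache.memory")

    @region.cache_on_arguments()
    def load_user_info(user_id):
        # the first call for a given user_id runs the function;
        # subsequent calls return the cached value
        return lookup_user(user_id)

Full configuration options, including expiration times and backend arguments,
are covered in the documentation linked below.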
Overall, dogpile.cache is intended as a replacement to the `Beaker
<http://beaker.groovie.org>`_ caching system, the internals of which are
written by the same author. All the ideas of Beaker which "work" are re-
implemented in dogpile.cache in a more efficient and succinct manner, and all
the cruft (Beaker's internals were first written in 2005) relegated to the
trash heap.
Documentation
-------------
See dogpile.cache's full documentation at
`dogpile.cache documentation <http://dogpilecache.readthedocs.org>`_. The
sections below provide a brief synopsis of the ``dogpile`` packages.
Features
--------
@ -47,45 +60,6 @@ Features
* Included backends feature three memcached backends (python-memcached, pylibmc,
bmemcached), a Redis backend, a backend based on Python's
anydbm, and a plain dictionary backend.
* Space for third party plugins, including the first which provides the
* Space for third party plugins, including one which provides the
dogpile.cache engine to Mako templates.
* Python 3 compatible in place - no 2to3 required.
Synopsis
--------
dogpile.cache features a single public usage object known as the ``CacheRegion``.
This object then refers to a particular ``CacheBackend``. Typical usage
generates a region using ``make_region()``, which can then be used at the
module level to decorate functions, or used directly in code with a traditional
get/set interface. Configuration of the backend is applied to the region
using ``configure()`` or ``configure_from_config()``, allowing deferred
config-file based configuration to occur after modules have been imported::
from dogpile.cache import make_region
region = make_region().configure(
'dogpile.cache.pylibmc',
expiration_time = 3600,
arguments = {
'url':["127.0.0.1"],
'binary':True,
'behaviors':{"tcp_nodelay": True,"ketama":True}
}
)
@region.cache_on_arguments()
def load_user_info(user_id):
return some_database.lookup_user_by_id(user_id)
Documentation
-------------
See dogpile.cache's full documentation at
`dogpile.cache documentation <http://dogpilecache.readthedocs.org>`_.

docs/build/api.rst vendored

@ -2,6 +2,7 @@
API
===
Region
======
@ -59,3 +60,18 @@ Utilities
.. autofunction:: length_conditional_mangler
dogpile Core
============
.. autoclass:: dogpile.Lock
:members:
.. autoclass:: dogpile.NeedRegenerationException
:members:
.. autoclass:: dogpile.util.ReadWriteMutex
:members:
.. autoclass:: dogpile.util.NameRegistry
:members:


@ -4,6 +4,22 @@ Changelog
.. changelog::
:version: 0.6.0
.. change::
:tags: feature
:tickets: 91
The ``dogpile.core`` library has been rolled in as part of the
``dogpile.cache`` distribution. The configuration of the ``dogpile``
name as a namespace package is also removed from ``dogpile.cache``.
In order to allow existing installations of ``dogpile.core`` as a separate
package to remain unaffected, the ``.core`` package has been retired
within ``dogpile.cache`` directly; the :class:`.Lock` class is now
available directly as ``dogpile.Lock`` and the additional ``dogpile.core``
constructs are under the ``dogpile.util`` namespace.
Additionally, the long-deprecated ``dogpile.core.Dogpile`` and
``dogpile.core.SyncReaderDogpile`` classes have been removed.
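As a quick orientation (a minimal sketch of the new locations only), imports
that previously targeted ``dogpile.core`` now read::

    from dogpile import Lock, NeedRegenerationException
    from dogpile.util import NameRegistry, ReadWriteMutex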
.. change::
:tags: bug

docs/build/conf.py vendored

@ -23,7 +23,7 @@ import sys, os
# absolute, like shown here.
sys.path.insert(0, os.path.abspath('../../'))
import dogpile.cache
import dogpile
# -- General configuration -----------------------------------------------------
@ -52,16 +52,16 @@ master_doc = 'index'
# General information about the project.
project = u'dogpile.cache'
copyright = u'2011-2015 Mike Bayer'
copyright = u'2011-2016 Mike Bayer'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = dogpile.cache.__version__
version = dogpile.__version__
# The full version, including alpha/beta/rc tags.
release = dogpile.cache.__version__
release = dogpile.__version__
# The language for content autogenerated by Sphinx. Refer to documentation

docs/build/core_usage.rst vendored Normal file

@ -0,0 +1,248 @@
============
dogpile Core
============
``dogpile`` provides a locking interface around a "value creation" and
"value retrieval" pair of functions.
.. versionchanged:: 0.6.0 The ``dogpile`` package encapsulates the
functionality that was previously provided by the separate
``dogpile.core`` package.
The primary interface is the :class:`.Lock` object, which provides for
the invocation of the creation function by only one thread and/or process at
a time, deferring all other threads/processes to the "value retrieval" function
until the single creation thread is completed.
Do I Need to Learn the dogpile Core API Directly?
=================================================
It's anticipated that most users of ``dogpile`` will be using it indirectly
via the ``dogpile.cache`` caching
front-end. If you fall into this category, then the short answer is no.
Using the core ``dogpile`` APIs described here directly implies you're building your own
resource-usage system outside, or in addition to, the one
``dogpile.cache`` provides.
Rudimentary Usage
==================
The primary API dogpile provides is the :class:`.Lock` object. This object is used with
functions that provide mutexing, value creation, and value retrieval.
An example usage is as follows::
from dogpile import Lock, NeedRegenerationException
import threading
import time
# store a reference to a "resource", some
# object that is expensive to create.
the_resource = [None]
def some_creation_function():
# call a value creation function
value = create_some_resource()
# get creationtime using time.time()
creationtime = time.time()
# keep track of the value and creation time in the "cache"
the_resource[0] = tup = (value, creationtime)
# return the tuple of (value, creationtime)
return tup
def retrieve_resource():
# function that retrieves the resource and
# creation time.
# if no resource, then raise NeedRegenerationException
if the_resource[0] is None:
raise NeedRegenerationException()
# else return the tuple of (value, creationtime)
return the_resource[0]
# a mutex, which needs here to be shared across all invocations
# of this particular creation function
mutex = threading.Lock()
with Lock(mutex, some_creation_function, retrieve_resource, 3600) as value:
# some function that uses
# the resource. Won't reach
# here until some_creation_function()
# has completed at least once.
value.do_something()
Above, ``some_creation_function()`` will be called
when :class:`.Lock` is first invoked as a context manager. The value returned by this
function is then passed into the ``with`` block, where it can be used
by application code. Concurrent threads which
call :class:`.Lock` during this initial period
will be blocked until ``some_creation_function()`` completes.
Once the creation function has completed successfully the first time,
new calls to :class:`.Lock` will call ``retrieve_resource()``
in order to get the current cached value as well as its creation
time; if the creation time is older than the current time minus
an expiration time of 3600, then ``some_creation_function()``
will be called again, but only by one thread/process, using the given
mutex object as a source of synchronization. Concurrent threads/processes
which call :class:`.Lock` during this period will fall through,
and not be blocked; instead, the "stale" value just returned by
``retrieve_resource()`` will continue to be returned until the creation
function has finished.
The :class:`.Lock` API is designed to work with simple cache backends
like Memcached. It addresses such issues as:
* Values can disappear from the cache at any time, before our expiration
time is reached. The :class:`.NeedRegenerationException` class is used
to alert the :class:`.Lock` object that a value needs regeneration ahead
of the usual expiration time.
* There's no function in a Memcached-like system to "check" for a key without
actually retrieving it. The usage of the ``retrieve_resource()`` function
allows us to check for an existing key and also return the existing value,
if any, at the same time, without the need for two separate round trips.
* The "creation" function used by :class:`.Lock` is expected to store the
newly created value in the cache, as well as to return it. This is also
more efficient than using two separate round trips to separately store,
and re-retrieve, the object.
.. _caching_decorator:
Example: Using dogpile directly for Caching
===========================================
The following example approximates Beaker's "cache decoration" function, to
decorate any function and store the value in Memcached. Note that
normally, **we'd just use dogpile.cache here**, however for the purposes
of example, we'll illustrate how the :class:`.Lock` object is used
directly.
We create a Python decorator function called ``cached()`` which will provide
caching for the output of a single function. It's given the "key" which we'd
like to use in Memcached, and internally it makes usage of :class:`.Lock`,
along with a thread based mutex (we'll see a distributed mutex in the next
section)::
import pylibmc
import threading
import time
from dogpile import Lock, NeedRegenerationException
mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))
def cached(key, expiration_time):
"""A decorator that will cache the return value of a function
in memcached given a key."""
mutex = threading.Lock()
def get_value():
with mc_pool.reserve() as mc:
value_plus_time = mc.get(key)
if value_plus_time is None:
raise NeedRegenerationException()
# return a tuple (value, createdtime)
return value_plus_time
def decorate(fn):
def gen_cached():
value = fn()
with mc_pool.reserve() as mc:
# create a tuple (value, createdtime)
value_plus_time = (value, time.time())
mc.put(key, value_plus_time)
return value_plus_time
def invoke():
with Lock(mutex, gen_cached, get_value, expiration_time) as value:
return value
return invoke
return decorate
Using the above, we can decorate any function as::
@cached("some key", 3600)
def generate_my_expensive_value():
return slow_database.lookup("stuff")
The :class:`.Lock` object will ensure that only one thread at a time performs
``slow_database.lookup()``, and only every 3600 seconds, unless Memcached has
removed the value, in which case it will be called again as needed.
In particular, dogpile's locking system allows us to call the memcached get()
function at most once per access, whereas Beaker's system calls it twice; dogpile
also doesn't make us call get() when we've just created the value.
For the mutex object, we keep a ``threading.Lock`` object that's local
to the decorated function, rather than using a global lock. This localizes
the in-process locking to be local to this one decorated function. In the next section,
we'll see the usage of a cross-process lock that accomplishes this differently.
Using a File or Distributed Lock with Dogpile
==============================================
The examples thus far use a ``threading.Lock()`` object for synchronization.
If our application uses multiple processes, we will want to coordinate creation
operations not just on threads, but on some mutex that other processes can access.
In this example we'll use a file-based lock as provided by the `lockfile
<http://pypi.python.org/pypi/lockfile>`_ package, which uses a unix-symlink
concept to provide a filesystem-level lock (which also has been made
threadsafe). Another strategy may base itself directly off the Unix
``os.flock()`` call, or use an NFS-safe file lock like `flufl.lock
<http://pypi.python.org/pypi/flufl.lock>`_, and still another approach is to
lock against a cache server, using a recipe such as that described at `Using
Memcached as a Distributed Locking Service <http://www.regexprn.com/2010/05
/using-memcached-as-distributed-locking.html>`_.
What all of these locking schemes have in common is that unlike the Python
``threading.Lock`` object, they all need access to an actual key which acts as
the symbol that all processes will coordinate upon. So here, we will also
need to create the "mutex" which we pass to :class:`.Lock` using the ``key``
argument::
import lockfile
import os
from hashlib import sha1
# ... other imports and setup from the previous example
def cached(key, expiration_time):
"""A decorator that will cache the return value of a function
in memcached given a key."""
lock_path = os.path.join("/tmp", "%s.lock" % sha1(key).hexdigest())
# ... get_value() from the previous example goes here
def decorate(fn):
# ... gen_cached() from the previous example goes here
def invoke():
# create an ad-hoc FileLock
mutex = lockfile.FileLock(lock_path)
with Lock(mutex, gen_cached, get_value, expiration_time) as value:
return value
return invoke
return decorate
For a given key "some_key", we generate a hex digest of the key,
then use ``lockfile.FileLock()`` to create a lock against the file
``/tmp/53def077a4264bd3183d4eb21b1f56f883e1b572.lock``. Any number of :class:`.Lock`
objects in various processes will now coordinate with each other, using this common
filename as the "baton" against which creation of a new value proceeds.
Unlike when we used ``threading.Lock``, the file lock is ultimately locking
on a file, so multiple instances of ``FileLock()`` will all coordinate on
that same file - it's often the case that file locks that rely upon ``flock()``
require non-threaded usage, so a unique filesystem lock per thread is often a good
idea in any case.

docs/build/front.rst vendored

@ -9,8 +9,6 @@ Project Homepage
dogpile.cache is hosted on `Bitbucket <http://bitbucket.org>`_ - the lead project page is at https://bitbucket.org/zzzeek/dogpile.cache. Source code is tracked here using Git.
.. versionchanged:: 0.5.0 Moved source repository to git.
Releases and project status are available on Pypi at http://pypi.python.org/pypi/dogpile.cache.
The most recent published version of this documentation should be at http://dogpilecache.readthedocs.org.
@ -22,27 +20,9 @@ Install released versions of dogpile.cache from the Python package index with `p
pip install dogpile.cache
Installation via source distribution is via the ``setup.py`` script::
python setup.py install
Community
=========
dogpile.cache is developed by `Mike Bayer <http://techspot.zzzeek.org>`_, and is
loosely associated with the `Pylons Project <http://www.pylonsproject.org/>`_.
As dogpile.cache's usage increases, it is anticipated that the Pylons mailing list and IRC channel
will become the primary channels for support.
Bugs
====
Bugs and feature enhancements to dogpile.cache should be reported on the `Bitbucket
issue tracker
<https://bitbucket.org/zzzeek/dogpile.cache/issues?status=new&status=open>`_. If you're not sure
that a particular issue is specific to either dogpile.cache or `dogpile.core <https://bitbucket.org/zzzeek/dogpile.core>`_, posting to the dogpile.cache
tracker is likely the better place to post first.
* `dogpile.cache issue tracker <https://bitbucket.org/zzzeek/dogpile.cache/issues?status=new&status=open>`_ (post here if unsure)
* `dogpile.core issue tracker <https://bitbucket.org/zzzeek/dogpile.core/issues?status=new&status=open>`_
<https://bitbucket.org/zzzeek/dogpile.cache/issues?status=new&status=open>`_.

docs/build/index.rst vendored

@ -2,16 +2,21 @@
Welcome to dogpile.cache's documentation!
==========================================
`dogpile.cache <http://bitbucket.org/zzzeek/dogpile.cache>`_ provides a simple
caching pattern based on the `dogpile.core <http://pypi.python.org/pypi/dogpile.core>`_
locking system, including rudimentary backends. It effectively completes the
replacement of `Beaker <http://beaker.groovie.org>`_ as far as caching (though **not** HTTP sessions)
is concerned, providing an open-ended, simple, and higher-performing pattern to configure and use
cache backends. New backends are very easy to create
and use; users are encouraged to adapt the provided backends for their own
needs, as high volume caching requires lots of tweaks and adjustments specific
to an application and its environment.
Dogpile consists of two subsystems, one building on top of the other.
``dogpile`` provides the concept of a "dogpile lock", a control structure
which allows a single thread of execution to be selected as the "creator" of
some resource, while allowing other threads of execution to refer to the previous
version of this resource as the creation proceeds; if there is no previous
version, then those threads block until the object is available.
``dogpile.cache`` is a caching API which provides a generic interface to
caching backends of any variety, and additionally provides API hooks which
integrate these cache backends with the locking mechanism of ``dogpile``.
New backends are very easy to create and use; users are encouraged to adapt the
provided backends for their own needs, as high volume caching requires lots of
tweaks and adjustments specific to an application and its environment.
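As a rough sketch of what a backend involves (the ``DictBackend`` name and its
plain-dict storage are hypothetical, for illustration only; a real backend would
also need to consider locking and expiration), the interface is small::

    from dogpile.cache.api import CacheBackend, NO_VALUE

    class DictBackend(CacheBackend):
        """Toy backend storing values in a plain dict."""

        def __init__(self, arguments):
            self._store = {}

        def get(self, key):
            # return NO_VALUE, not None, when a key is missing
            return self._store.get(key, NO_VALUE)

        def set(self, key, value):
            self._store[key] = value

        def delete(self, key):
            self._store.pop(key, None)

Such a class can be registered with ``dogpile.cache.register_backend()`` or
exposed through a setuptools entry point; a complete backend would also
implement the ``get_multi()`` / ``set_multi()`` / ``delete_multi()`` methods.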
.. toctree::
@ -19,6 +24,8 @@ to an application and its environment.
front
usage
recipes
core_usage
api
changelog

docs/build/recipes.rst vendored Normal file

@ -0,0 +1,254 @@
Recipes
=======
Invalidating a group of related keys
-------------------------------------
This recipe presents a way to track the cache keys related to a particular region,
for the purposes of invalidating a series of keys that relate to a particular id.
Three cached functions, ``user_fn_one()``, ``user_fn_two()``, ``user_fn_three()``
each perform a different function based on a ``user_id`` integer value. The
region applied to cache them uses a custom key generator which tracks each cache
key generated, pulling out the integer "id" and replacing with a template.
When all three functions have been called, the key generator is now aware of
these three keys: ``user_fn_one_%d``, ``user_fn_two_%d``, and
``user_fn_three_%d``. The ``invalidate_user_id()`` function then knows that
for a particular ``user_id``, it needs to hit all three of those keys
in order to invalidate everything having to do with that id.
::
from dogpile.cache import make_region
from itertools import count
user_keys = set()
def my_key_generator(namespace, fn):
fname = fn.__name__
def generate_key(*arg):
# generate a key template:
# "fname_%d_arg1_arg2_arg3..."
key_template = fname + "_" + \
"%d" + \
"_".join(str(s) for s in arg[1:])
# store key template
user_keys.add(key_template)
# return cache key
user_id = arg[0]
return key_template % user_id
return generate_key
def invalidate_user_id(region, user_id):
for key in user_keys:
region.delete(key % user_id)
region = make_region(
function_key_generator=my_key_generator
).configure(
"dogpile.cache.memory"
)
counter = count()
@region.cache_on_arguments()
def user_fn_one(user_id):
return "user fn one: %d, %d" % (next(counter), user_id)
@region.cache_on_arguments()
def user_fn_two(user_id):
return "user fn two: %d, %d" % (next(counter), user_id)
@region.cache_on_arguments()
def user_fn_three(user_id):
return "user fn three: %d, %d" % (next(counter), user_id)
print user_fn_one(5)
print user_fn_two(5)
print user_fn_three(7)
print user_fn_two(7)
invalidate_user_id(region, 5)
print "invalidated:"
print user_fn_one(5)
print user_fn_two(5)
print user_fn_three(7)
print user_fn_two(7)
Asynchronous Data Updates with ORM Events
-----------------------------------------
This recipe presents one technique of optimistically pushing new data
into the cache when an update is sent to a database.
Using SQLAlchemy for database querying, suppose a simple cache-decorated
function returns the results of a database query::
@region.cache_on_arguments()
def get_some_data(argument):
# query database to get data
data = Session().query(DBClass).filter(DBClass.argument == argument).all()
return data
We would like this particular function to be re-queried when the data
has changed. We could call ``get_some_data.invalidate(argument, hard=False)``
at the point at which the data changes; however, this only
leads to the invalidation of the old value; a new value is not generated until
the next call, and also means at least one client has to block while the
new value is generated. We could also call
``get_some_data.refresh(argument)``, which would perform the data refresh
at that moment, but then the writer is delayed by the re-query.
A third variant is to instead offload the work of refreshing for this query
into a background thread or process. This can be achieved using
a system such as the :paramref:`.CacheRegion.async_creation_runner`.
However, an expedient approach for smaller use cases is to link cache refresh
operations to the ORM session's commit, as below::
from threading import Thread
from sqlalchemy import event
from sqlalchemy.orm import Session
def cache_refresh(session, refresher, *args, **kwargs):
"""
Refresh the function's cache data in a new thread. Starts refreshing only
after the session has been committed, so that all database data is available.
"""
assert isinstance(session, Session), \
"Need a session, not a sessionmaker or scoped_session"
@event.listens_for(session, "after_commit")
def do_refresh(session):
t = Thread(target=refresher, args=args, kwargs=kwargs)
t.daemon = True
t.start()
Within a sequence of data persistence, ``cache_refresh`` can be called
given a particular SQLAlchemy ``Session`` and a callable to do the work::
def add_new_data(session, argument):
# add some data
session.add(something_new(argument))
# add a hook to refresh after the Session is committed.
cache_refresh(session, get_some_data.refresh, argument)
Note that the event to refresh the data is associated with the ``Session``
being used for persistence; however, the actual refresh operation is called
with a **different** ``Session``, typically one that is local to the refresh
operation, either through a thread-local registry or via direct instantiation.
Prefixing all keys in Redis
---------------------------
If you use a redis instance as backend that contains other keys besides the ones
set by dogpile.cache, it is a good idea to uniquely prefix all dogpile.cache
keys, to avoid potential collisions with keys set by your own code. This can
easily be done using a key mangler function::
from dogpile.cache import make_region
region = make_region(
key_mangler=lambda key: "myapp:dogpile:" + key
)
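For completeness, a sketch of wiring this mangler into the Redis backend
(the connection arguments here are illustrative)::

    region = make_region(
        key_mangler=lambda key: "myapp:dogpile:" + key
    ).configure(
        "dogpile.cache.redis",
        expiration_time=3600,
        arguments={"host": "localhost", "port": 6379, "db": 0},
    )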
Encoding/Decoding data into another format
------------------------------------------
.. sidebar:: A Note on Data Encoding
Under the hood, dogpile.cache wraps cached data in an instance of
``dogpile.cache.api.CachedValue`` and then pickles that data for storage
along with some bookkeeping metadata. If you implement a ProxyBackend to
encode/decode data, that transformation will happen on the pre-pickled data;
dogpile does not store the data 'raw' and will still pass a pickled payload
to the backend. This behavior can negate the hoped-for improvements of some
encoding schemes.
Since dogpile is managing cached data, you may be concerned with the size of
your payloads. A possible method of helping minimize payloads is to use a
ProxyBackend to recode the data on-the-fly or otherwise transform data as it
enters or leaves persistent storage.
In the example below, we define 2 classes to implement msgpack encoding. Msgpack
(http://msgpack.org/) is a serialization format that works exceptionally well
with json-like data and can serialize nested dicts into a much smaller payload
than Python's own pickle. ``_EncodedProxy`` is our base class
for building data encoders, and inherits from dogpile's own `ProxyBackend` (you
could also collapse everything into one class). This class routes four of the main
`key/value` methods through a configurable decoder and encoder. The ``MsgpackProxy`` class simply
inherits from ``_EncodedProxy`` and implements the necessary ``value_decode``
and ``value_encode`` functions.
Encoded ProxyBackend Example::
from dogpile.cache.api import CachedValue, NO_VALUE
from dogpile.cache.proxy import ProxyBackend
import msgpack
class _EncodedProxy(ProxyBackend):
"""base class for building value-mangling proxies"""
def value_decode(self, value):
raise NotImplementedError("override me")
def value_encode(self, value):
raise NotImplementedError("override me")
def set(self, k, v):
v = self.value_encode(v)
self.proxied.set(k, v)
def get(self, key):
v = self.proxied.get(key)
return self.value_decode(v)
def set_multi(self, mapping):
"""encode to a new dict to preserve unencoded values in-place when
called by `get_or_create_multi`
"""
mapping_set = {}
for (k, v) in mapping.iteritems():
mapping_set[k] = self.value_encode(v)
return self.proxied.set_multi(mapping_set)
def get_multi(self, keys):
results = self.proxied.get_multi(keys)
translated = []
for record in results:
try:
translated.append(self.value_decode(record))
except Exception as e:
raise
return translated
class MsgpackProxy(_EncodedProxy):
"""custom decode/encode for value mangling"""
def value_decode(self, v):
if not v or v is NO_VALUE:
return NO_VALUE
# you probably want to specify a custom decoder via `object_hook`
v = msgpack.unpackb(v, encoding="utf-8")
return CachedValue(*v)
def value_encode(self, v):
# you probably want to specify a custom encoder via `default`
v = msgpack.packb(v, use_bin_type=True)
return v
# extend our region configuration from above with a 'wrap'
region = make_region().configure(
'dogpile.cache.pylibmc',
expiration_time = 3600,
arguments = {
'url': ["127.0.0.1"],
},
wrap = [MsgpackProxy, ]
)
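With the proxy in place, normal region usage is unchanged. A brief usage sketch
(the key and data are illustrative, and a reachable memcached is assumed for the
pylibmc backend)::

    data = {"name": "some user", "visits": 12}
    region.set("user:1", data)
    assert region.get("user:1") == data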

docs/build/usage.rst vendored

@ -268,257 +268,3 @@ requests on behalf of the original dogpile.cache.pylibmc backend.
.. versionadded:: 0.4.4 Added support for the :class:`.ProxyBackend` class.
Recipes
=======
Invalidating a group of related keys
-------------------------------------
This recipe presents a way to track the cache keys related to a particular region,
for the purposes of invalidating a series of keys that relate to a particular id.
Three cached functions, ``user_fn_one()``, ``user_fn_two()``, ``user_fn_three()``
each perform a different function based on a ``user_id`` integer value. The
region applied to cache them uses a custom key generator which tracks each cache
key generated, pulling out the integer "id" and replacing with a template.
When all three functions have been called, the key generator is now aware of
these three keys: ``user_fn_one_%d``, ``user_fn_two_%d``, and
``user_fn_three_%d``. The ``invalidate_user_id()`` function then knows that
for a particular ``user_id``, it needs to hit all three of those keys
in order to invalidate everything having to do with that id.
::
from dogpile.cache import make_region
from itertools import count
user_keys = set()
def my_key_generator(namespace, fn):
fname = fn.__name__
def generate_key(*arg):
# generate a key template:
# "fname_%d_arg1_arg2_arg3..."
key_template = fname + "_" + \
"%d" + \
"_".join(str(s) for s in arg[1:])
# store key template
user_keys.add(key_template)
# return cache key
user_id = arg[0]
return key_template % user_id
return generate_key
def invalidate_user_id(region, user_id):
for key in user_keys:
region.delete(key % user_id)
region = make_region(
function_key_generator=my_key_generator
).configure(
"dogpile.cache.memory"
)
counter = count()
@region.cache_on_arguments()
def user_fn_one(user_id):
return "user fn one: %d, %d" % (next(counter), user_id)
@region.cache_on_arguments()
def user_fn_two(user_id):
return "user fn two: %d, %d" % (next(counter), user_id)
@region.cache_on_arguments()
def user_fn_three(user_id):
return "user fn three: %d, %d" % (next(counter), user_id)
print user_fn_one(5)
print user_fn_two(5)
print user_fn_three(7)
print user_fn_two(7)
invalidate_user_id(region, 5)
print "invalidated:"
print user_fn_one(5)
print user_fn_two(5)
print user_fn_three(7)
print user_fn_two(7)
Asynchronous Data Updates with ORM Events
-----------------------------------------
This recipe presents one technique of optimistically pushing new data
into the cache when an update is sent to a database.
Using SQLAlchemy for database querying, suppose a simple cache-decorated
function returns the results of a database query::
@region.cache_on_arguments()
def get_some_data(argument):
# query database to get data
data = Session().query(DBClass).filter(DBClass.argument == argument).all()
return data
We would like this particular function to be re-queried when the data
has changed. We could call ``get_some_data.invalidate(argument, hard=False)``
at the point at which the data changes; however, this only
leads to the invalidation of the old value; a new value is not generated until
the next call, and also means at least one client has to block while the
new value is generated. We could also call
``get_some_data.refresh(argument)``, which would perform the data refresh
at that moment, but then the writer is delayed by the re-query.
A third variant is to instead offload the work of refreshing for this query
into a background thread or process. This can be achieved using
a system such as the :paramref:`.CacheRegion.async_creation_runner`.
However, an expedient approach for smaller use cases is to link cache refresh
operations to the ORM session's commit, as below::
from threading import Thread
from sqlalchemy import event
from sqlalchemy.orm import Session
def cache_refresh(session, refresher, *args, **kwargs):
"""
Refresh the function's cache data in a new thread. Starts refreshing only
after the session has been committed, so that all database data is available.
"""
assert isinstance(session, Session), \
"Need a session, not a sessionmaker or scoped_session"
@event.listens_for(session, "after_commit")
def do_refresh(session):
t = Thread(target=refresher, args=args, kwargs=kwargs)
t.daemon = True
t.start()
Within a sequence of data persistence, ``cache_refresh`` can be called
given a particular SQLAlchemy ``Session`` and a callable to do the work::
def add_new_data(session, argument):
# add some data
session.add(something_new(argument))
# add a hook to refresh after the Session is committed.
cache_refresh(session, get_some_data.refresh, argument)
Note that the event to refresh the data is associated with the ``Session``
being used for persistence; however, the actual refresh operation is called
with a **different** ``Session``, typically one that is local to the refresh
operation, either through a thread-local registry or via direct instantiation.
Prefixing all keys in Redis
---------------------------
If you use a redis instance as backend that contains other keys besides the ones
set by dogpile.cache, it is a good idea to uniquely prefix all dogpile.cache
keys, to avoid potential collisions with keys set by your own code. This can
easily be done using a key mangler function::
from dogpile.cache import make_region
region = make_region(
key_mangler=lambda key: "myapp:dogpile:" + key
)
Encoding/Decoding data into another format
------------------------------------------
.. sidebar:: A Note on Data Encoding
Under the hood, dogpile.cache wraps cached data in an instance of
``dogpile.cache.api.CachedValue`` and then pickles that data for storage
along with some bookkeeping metadata. If you implement a ProxyBackend to
encode/decode data, that transformation will happen on the pre-pickled data;
dogpile does not store the data 'raw' and will still pass a pickled payload
to the backend. This behavior can negate the hoped-for improvements of some
encoding schemes.
Since dogpile is managing cached data, you may be concerned with the size of
your payloads. A possible method of helping minimize payloads is to use a
ProxyBackend to recode the data on-the-fly or otherwise transform data as it
enters or leaves persistent storage.
In the example below, we define 2 classes to implement msgpack encoding. Msgpack
(http://msgpack.org/) is a serialization format that works exceptionally well
with json-like data and can serialize nested dicts into a much smaller payload
than Python's own pickle. ``_EncodedProxy`` is our base class
for building data encoders, and inherits from dogpile's own `ProxyBackend` (you
could also collapse everything into one class). This class routes four of the main
`key/value` methods through a configurable decoder and encoder. The ``MsgpackProxy`` class simply
inherits from ``_EncodedProxy`` and implements the necessary ``value_decode``
and ``value_encode`` functions.
Encoded ProxyBackend Example::
from dogpile.cache.api import CachedValue, NO_VALUE
from dogpile.cache.proxy import ProxyBackend
import msgpack
class _EncodedProxy(ProxyBackend):
"""base class for building value-mangling proxies"""
def value_decode(self, value):
raise NotImplementedError("override me")
def value_encode(self, value):
raise NotImplementedError("override me")
def set(self, k, v):
v = self.value_encode(v)
self.proxied.set(k, v)
def get(self, key):
v = self.proxied.get(key)
return self.value_decode(v)
def set_multi(self, mapping):
"""encode to a new dict to preserve unencoded values in-place when
called by `get_or_create_multi`
"""
mapping_set = {}
for (k, v) in mapping.iteritems():
mapping_set[k] = self.value_encode(v)
return self.proxied.set_multi(mapping_set)
def get_multi(self, keys):
results = self.proxied.get_multi(keys)
translated = []
for record in results:
try:
translated.append(self.value_decode(record))
except Exception as e:
raise
return translated
class MsgpackProxy(_EncodedProxy):
"""custom decode/encode for value mangling"""
def value_decode(self, v):
if not v or v is NO_VALUE:
return NO_VALUE
# you probably want to specify a custom decoder via `object_hook`
v = msgpack.unpackb(v, encoding="utf-8")
return CachedValue(*v)
def value_encode(self, v):
# you probably want to specify a custom encoder via `default`
v = msgpack.packb(v, use_bin_type=True)
return v
# extend our region configuration from above with a 'wrap'
region = make_region().configure(
'dogpile.cache.pylibmc',
expiration_time = 3600,
arguments = {
'url': ["127.0.0.1"],
},
wrap = [MsgpackProxy, ]
)


@ -1,6 +1,4 @@
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
__version__ = '0.6.0'
from .lock import Lock # noqa
from .lock import NeedRegenerationException # noqa


@ -1,3 +1,4 @@
__version__ = '0.6.0'
from .region import CacheRegion, register_backend, make_region # noqa
# backwards compat
from .. import __version__ # noqa


@ -1,5 +1,5 @@
import operator
from .compat import py3k
from ..util.compat import py3k
class NoValue(object):
@ -161,13 +161,13 @@ class CacheBackend(object):
"key mangling" function, if any.
The value will always be an instance
of :class:`.CachedValue`.
When implementing a new :class:`.CacheBackend` or customizing via
When implementing a new :class:`.CacheBackend` or customizing via
:class:`.ProxyBackend`, be aware that when this method is invoked by
:meth:`.Region.get_or_create_multi`, the ``mapping`` values are the
same ones returned to the upstream caller. If the subclass alters the
values in any way, it must not do so 'in-place' on the ``mapping`` dict
-- that will have the undesirable effect of modifying the returned
-- that will have the undesirable effect of modifying the returned
values as well.
.. versionadded:: 0.5.0


@ -7,10 +7,10 @@ Provides backends that deal with local filesystem access.
"""
from __future__ import with_statement
from dogpile.cache.api import CacheBackend, NO_VALUE
from ..api import CacheBackend, NO_VALUE
from contextlib import contextmanager
from dogpile.cache import compat
from dogpile.cache import util
from ...util import compat
from ... import util
import os
__all__ = 'DBMBackend', 'FileLock', 'AbstractFileLock'


@ -6,9 +6,9 @@ Provides backends for talking to `memcached <http://memcached.org>`_.
"""
from dogpile.cache.api import CacheBackend, NO_VALUE
from dogpile.cache import compat
from dogpile.cache import util
from ..api import CacheBackend, NO_VALUE
from ...util import compat
from ... import util
import random
import time


@ -10,8 +10,8 @@ places the value as given into the dictionary.
"""
from dogpile.cache.api import CacheBackend, NO_VALUE
from dogpile.cache.compat import pickle
from ..api import CacheBackend, NO_VALUE
from ...util.compat import pickle
class MemoryBackend(CacheBackend):


@ -10,7 +10,7 @@ caching for a region that is otherwise used normally.
"""
from dogpile.cache.api import CacheBackend, NO_VALUE
from ..api import CacheBackend, NO_VALUE
__all__ = ['NullBackend']


@ -7,8 +7,8 @@ Provides backends for talking to `Redis <http://redis.io>`_.
"""
from __future__ import absolute_import
from dogpile.cache.api import CacheBackend, NO_VALUE
from dogpile.cache.compat import pickle, u
from ..api import CacheBackend, NO_VALUE
from ..util.compat import pickle, u
redis = None


@ -1,12 +1,12 @@
from __future__ import with_statement
from dogpile.core import Lock, NeedRegenerationException
from dogpile.core.nameregistry import NameRegistry
from .. import Lock, NeedRegenerationException
from ..util import NameRegistry
from . import exception
from .util import function_key_generator, PluginLoader, \
memoized_property, coerce_string_conf, function_multi_key_generator
from ..util import PluginLoader, memoized_property, coerce_string_conf
from .util import function_key_generator, function_multi_key_generator
from .api import NO_VALUE, CachedValue
from .proxy import ProxyBackend
from . import compat
from ..util import compat
import time
import datetime
from numbers import Number
@ -671,11 +671,11 @@ class CacheRegion(object):
The method uses the same approach as :meth:`.Region.get_multi`
and :meth:`.Region.set_multi` to get and set values from the
backend.
If you are using a :class:`.CacheBackend` or :class:`.ProxyBackend`
that modifies values, take note this function invokes
If you are using a :class:`.CacheBackend` or :class:`.ProxyBackend`
that modifies values, take note this function invokes
``.set_multi()`` for newly generated values using the same values it
returns to the calling function. A correct implementation of
returns to the calling function. A correct implementation of
``.set_multi()`` will not modify values in-place on the submitted
``mapping`` dict.

dogpile/cache/util.py vendored

@ -1,55 +1,6 @@
from hashlib import sha1
import inspect
import re
import collections
from . import compat
def coerce_string_conf(d):
result = {}
for k, v in d.items():
if not isinstance(v, compat.string_types):
result[k] = v
continue
v = v.strip()
if re.match(r'^[-+]?\d+$', v):
result[k] = int(v)
elif re.match(r'^[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?$', v):
result[k] = float(v)
elif v.lower() in ('false', 'true'):
result[k] = v.lower() == 'true'
elif v == 'None':
result[k] = None
else:
result[k] = v
return result
class PluginLoader(object):
def __init__(self, group):
self.group = group
self.impls = {}
def load(self, name):
if name in self.impls:
return self.impls[name]()
else: # pragma NO COVERAGE
import pkg_resources
for impl in pkg_resources.iter_entry_points(
self.group, name):
self.impls[name] = impl.load
return impl.load()
else:
raise Exception(
"Can't load plugin %s %s" %
(self.group, name))
def register(self, name, modulepath, objname):
def load():
mod = __import__(modulepath, fromlist=[objname])
return getattr(mod, objname)
self.impls[name] = load
from ..util import compat
def function_key_generator(namespace, fn, to_str=compat.string_type):
@ -125,71 +76,3 @@ def length_conditional_mangler(length, mangler):
return mangle
class memoized_property(object):
"""A read-only @property that is only evaluated once."""
def __init__(self, fget, doc=None):
self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
def __get__(self, obj, cls):
if obj is None:
return self
obj.__dict__[self.__name__] = result = self.fget(obj)
return result
def to_list(x, default=None):
"""Coerce to a list."""
if x is None:
return default
if not isinstance(x, (list, tuple)):
return [x]
else:
return x
class KeyReentrantMutex(object):
def __init__(self, key, mutex, keys):
self.key = key
self.mutex = mutex
self.keys = keys
@classmethod
def factory(cls, mutex):
# this collection holds zero or one
# thread idents as the key; a set of
# keynames held as the value.
keystore = collections.defaultdict(set)
def fac(key):
return KeyReentrantMutex(key, mutex, keystore)
return fac
def acquire(self, wait=True):
current_thread = compat.threading.current_thread().ident
keys = self.keys.get(current_thread)
if keys is not None and \
self.key not in keys:
# current lockholder, new key. add it in
keys.add(self.key)
return True
elif self.mutex.acquire(wait=wait):
# after acquire, create new set and add our key
self.keys[current_thread].add(self.key)
return True
else:
return False
def release(self):
current_thread = compat.threading.current_thread().ident
keys = self.keys.get(current_thread)
assert keys is not None, "this thread didn't do the acquire"
assert self.key in keys, "No acquire held for key '%s'" % self.key
keys.remove(self.key)
if not keys:
# when list of keys empty, remove
# the thread ident and unlock.
del self.keys[current_thread]
self.mutex.release()

dogpile/core.py Normal file

@ -0,0 +1,15 @@
"""Compatibility namespace for those using dogpile.core.
As of dogpile.cache 0.6.0, dogpile.core as a separate package
is no longer used by dogpile.cache.
Note that this namespace will not take effect if an actual
dogpile.core installation is present.
"""
from .util import nameregistry # noqa
from .util import readwrite_lock # noqa
from .lock import Lock # noqa
from .lock import NeedRegenerationException # noqa
from . import __version__ # noqa
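
A brief sketch of what the shim preserves (assuming, per the docstring above,
that no separate ``dogpile.core`` installation is present): attribute-style
imports of the old names continue to resolve, for example::

    from dogpile.core import Lock, NeedRegenerationException
    from dogpile.core import nameregistry, readwrite_lock

Submodule-style paths such as ``dogpile.core.nameregistry`` are not provided,
since ``dogpile/core.py`` is a single module rather than a package.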

dogpile/lock.py Normal file

@ -0,0 +1,158 @@
import time
import logging
log = logging.getLogger(__name__)
class NeedRegenerationException(Exception):
"""An exception that when raised in the 'with' block,
forces the 'has_value' flag to False and incurs a
regeneration of the value.
"""
NOT_REGENERATED = object()
class Lock(object):
"""Dogpile lock class.
Provides an interface around an arbitrary mutex
that allows one thread/process to be elected as
the creator of a new value, while other threads/processes
continue to return the previous version
of that value.
:param mutex: A mutex object that provides ``acquire()``
and ``release()`` methods.
:param creator: Callable which returns a tuple of the form
(new_value, creation_time). "new_value" should be a newly
generated value representing completed state. "creation_time"
should be a floating point time value which is relative
to Python's ``time.time()`` call, representing the time
at which the value was created. This time value should
be associated with the created value.
:param value_and_created_fn: Callable which returns
a tuple of the form (existing_value, creation_time). This
basically should return what the last local call to the ``creator()``
callable has returned, i.e. the value and the creation time,
which would be assumed here to be from a cache. If the
value is not available, the :class:`.NeedRegenerationException`
exception should be thrown.
:param expiretime: Expiration time in seconds. Set to
``None`` for never expires. This timestamp is compared
to the creation_time result and ``time.time()`` to determine if
the value returned by value_and_created_fn is "expired".
:param async_creator: A callable. If specified, this callable will be
passed the mutex as an argument and is responsible for releasing the mutex
after it finishes some asynchronous value creation. The intent is for
this to be used to defer invocation of the creator callable until some
later time.
"""
def __init__(
self,
mutex,
creator,
value_and_created_fn,
expiretime,
async_creator=None,
):
self.mutex = mutex
self.creator = creator
self.value_and_created_fn = value_and_created_fn
self.expiretime = expiretime
self.async_creator = async_creator
def _is_expired(self, createdtime):
"""Return true if the expiration time is reached, or no
value is available."""
return not self._has_value(createdtime) or \
(
self.expiretime is not None and
time.time() - createdtime > self.expiretime
)
def _has_value(self, createdtime):
"""Return true if the creation function has proceeded
at least once."""
return createdtime > 0
def _enter(self):
value_fn = self.value_and_created_fn
try:
value = value_fn()
value, createdtime = value
except NeedRegenerationException:
log.debug("NeedRegenerationException")
value = NOT_REGENERATED
createdtime = -1
generated = self._enter_create(createdtime)
if generated is not NOT_REGENERATED:
generated, createdtime = generated
return generated
elif value is NOT_REGENERATED:
try:
value, createdtime = value_fn()
return value
except NeedRegenerationException:
raise Exception("Generation function should "
"have just been called by a concurrent "
"thread.")
else:
return value
def _enter_create(self, createdtime):
if not self._is_expired(createdtime):
return NOT_REGENERATED
async = False
if self._has_value(createdtime):
if not self.mutex.acquire(False):
log.debug("creation function in progress "
"elsewhere, returning")
return NOT_REGENERATED
else:
log.debug("no value, waiting for create lock")
self.mutex.acquire()
try:
log.debug("value creation lock %r acquired" % self.mutex)
# see if someone created the value already
try:
value, createdtime = self.value_and_created_fn()
except NeedRegenerationException:
pass
else:
if not self._is_expired(createdtime):
log.debug("value already present")
return value, createdtime
elif self.async_creator:
log.debug("Passing creation lock to async runner")
self.async_creator(self.mutex)
async = True
return value, createdtime
log.debug("Calling creation function")
created = self.creator()
return created
finally:
if not async:
self.mutex.release()
log.debug("Released creation lock")
def __enter__(self):
return self._enter()
def __exit__(self, type, value, traceback):
pass

dogpile/util/__init__.py Normal file

@ -0,0 +1,4 @@
from .nameregistry import NameRegistry # noqa
from .readwrite_lock import ReadWriteMutex # noqa
from .langhelpers import PluginLoader, memoized_property, \
coerce_string_conf, to_list, KeyReentrantMutex # noqa

dogpile/util/langhelpers.py Normal file

@ -0,0 +1,120 @@
import re
import collections
from . import compat
def coerce_string_conf(d):
result = {}
for k, v in d.items():
if not isinstance(v, compat.string_types):
result[k] = v
continue
v = v.strip()
if re.match(r'^[-+]?\d+$', v):
result[k] = int(v)
elif re.match(r'^[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?$', v):
result[k] = float(v)
elif v.lower() in ('false', 'true'):
result[k] = v.lower() == 'true'
elif v == 'None':
result[k] = None
else:
result[k] = v
return result
class PluginLoader(object):
def __init__(self, group):
self.group = group
self.impls = {}
def load(self, name):
if name in self.impls:
return self.impls[name]()
else: # pragma NO COVERAGE
import pkg_resources
for impl in pkg_resources.iter_entry_points(
self.group, name):
self.impls[name] = impl.load
return impl.load()
else:
raise Exception(
"Can't load plugin %s %s" %
(self.group, name))
def register(self, name, modulepath, objname):
def load():
mod = __import__(modulepath, fromlist=[objname])
return getattr(mod, objname)
self.impls[name] = load
class memoized_property(object):
"""A read-only @property that is only evaluated once."""
def __init__(self, fget, doc=None):
self.fget = fget
self.__doc__ = doc or fget.__doc__
self.__name__ = fget.__name__
def __get__(self, obj, cls):
if obj is None:
return self
obj.__dict__[self.__name__] = result = self.fget(obj)
return result
def to_list(x, default=None):
"""Coerce to a list."""
if x is None:
return default
if not isinstance(x, (list, tuple)):
return [x]
else:
return x
class KeyReentrantMutex(object):
def __init__(self, key, mutex, keys):
self.key = key
self.mutex = mutex
self.keys = keys
@classmethod
def factory(cls, mutex):
# this collection holds zero or one
# thread idents as the key; a set of
# keynames held as the value.
keystore = collections.defaultdict(set)
def fac(key):
return KeyReentrantMutex(key, mutex, keystore)
return fac
def acquire(self, wait=True):
current_thread = compat.threading.current_thread().ident
keys = self.keys.get(current_thread)
if keys is not None and \
self.key not in keys:
# current lockholder, new key. add it in
keys.add(self.key)
return True
elif self.mutex.acquire(wait=wait):
# after acquire, create new set and add our key
self.keys[current_thread].add(self.key)
return True
else:
return False
def release(self):
current_thread = compat.threading.current_thread().ident
keys = self.keys.get(current_thread)
assert keys is not None, "this thread didn't do the acquire"
assert self.key in keys, "No acquire held for key '%s'" % self.key
keys.remove(self.key)
if not keys:
# when list of keys empty, remove
# the thread ident and unlock.
del self.keys[current_thread]
self.mutex.release()


@ -0,0 +1,84 @@
from .compat import threading
import weakref
class NameRegistry(object):
Generates and returns an object, keeping it as a
singleton for a certain identifier for as long as it is
strongly referenced.
e.g.::
class MyFoo(object):
"some important object."
def __init__(self, identifier):
self.identifier = identifier
registry = NameRegistry(MyFoo)
# thread 1:
my_foo = registry.get("foo1")
# thread 2
my_foo = registry.get("foo1")
Above, ``my_foo`` in both thread #1 and #2 will
be *the same object*. The constructor for
``MyFoo`` will be called once, passing the
identifier ``foo1`` as the argument.
When thread 1 and thread 2 both complete or
otherwise delete references to ``my_foo``, the
object is *removed* from the :class:`.NameRegistry` as
a result of Python garbage collection.
:param creator: A function that will create a new
value, given the identifier passed to the :meth:`.NameRegistry.get`
method.
"""
_locks = weakref.WeakValueDictionary()
_mutex = threading.RLock()
def __init__(self, creator):
"""Create a new :class:`.NameRegistry`.
"""
self._values = weakref.WeakValueDictionary()
self._mutex = threading.RLock()
self.creator = creator
def get(self, identifier, *args, **kw):
"""Get and possibly create the value.
:param identifier: Hash key for the value.
If the creation function is called, this identifier
will also be passed to the creation function.
:param \*args, \**kw: Additional arguments which will
also be passed to the creation function if it is
called.
"""
try:
if identifier in self._values:
return self._values[identifier]
else:
return self._sync_get(identifier, *args, **kw)
except KeyError:
return self._sync_get(identifier, *args, **kw)
def _sync_get(self, identifier, *args, **kw):
self._mutex.acquire()
try:
try:
if identifier in self._values:
return self._values[identifier]
else:
self._values[identifier] = value = self.creator(identifier, *args, **kw)
return value
except KeyError:
self._values[identifier] = value = self.creator(identifier, *args, **kw)
return value
finally:
self._mutex.release()


@ -0,0 +1,132 @@
from .compat import threading
import logging
log = logging.getLogger(__name__)
class LockError(Exception):
pass
class ReadWriteMutex(object):
"""A mutex which allows multiple readers, single writer.
:class:`.ReadWriteMutex` uses a Python ``threading.Condition``
to provide this functionality across threads within a process.
The Beaker package also contained a file-lock based version
of this concept, so that readers/writers could be synchronized
across processes with a common filesystem. A future Dogpile
release may include this additional class at some point.
"""
def __init__(self):
# counts how many asynchronous methods are executing
self.async = 0
# pointer to thread that is the current sync operation
self.current_sync_operation = None
# condition object to lock on
self.condition = threading.Condition(threading.Lock())
def acquire_read_lock(self, wait = True):
"""Acquire the 'read' lock."""
self.condition.acquire()
try:
# see if a synchronous operation is waiting to start
# or is already running, in which case we wait (or just
# give up and return)
if wait:
while self.current_sync_operation is not None:
self.condition.wait()
else:
if self.current_sync_operation is not None:
return False
self.async += 1
log.debug("%s acquired read lock", self)
finally:
self.condition.release()
if not wait:
return True
def release_read_lock(self):
"""Release the 'read' lock."""
self.condition.acquire()
try:
self.async -= 1
# check if we are the last asynchronous reader thread
# out the door.
if self.async == 0:
# yes. so if a sync operation is waiting, notifyAll to wake
# it up
if self.current_sync_operation is not None:
self.condition.notifyAll()
elif self.async < 0:
raise LockError("Synchronizer error - too many "
"release_read_locks called")
log.debug("%s released read lock", self)
finally:
self.condition.release()
def acquire_write_lock(self, wait = True):
"""Acquire the 'write' lock."""
self.condition.acquire()
try:
# here, we are not a synchronous reader, and after returning,
# assuming waiting or immediate availability, we will be.
if wait:
# if another sync is working, wait
while self.current_sync_operation is not None:
self.condition.wait()
else:
# if another sync is working,
# we dont want to wait, so forget it
if self.current_sync_operation is not None:
return False
# establish ourselves as the current sync
# this indicates to other read/write operations
# that they should wait until this is None again
self.current_sync_operation = threading.currentThread()
# now wait again for asyncs to finish
if self.async > 0:
if wait:
# wait
self.condition.wait()
else:
# we dont want to wait, so forget it
self.current_sync_operation = None
return False
log.debug("%s acquired write lock", self)
finally:
self.condition.release()
if not wait:
return True
def release_write_lock(self):
"""Release the 'write' lock."""
self.condition.acquire()
try:
if self.current_sync_operation is not threading.currentThread():
raise LockError("Synchronizer error - current thread doesn't "
"have the write lock")
# reset the current sync operation so
# another can get it
self.current_sync_operation = None
# tell everyone to get ready
self.condition.notifyAll()
log.debug("%s released write lock", self)
finally:
# everyone go !!
self.condition.release()
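As the docstring above describes, ReadWriteMutex lets any number of readers proceed concurrently while a writer gets exclusive access. A minimal usage sketch, where shared_data and the two helper functions are illustrative only:

from dogpile.util.readwrite_lock import ReadWriteMutex

rw_mutex = ReadWriteMutex()
shared_data = []  # illustrative shared state

def read_snapshot():
    rw_mutex.acquire_read_lock()
    try:
        # many readers may hold the lock at the same time
        return list(shared_data)
    finally:
        rw_mutex.release_read_lock()

def write_value(value):
    rw_mutex.acquire_write_lock()
    try:
        # no readers and no other writer while this block runs
        shared_data.append(value)
    finally:
        rw_mutex.release_write_lock()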

34
log_tests.ini Normal file
View File

@ -0,0 +1,34 @@
[loggers]
keys = root, dogpilecore, tests
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = CRITICAL
handlers = console
[logger_dogpilecore]
level = DEBUG
qualname = dogpile.core
handlers =
[logger_tests]
level = DEBUG
qualname = tests
handlers =
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] [%(thread)s] %(message)s
datefmt = %Y-%m-%d %H:%M:%S

View File

@ -28,7 +28,7 @@ class PyTest(TestCommand):
v = open(
os.path.join(
os.path.dirname(__file__),
'dogpile', 'cache', '__init__.py')
'dogpile', '__init__.py')
)
VERSION = re.compile(r".*__version__ = '(.*?)'", re.S).match(v.read()).group(1)
v.close()
@ -53,13 +53,11 @@ setup(
url='http://bitbucket.org/zzzeek/dogpile.cache',
license='BSD',
packages=find_packages('.', exclude=['ez_setup', 'tests*']),
namespace_packages=['dogpile'],
entry_points="""
[mako.cache]
dogpile.cache = dogpile.cache.plugins.mako_cache:MakoPlugin
""",
zip_safe=False,
install_requires=['dogpile.core>=0.4.1'],
tests_require=['pytest', 'pytest-cov', 'mock', 'Mako'],
cmdclass={'test': PyTest},
)

View File

@ -1,7 +1,8 @@
import re
import pytest
from functools import wraps
from dogpile.cache import compat
from dogpile.util import compat
from dogpile.util.compat import configparser, io # noqa
import time
@ -27,8 +28,6 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
except except_cls as e:
assert re.search(msg, str(e)), "%r !~ %s" % (msg, e)
from dogpile.cache.compat import configparser, io # noqa
def winsleep():
# sleep for an amount of time

View File

@ -2,7 +2,7 @@ from ._fixtures import _GenericBackendTest, _GenericMutexTest
from . import assert_raises_message
import os
import sys
from dogpile.core.readwrite_lock import ReadWriteMutex
from dogpile.util.readwrite_lock import ReadWriteMutex
from dogpile.cache.backends.file import AbstractFileLock
try:

View File

@ -4,7 +4,8 @@ from ._fixtures import _GenericBackendFixture
from . import eq_, requires_py3k, winsleep
from unittest import TestCase
import time
from dogpile.cache import util, compat
from dogpile.cache import util
from dogpile.util import compat
import itertools
from dogpile.cache.api import NO_VALUE

View File

@ -4,7 +4,7 @@ from unittest import TestCase
from threading import Thread
import time
import pytest
from dogpile.cache import compat
from dogpile.util import compat
import os

View File

@ -1,5 +1,9 @@
from _pytest.unittest import UnitTestCase
import sys
import logging
import logging.config
logging.config.fileConfig("log_tests.ini")
def is_unittest(obj):

View File

@ -0,0 +1,25 @@
import unittest
import threading
import dogpile
class TestAsyncRunner(unittest.TestCase):
def test_async_release(self):
self.called = False
def runner(mutex):
self.called = True
mutex.release()
mutex = threading.Lock()
create = lambda: ("value", 1)
get = lambda: ("value", 1)
expiretime = 1
assert not self.called
with dogpile.Lock(mutex, create, get, expiretime, runner) as l:
assert self.called
assert self.called
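For reference, the Lock used above takes a plain mutex, a creator returning a (value, created_time) pair, a function returning the current (value, created_time) or raising NeedRegenerationException, an expiration time in seconds, and an optional asynchronous creator such as the runner above. A minimal synchronous sketch, with a hypothetical in-memory dict standing in for a real backend:

import threading
import time

import dogpile
from dogpile import NeedRegenerationException

cache = {}                 # hypothetical stand-in for a real backend
mutex = threading.Lock()

def creator():
    # produce the value and record when it was created
    cache["key"] = ("expensive result", time.time())
    return cache["key"]

def value_and_created():
    try:
        return cache["key"]
    except KeyError:
        raise NeedRegenerationException()

# yields the value, regenerating it if missing or older than 5 seconds
with dogpile.Lock(mutex, creator, value_and_created, 5) as value:
    print(value)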

279
tests/test_lock.py Normal file
View File

@ -0,0 +1,279 @@
from unittest import TestCase
import time
import threading
from dogpile import Lock, NeedRegenerationException
from dogpile.util import ReadWriteMutex
import contextlib
import math
import logging
log = logging.getLogger(__name__)
class ConcurrencyTest(TestCase):
# expiretime, time to create, num usages, time spent using, delay between usages
_assertion_lock = threading.Lock()
def test_quick(self):
self._test_multi(
10, 2, .5, 50, .05, .1,
)
def test_slow(self):
self._test_multi(
10, 5, 2, 50, .1, .1,
)
# TODO: this is a port from the legacy test_dogpile test.
# sequence and calculations need to be revised.
# def test_get_value_plus_created_slow_write(self):
# self._test_multi(
# 10, 2, .5, 50, .05, .1,
# slow_write_time=2
# )
def test_return_while_in_progress(self):
self._test_multi(
10, 5, 2, 50, 1, .1
)
def test_get_value_plus_created_long_create(self):
self._test_multi(
10, 2, 2.5, 50, .05, .1,
)
def test_get_value_plus_created_registry_unsafe_cache(self):
self._test_multi(
10, 1, .6, 100, .05, .1,
cache_expire_time='unsafe'
)
def test_get_value_plus_created_registry_safe_cache_quick(self):
self._test_multi(
10, 2, .5, 50, .05, .1,
cache_expire_time='safe'
)
def test_get_value_plus_created_registry_safe_cache_slow(self):
self._test_multi(
10, 5, 2, 50, .1, .1,
cache_expire_time='safe'
)
def _assert_synchronized(self):
acq = self._assertion_lock.acquire(False)
assert acq, "Could not acquire"
@contextlib.contextmanager
def go():
try:
yield {}
except:
raise
finally:
self._assertion_lock.release()
return go()
def _assert_log(self, cond, msg, *args):
if cond:
log.debug(msg, *args)
else:
log.error("Assertion failed: " + msg, *args)
assert False, msg % args
def _test_multi(
self, num_threads,
expiretime,
creation_time,
num_usages,
usage_time,
delay_time,
cache_expire_time=None,
slow_write_time=None
):
mutex = threading.Lock()
if slow_write_time:
readwritelock = ReadWriteMutex()
unsafe_cache = False
if cache_expire_time:
if cache_expire_time == 'unsafe':
unsafe_cache = True
cache_expire_time = expiretime * .8
elif cache_expire_time == 'safe':
cache_expire_time = (expiretime + creation_time) * 1.1
else:
assert False, cache_expire_time
log.info("Cache expire time: %s", cache_expire_time)
effective_expiretime = min(cache_expire_time, expiretime)
else:
effective_expiretime = expiretime
effective_creation_time = creation_time
max_stale = (
effective_expiretime + effective_creation_time +
usage_time + delay_time) * 1.1
the_resource = []
slow_waiters = [0]
failures = [0]
def create_resource():
with self._assert_synchronized():
log.debug(
"creating resource, will take %f sec" % creation_time)
time.sleep(creation_time)
if slow_write_time:
readwritelock.acquire_write_lock()
try:
saved = list(the_resource)
# clear out the resource list so that
# usage threads hitting it will
# raise
the_resource[:] = []
time.sleep(slow_write_time)
the_resource[:] = saved
finally:
readwritelock.release_write_lock()
the_resource.append(time.time())
value = the_resource[-1]
log.debug("finished creating resource")
return value, time.time()
def get_value():
if not the_resource:
raise NeedRegenerationException()
if cache_expire_time:
if time.time() - the_resource[-1] > cache_expire_time:
# should never hit a cache invalidation
# if we've set expiretime below the cache
# expire time (assuming a cache which
# honors this).
self._assert_log(
cache_expire_time < expiretime,
"Cache expiration hit, cache "
"expire time %s, expiretime %s",
cache_expire_time,
expiretime,
)
raise NeedRegenerationException()
if slow_write_time:
readwritelock.acquire_read_lock()
try:
return the_resource[-1], the_resource[-1]
finally:
if slow_write_time:
readwritelock.release_read_lock()
def use_dogpile():
try:
for i in range(num_usages):
now = time.time()
with Lock(
mutex, create_resource,
get_value, expiretime) as value:
waited = time.time() - now
if waited > .01:
slow_waiters[0] += 1
check_value(value, waited)
time.sleep(usage_time)
time.sleep(delay_time)
except:
log.error("thread failed", exc_info=True)
failures[0] += 1
def check_value(value, waited):
assert value
# time since the current resource was
# created
time_since_create = time.time() - value
self._assert_log(
time_since_create < max_stale,
"Time since create %.4f max stale time %s, "
"total waited %s",
time_since_create, max_stale,
slow_waiters[0]
)
started_at = time.time()
threads = []
for i in range(num_threads):
t = threading.Thread(target=use_dogpile)
t.start()
threads.append(t)
for t in threads:
t.join()
actual_run_time = time.time() - started_at
# time spent starts with num usages * time per usage, with a 10% fudge
expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1
expected_generations = math.ceil(
expected_run_time / effective_expiretime)
if unsafe_cache:
expected_slow_waiters = expected_generations * num_threads
else:
expected_slow_waiters = expected_generations + num_threads - 1
if slow_write_time:
expected_slow_waiters = num_threads * expected_generations
# time spent also increments by one wait period in the beginning...
expected_run_time += effective_creation_time
# and a fudged version of the periodic waiting time anticipated
# for a single thread...
expected_run_time += (
expected_slow_waiters * effective_creation_time) / num_threads
expected_run_time *= 1.1
log.info("Test Summary")
log.info(
"num threads: %s; expiretime: %s; creation_time: %s; "
"num_usages: %s; "
"usage_time: %s; delay_time: %s",
num_threads, expiretime, creation_time, num_usages,
usage_time, delay_time
)
log.info(
"cache expire time: %s; unsafe cache: %s",
cache_expire_time, unsafe_cache)
log.info(
"Estimated run time %.2f actual run time %.2f",
expected_run_time, actual_run_time)
log.info(
"Effective expiretime (min(cache_exp_time, exptime)) %s",
effective_expiretime)
log.info(
"Expected slow waits %s, Total slow waits %s",
expected_slow_waiters, slow_waiters[0])
log.info(
"Total generations %s Max generations expected %s" % (
len(the_resource), expected_generations
)
)
assert not failures[0], "%s failures occurred" % failures[0]
assert actual_run_time <= expected_run_time
assert slow_waiters[0] <= expected_slow_waiters, \
"Number of slow waiters %s exceeds expected slow waiters %s" % (
slow_waiters[0],
expected_slow_waiters
)
assert len(the_resource) <= expected_generations,\
"Number of resource generations %d exceeded "\
"expected %d" % (
len(the_resource),
expected_generations)
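To make the run-time accounting above concrete, here are the same formulas evaluated by hand for the test_quick parameters (10 threads, expiretime 2, creation_time 0.5, 50 usages, usage_time 0.05, delay_time 0.1); nothing below is new logic, only arithmetic:

import math

num_threads, expiretime, creation_time = 10, 2, 0.5
num_usages, usage_time, delay_time = 50, 0.05, 0.1

expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1          # 8.25
expected_generations = math.ceil(expected_run_time / expiretime)            # 5
expected_slow_waiters = expected_generations + num_threads - 1              # 14

expected_run_time += creation_time                                          # 8.75
expected_run_time += (expected_slow_waiters * creation_time) / num_threads  # 9.45
expected_run_time *= 1.1                                                    # ~10.4 sec budget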

View File

@ -1,6 +1,6 @@
from unittest import TestCase
from dogpile.cache import util
from dogpile import util
class UtilsTest(TestCase):

0
tests/util/__init__.py Normal file
View File

View File

@ -0,0 +1,59 @@
from unittest import TestCase
import time
import threading
from dogpile.util import NameRegistry
import random
import logging
log = logging.getLogger(__name__)
class NameRegistryTest(TestCase):
def test_name_registry(self):
success = [True]
num_operations = [0]
def create(identifier):
log.debug("Creator running for id: " + identifier)
return threading.Lock()
registry = NameRegistry(create)
baton = {
"beans":False,
"means":False,
"please":False
}
def do_something(name):
for iteration in range(20):
name = list(baton)[random.randint(0, 2)]
lock = registry.get(name)
lock.acquire()
try:
if baton[name]:
success[0] = False
log.debug("Baton is already populated")
break
baton[name] = True
try:
time.sleep(random.random() * .01)
finally:
num_operations[0] += 1
baton[name] = False
finally:
lock.release()
log.debug("thread completed operations")
threads = []
for id_ in range(1, 20):
t = threading.Thread(target=do_something, args=("somename",))
t.start()
threads.append(t)
for t in threads:
t.join()
assert success[0]