Switch from python-memcached to pymemcache.
Switch memcachepool to use pymemcache. This change switches from python-memcached to pymemcached for the memcachepool backend fixing an issue due to the __new__ attribute reassign. Change-Id: Icaa6b252145685d1b5667b883e3bf693920b5b93 Closes-Bug: #1812935
This commit is contained in:
parent
f1294c73a4
commit
8a8248d764
|
@ -46,6 +46,7 @@ psutil==5.4.3
|
||||||
pycparser==2.18
|
pycparser==2.18
|
||||||
pyflakes==0.8.1
|
pyflakes==0.8.1
|
||||||
pyinotify==0.9.6
|
pyinotify==0.9.6
|
||||||
|
pymemcache==2.1.1
|
||||||
pymongo==3.0.2
|
pymongo==3.0.2
|
||||||
pyparsing==2.2.0
|
pyparsing==2.2.0
|
||||||
python-dateutil==2.7.0
|
python-dateutil==2.7.0
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
"""Thread-safe connection pool for python-memcached."""
|
"""Thread-safe connection pool for pymemcache."""
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import contextlib
|
import contextlib
|
||||||
|
@ -21,13 +21,12 @@ import itertools
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
try:
|
|
||||||
import eventlet
|
|
||||||
except ImportError:
|
|
||||||
eventlet = None
|
|
||||||
import memcache
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
from pymemcache.client import hash as pymemcache_hash
|
||||||
|
from pymemcache import serde
|
||||||
from six.moves import queue
|
from six.moves import queue
|
||||||
|
from six.moves.urllib.parse import urlparse
|
||||||
from six.moves import zip
|
from six.moves import zip
|
||||||
|
|
||||||
from oslo_cache._i18n import _
|
from oslo_cache._i18n import _
|
||||||
|
@ -37,31 +36,10 @@ from oslo_cache import exception
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class _MemcacheClient(memcache.Client):
|
|
||||||
"""Thread global memcache client
|
|
||||||
|
|
||||||
As client is inherited from threading.local we have to restore object
|
|
||||||
methods overloaded by threading.local so we can reuse clients in
|
|
||||||
different threads
|
|
||||||
"""
|
|
||||||
__delattr__ = object.__delattr__
|
|
||||||
__getattribute__ = object.__getattribute__
|
|
||||||
__setattr__ = object.__setattr__
|
|
||||||
|
|
||||||
# Hack for lp 1812935
|
|
||||||
if eventlet and eventlet.patcher.is_monkey_patched('thread'):
|
|
||||||
# NOTE(bnemec): I'm not entirely sure why this works in a
|
|
||||||
# monkey-patched environment and not with vanilla stdlib, but it does.
|
|
||||||
def __new__(cls, *args, **kwargs):
|
|
||||||
return object.__new__(cls)
|
|
||||||
else:
|
|
||||||
__new__ = object.__new__
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
|
_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
|
||||||
|
DEFAULT_MEMCACHEPORT = 11211
|
||||||
|
DEFAULT_SERIALIZER = serde.python_memcache_serializer
|
||||||
|
DEFAULT_DESERIALIZER = serde.python_memcache_deserializer
|
||||||
|
|
||||||
|
|
||||||
class ConnectionPool(queue.Queue):
|
class ConnectionPool(queue.Queue):
|
||||||
|
@ -204,35 +182,46 @@ class MemcacheClientPool(ConnectionPool):
|
||||||
# super() cannot be used here because Queue in stdlib is an
|
# super() cannot be used here because Queue in stdlib is an
|
||||||
# old-style class
|
# old-style class
|
||||||
ConnectionPool.__init__(self, **kwargs)
|
ConnectionPool.__init__(self, **kwargs)
|
||||||
self.urls = urls
|
self._format_urls(urls)
|
||||||
self._arguments = arguments
|
self._arguments = arguments
|
||||||
|
self._init_arguments()
|
||||||
# NOTE(morganfainberg): The host objects expect an int for the
|
# NOTE(morganfainberg): The host objects expect an int for the
|
||||||
# deaduntil value. Initialize this at 0 for each host with 0 indicating
|
# deaduntil value. Initialize this at 0 for each host with 0 indicating
|
||||||
# the host is not dead.
|
# the host is not dead.
|
||||||
self._hosts_deaduntil = [0] * len(urls)
|
self._hosts_deaduntil = [0] * len(urls)
|
||||||
|
|
||||||
|
def _init_arguments(self):
|
||||||
|
if 'serializer' not in self._arguments:
|
||||||
|
self._arguments['serializer'] = DEFAULT_SERIALIZER
|
||||||
|
if 'deserializer' not in self._arguments:
|
||||||
|
self._arguments['deserializer'] = DEFAULT_DESERIALIZER
|
||||||
|
if 'socket_timeout' in self._arguments:
|
||||||
|
timeout = self._arguments['socket_timeout']
|
||||||
|
self._arguments['connection_timeout'] = timeout
|
||||||
|
del self._arguments['socket_timeout']
|
||||||
|
|
||||||
|
def _format_urls(self, urls):
|
||||||
|
self.urls = []
|
||||||
|
for url in urls:
|
||||||
|
parsed = urlparse(url)
|
||||||
|
if not parsed.port:
|
||||||
|
port = DEFAULT_MEMCACHEPORT
|
||||||
|
self._trace_logger(
|
||||||
|
"Using port ({port}) with {url}".format(url=url, port=port)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# NOTE(hberaud) removing port information from url to avoid
|
||||||
|
# pymemcache to concat twice. Don't use string replace to avoid
|
||||||
|
# ipv6 format override. Some port values can already be used
|
||||||
|
# inside the ipv6 format and we don't want to replace them.
|
||||||
|
# (example: https://[2620:52:0:13b8:8080:ff:fe3e:1]:8080)
|
||||||
|
port_info = len(str(parsed.port)) + 1
|
||||||
|
url = url[:-port_info]
|
||||||
|
port = parsed.port
|
||||||
|
self.urls.append((url, int(port)))
|
||||||
|
|
||||||
def _create_connection(self):
|
def _create_connection(self):
|
||||||
# NOTE(morgan): Explicitly set flush_on_reconnect for pooled
|
return pymemcache_hash.HashClient(self.urls, **self._arguments)
|
||||||
# connections. This should ensure that stale data is never consumed
|
|
||||||
# from a server that pops in/out due to a network partition
|
|
||||||
# or disconnect.
|
|
||||||
#
|
|
||||||
# See the help from python-memcached:
|
|
||||||
#
|
|
||||||
# param flush_on_reconnect: optional flag which prevents a
|
|
||||||
# scenario that can cause stale data to be read: If there's more
|
|
||||||
# than one memcached server and the connection to one is
|
|
||||||
# interrupted, keys that mapped to that server will get
|
|
||||||
# reassigned to another. If the first server comes back, those
|
|
||||||
# keys will map to it again. If it still has its data, get()s
|
|
||||||
# can read stale data that was overwritten on another
|
|
||||||
# server. This flag is off by default for backwards
|
|
||||||
# compatibility.
|
|
||||||
#
|
|
||||||
# The normal non-pooled clients connect explicitly on each use and
|
|
||||||
# does not need the explicit flush_on_reconnect
|
|
||||||
return _MemcacheClient(self.urls, flush_on_reconnect=True,
|
|
||||||
**self._arguments)
|
|
||||||
|
|
||||||
def _destroy_connection(self, conn):
|
def _destroy_connection(self, conn):
|
||||||
conn.disconnect_all()
|
conn.disconnect_all()
|
||||||
|
|
|
@ -10,11 +10,10 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
import six
|
from pymemcache.client import hash as pymemcache_hash
|
||||||
from six.moves import queue
|
from six.moves import queue
|
||||||
import testtools
|
import testtools
|
||||||
from testtools import matchers
|
from testtools import matchers
|
||||||
|
@ -133,32 +132,34 @@ class TestConnectionPool(test_cache.BaseTestCase):
|
||||||
_acquire_connection()
|
_acquire_connection()
|
||||||
|
|
||||||
|
|
||||||
class TestMemcacheClientOverrides(test_cache.BaseTestCase):
|
class TestMemcacheClientPool(test_cache.BaseTestCase):
|
||||||
|
|
||||||
def test_client_stripped_of_threading_local(self):
|
def test_memcache_client_pool_create_connection(self):
|
||||||
"""threading.local overrides are restored for _MemcacheClient"""
|
urls = [
|
||||||
client_class = _memcache_pool._MemcacheClient
|
"foo",
|
||||||
# get the genuine thread._local from MRO
|
"http://foo",
|
||||||
thread_local = client_class.__mro__[2]
|
"http://bar:11211",
|
||||||
self.assertTrue(thread_local is threading.local)
|
"http://bar:8080",
|
||||||
for field in six.iterkeys(thread_local.__dict__):
|
"https://[2620:52:0:13b8:5054:ff:fe3e:1]:8080",
|
||||||
if field not in ('__dict__', '__weakref__'):
|
# testing port format is already in use in ipv6 format
|
||||||
self.assertNotEqual(id(getattr(thread_local, field, None)),
|
"https://[2620:52:0:13b8:8080:ff:fe3e:1]:8080",
|
||||||
id(getattr(client_class, field, None)))
|
"https://[2620:52:0:13b8:5054:ff:fe3e:1]",
|
||||||
|
"https://[::192.9.5.5]",
|
||||||
def test_can_create_with_kwargs(self):
|
]
|
||||||
"""Test for lp 1812935
|
mcp = _memcache_pool.MemcacheClientPool(urls=urls,
|
||||||
|
arguments={},
|
||||||
Note that in order to reproduce the bug, it is necessary to add the
|
maxsize=10,
|
||||||
following to the top of oslo_cache/tests/__init__.py::
|
unused_timeout=10)
|
||||||
|
mc = mcp._create_connection()
|
||||||
import eventlet
|
self.assertTrue(type(mc) is pymemcache_hash.HashClient)
|
||||||
eventlet.monkey_patch()
|
self.assertTrue("foo:11211" in mc.clients)
|
||||||
|
self.assertTrue("http://foo:11211" in mc.clients)
|
||||||
This should happen before any other imports in that file.
|
self.assertTrue("http://bar:11211" in mc.clients)
|
||||||
"""
|
self.assertTrue("http://bar:8080" in mc.clients)
|
||||||
client = _memcache_pool._MemcacheClient('foo', check_keys=False)
|
self.assertTrue("https://[2620:52:0:13b8:5054:ff:fe3e:1]:8080" in
|
||||||
# Make sure kwargs are properly processed by the client
|
mc.clients)
|
||||||
self.assertFalse(client.do_check_key)
|
self.assertTrue("https://[2620:52:0:13b8:8080:ff:fe3e:1]:8080" in
|
||||||
# Make sure our __new__ override still results in the right type
|
mc.clients)
|
||||||
self.assertIsInstance(client, _memcache_pool._MemcacheClient)
|
self.assertTrue("https://[2620:52:0:13b8:5054:ff:fe3e:1]:11211" in
|
||||||
|
mc.clients)
|
||||||
|
self.assertTrue("https://[::192.9.5.5]:11211" in mc.clients)
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Switch from python-memcached to pymemcached for memcache_pool.
|
||||||
|
This avoids issues with thread.local usage and fixes errors
|
||||||
|
seen with inheritance. This is only applicable with dogpile.cache
|
||||||
|
memcached backend.
|
|
@ -34,7 +34,7 @@ dogpile.cache =
|
||||||
|
|
||||||
[extras]
|
[extras]
|
||||||
dogpile =
|
dogpile =
|
||||||
python-memcached>=1.56 # PSF
|
pymemcache>=2.1.1 # Apache 2.0
|
||||||
mongo =
|
mongo =
|
||||||
pymongo!=3.1,>=3.0.2 # Apache-2.0
|
pymongo!=3.1,>=3.0.2 # Apache-2.0
|
||||||
etcd3gw =
|
etcd3gw =
|
||||||
|
|
|
@ -8,6 +8,6 @@ pifpaf>=0.10.0 # Apache-2.0
|
||||||
# Bandit security code scanner
|
# Bandit security code scanner
|
||||||
bandit>=1.1.0,<1.6.0 # Apache-2.0
|
bandit>=1.1.0,<1.6.0 # Apache-2.0
|
||||||
stestr>=2.0.0 # Apache-2.0
|
stestr>=2.0.0 # Apache-2.0
|
||||||
python-memcached>=1.56 # PSF
|
pymemcache>=2.1.1 # Apache 2.0
|
||||||
pymongo!=3.1,>=3.0.2 # Apache-2.0
|
pymongo!=3.1,>=3.0.2 # Apache-2.0
|
||||||
etcd3gw>=0.2.0 # Apache-2.0
|
etcd3gw>=0.2.0 # Apache-2.0
|
||||||
|
|
Loading…
Reference in New Issue