Add a Timeout when getting Memcache connections
The old Timeout behavior when pulling connections of the MemcacheConnPool left ambiguity around what timed out and could put more placeholders in the queue than the configured max_connections. To avoid waiting indefinitely on slow severs we raise a custom Timeout when we fail get a connection from the pool. We still error limit the slow server, and move onto the next, but we still don't allow more than max_connections. Change-Id: I9e2409896423d52da69e35c038e5f457c71f705d
This commit is contained in:
parent
6607beab0d
commit
0b37026911
@ -60,6 +60,7 @@ from swift.common.utils import json
|
|||||||
DEFAULT_MEMCACHED_PORT = 11211
|
DEFAULT_MEMCACHED_PORT = 11211
|
||||||
|
|
||||||
CONN_TIMEOUT = 0.3
|
CONN_TIMEOUT = 0.3
|
||||||
|
POOL_TIMEOUT = 1.0 # WAG
|
||||||
IO_TIMEOUT = 2.0
|
IO_TIMEOUT = 2.0
|
||||||
PICKLE_FLAG = 1
|
PICKLE_FLAG = 1
|
||||||
JSON_FLAG = 2
|
JSON_FLAG = 2
|
||||||
@ -94,6 +95,10 @@ class MemcacheConnectionError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MemcachePoolTimeout(Timeout):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class MemcacheConnPool(Pool):
|
class MemcacheConnPool(Pool):
|
||||||
"""Connection pool for Memcache Connections"""
|
"""Connection pool for Memcache Connections"""
|
||||||
|
|
||||||
@ -128,8 +133,8 @@ class MemcacheRing(object):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
||||||
io_timeout=IO_TIMEOUT, tries=TRY_COUNT,
|
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
||||||
allow_pickle=False, allow_unpickle=False,
|
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
||||||
max_conns=2):
|
max_conns=2):
|
||||||
self._ring = {}
|
self._ring = {}
|
||||||
self._errors = dict(((serv, []) for serv in servers))
|
self._errors = dict(((serv, []) for serv in servers))
|
||||||
@ -145,11 +150,12 @@ class MemcacheRing(object):
|
|||||||
for server in servers))
|
for server in servers))
|
||||||
self._connect_timeout = connect_timeout
|
self._connect_timeout = connect_timeout
|
||||||
self._io_timeout = io_timeout
|
self._io_timeout = io_timeout
|
||||||
|
self._pool_timeout = pool_timeout
|
||||||
self._allow_pickle = allow_pickle
|
self._allow_pickle = allow_pickle
|
||||||
self._allow_unpickle = allow_unpickle or allow_pickle
|
self._allow_unpickle = allow_unpickle or allow_pickle
|
||||||
|
|
||||||
def _exception_occurred(self, server, e, action='talking',
|
def _exception_occurred(self, server, e, action='talking',
|
||||||
sock=None, fp=None):
|
sock=None, fp=None, got_connection=True):
|
||||||
if isinstance(e, Timeout):
|
if isinstance(e, Timeout):
|
||||||
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
|
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
|
||||||
{'action': action, 'server': server})
|
{'action': action, 'server': server})
|
||||||
@ -168,9 +174,10 @@ class MemcacheRing(object):
|
|||||||
del sock
|
del sock
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
# We need to return something to the pool
|
if got_connection:
|
||||||
# A new connection will be created the next time it is retreived
|
# We need to return something to the pool
|
||||||
self._return_conn(server, None, None)
|
# A new connection will be created the next time it is retreived
|
||||||
|
self._return_conn(server, None, None)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
self._errors[server].append(time.time())
|
self._errors[server].append(time.time())
|
||||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
||||||
@ -197,17 +204,13 @@ class MemcacheRing(object):
|
|||||||
continue
|
continue
|
||||||
sock = None
|
sock = None
|
||||||
try:
|
try:
|
||||||
# NOTE: We do NOT place a Timeout over the MemcacheConnPool's
|
with MemcachePoolTimeout(self._pool_timeout):
|
||||||
# get() method. The MemcacheConnPool's create() method already
|
fp, sock = self._client_cache[server].get()
|
||||||
# places a timeout around the connect() system call, which we
|
|
||||||
# catch below. It is possible for the underlying Queue of the
|
|
||||||
# MemcacheConnPool to be contended such that this greenlet is
|
|
||||||
# waiting for one or more connections to be freed up. If there
|
|
||||||
# is a particularly slow memcache server causing that problme,
|
|
||||||
# then the IO_TIMEOUT will catch that behavior, so we do not
|
|
||||||
# have to place a Timeout here.
|
|
||||||
fp, sock = self._client_cache[server].get()
|
|
||||||
yield server, fp, sock
|
yield server, fp, sock
|
||||||
|
except MemcachePoolTimeout as e:
|
||||||
|
self._exception_occurred(
|
||||||
|
server, e, action='getting a connection',
|
||||||
|
got_connection=False)
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
# Typically a Timeout exception caught here is the one raised
|
# Typically a Timeout exception caught here is the one raised
|
||||||
# by the create() method of this server's MemcacheConnPool
|
# by the create() method of this server's MemcacheConnPool
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
"""Tests for swift.common.utils"""
|
"""Tests for swift.common.utils"""
|
||||||
|
|
||||||
from __future__ import with_statement
|
from __future__ import with_statement
|
||||||
|
from collections import defaultdict
|
||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
@ -27,7 +28,7 @@ from eventlet import GreenPool, sleep, Queue
|
|||||||
from eventlet.pools import Pool
|
from eventlet.pools import Pool
|
||||||
|
|
||||||
from swift.common import memcached
|
from swift.common import memcached
|
||||||
from mock import patch
|
from mock import patch, MagicMock
|
||||||
from test.unit import NullLoggingHandler
|
from test.unit import NullLoggingHandler
|
||||||
|
|
||||||
|
|
||||||
@ -407,6 +408,65 @@ class TestMemcached(unittest.TestCase):
|
|||||||
connections.get_nowait()
|
connections.get_nowait()
|
||||||
self.assertTrue(connections.empty())
|
self.assertTrue(connections.empty())
|
||||||
|
|
||||||
|
def test_connection_pool_timeout(self):
|
||||||
|
orig_conn_pool = memcached.MemcacheConnPool
|
||||||
|
try:
|
||||||
|
connections = defaultdict(Queue)
|
||||||
|
pending = defaultdict(int)
|
||||||
|
served = defaultdict(int)
|
||||||
|
|
||||||
|
class MockConnectionPool(orig_conn_pool):
|
||||||
|
def get(self):
|
||||||
|
pending[self.server] += 1
|
||||||
|
conn = connections[self.server].get()
|
||||||
|
pending[self.server] -= 1
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def put(self, *args, **kwargs):
|
||||||
|
connections[self.server].put(*args, **kwargs)
|
||||||
|
served[self.server] += 1
|
||||||
|
|
||||||
|
memcached.MemcacheConnPool = MockConnectionPool
|
||||||
|
|
||||||
|
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||||
|
'1.2.3.5:11211'],
|
||||||
|
io_timeout=0.5,
|
||||||
|
pool_timeout=0.1)
|
||||||
|
|
||||||
|
p = GreenPool()
|
||||||
|
for i in range(10):
|
||||||
|
p.spawn(memcache_client.set, 'key', 'value')
|
||||||
|
|
||||||
|
# let everyone block
|
||||||
|
sleep(0)
|
||||||
|
self.assertEqual(pending['1.2.3.5:11211'], 10)
|
||||||
|
|
||||||
|
# hand out a couple slow connection
|
||||||
|
mock_conn = MagicMock(), MagicMock()
|
||||||
|
mock_conn[1].sendall = lambda x: sleep(0.2)
|
||||||
|
connections['1.2.3.5:11211'].put(mock_conn)
|
||||||
|
connections['1.2.3.5:11211'].put(mock_conn)
|
||||||
|
|
||||||
|
# so far so good, everyone is still waiting
|
||||||
|
sleep(0)
|
||||||
|
self.assertEqual(pending['1.2.3.5:11211'], 8)
|
||||||
|
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 0)
|
||||||
|
|
||||||
|
# but they won't wait longer than pool_timeout
|
||||||
|
mock_conn = MagicMock(), MagicMock()
|
||||||
|
connections['1.2.3.4:11211'].put(mock_conn)
|
||||||
|
connections['1.2.3.4:11211'].put(mock_conn)
|
||||||
|
p.waitall()
|
||||||
|
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
||||||
|
self.assertEqual(served['1.2.3.5:11211'], 2)
|
||||||
|
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
||||||
|
self.assertEqual(served['1.2.3.4:11211'], 8)
|
||||||
|
|
||||||
|
# and we never got more put in that we gave out
|
||||||
|
self.assertEqual(connections['1.2.3.5:11211'].qsize(), 2)
|
||||||
|
self.assertEqual(connections['1.2.3.4:11211'].qsize(), 2)
|
||||||
|
finally:
|
||||||
|
memcached.MemcacheConnPool = orig_conn_pool
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user