diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 07f0ec3a0b..fdcbc745ae 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -60,6 +60,7 @@ from swift.common.utils import json DEFAULT_MEMCACHED_PORT = 11211 CONN_TIMEOUT = 0.3 +POOL_TIMEOUT = 1.0 # WAG IO_TIMEOUT = 2.0 PICKLE_FLAG = 1 JSON_FLAG = 2 @@ -94,6 +95,10 @@ class MemcacheConnectionError(Exception): pass +class MemcachePoolTimeout(Timeout): + pass + + class MemcacheConnPool(Pool): """Connection pool for Memcache Connections""" @@ -128,8 +133,8 @@ class MemcacheRing(object): """ def __init__(self, servers, connect_timeout=CONN_TIMEOUT, - io_timeout=IO_TIMEOUT, tries=TRY_COUNT, - allow_pickle=False, allow_unpickle=False, + io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT, + tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False, max_conns=2): self._ring = {} self._errors = dict(((serv, []) for serv in servers)) @@ -145,11 +150,12 @@ class MemcacheRing(object): for server in servers)) self._connect_timeout = connect_timeout self._io_timeout = io_timeout + self._pool_timeout = pool_timeout self._allow_pickle = allow_pickle self._allow_unpickle = allow_unpickle or allow_pickle def _exception_occurred(self, server, e, action='talking', - sock=None, fp=None): + sock=None, fp=None, got_connection=True): if isinstance(e, Timeout): logging.error(_("Timeout %(action)s to memcached: %(server)s"), {'action': action, 'server': server}) @@ -168,9 +174,10 @@ class MemcacheRing(object): del sock except Exception: pass - # We need to return something to the pool - # A new connection will be created the next time it is retreived - self._return_conn(server, None, None) + if got_connection: + # We need to return something to the pool + # A new connection will be created the next time it is retreived + self._return_conn(server, None, None) now = time.time() self._errors[server].append(time.time()) if len(self._errors[server]) > ERROR_LIMIT_COUNT: @@ -197,17 +204,13 @@ class MemcacheRing(object): continue sock = None try: - # NOTE: We do NOT place a Timeout over the MemcacheConnPool's - # get() method. The MemcacheConnPool's create() method already - # places a timeout around the connect() system call, which we - # catch below. It is possible for the underlying Queue of the - # MemcacheConnPool to be contended such that this greenlet is - # waiting for one or more connections to be freed up. If there - # is a particularly slow memcache server causing that problme, - # then the IO_TIMEOUT will catch that behavior, so we do not - # have to place a Timeout here. - fp, sock = self._client_cache[server].get() + with MemcachePoolTimeout(self._pool_timeout): + fp, sock = self._client_cache[server].get() yield server, fp, sock + except MemcachePoolTimeout as e: + self._exception_occurred( + server, e, action='getting a connection', + got_connection=False) except (Exception, Timeout) as e: # Typically a Timeout exception caught here is the one raised # by the create() method of this server's MemcacheConnPool diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index cfb55bbd76..5db798334d 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -17,6 +17,7 @@ """Tests for swift.common.utils""" from __future__ import with_statement +from collections import defaultdict import logging import socket import time @@ -27,7 +28,7 @@ from eventlet import GreenPool, sleep, Queue from eventlet.pools import Pool from swift.common import memcached -from mock import patch +from mock import patch, MagicMock from test.unit import NullLoggingHandler @@ -407,6 +408,65 @@ class TestMemcached(unittest.TestCase): connections.get_nowait() self.assertTrue(connections.empty()) + def test_connection_pool_timeout(self): + orig_conn_pool = memcached.MemcacheConnPool + try: + connections = defaultdict(Queue) + pending = defaultdict(int) + served = defaultdict(int) + + class MockConnectionPool(orig_conn_pool): + def get(self): + pending[self.server] += 1 + conn = connections[self.server].get() + pending[self.server] -= 1 + return conn + + def put(self, *args, **kwargs): + connections[self.server].put(*args, **kwargs) + served[self.server] += 1 + + memcached.MemcacheConnPool = MockConnectionPool + + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211', + '1.2.3.5:11211'], + io_timeout=0.5, + pool_timeout=0.1) + + p = GreenPool() + for i in range(10): + p.spawn(memcache_client.set, 'key', 'value') + + # let everyone block + sleep(0) + self.assertEqual(pending['1.2.3.5:11211'], 10) + + # hand out a couple slow connection + mock_conn = MagicMock(), MagicMock() + mock_conn[1].sendall = lambda x: sleep(0.2) + connections['1.2.3.5:11211'].put(mock_conn) + connections['1.2.3.5:11211'].put(mock_conn) + + # so far so good, everyone is still waiting + sleep(0) + self.assertEqual(pending['1.2.3.5:11211'], 8) + self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 0) + + # but they won't wait longer than pool_timeout + mock_conn = MagicMock(), MagicMock() + connections['1.2.3.4:11211'].put(mock_conn) + connections['1.2.3.4:11211'].put(mock_conn) + p.waitall() + self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8) + self.assertEqual(served['1.2.3.5:11211'], 2) + self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0) + self.assertEqual(served['1.2.3.4:11211'], 8) + + # and we never got more put in that we gave out + self.assertEqual(connections['1.2.3.5:11211'].qsize(), 2) + self.assertEqual(connections['1.2.3.4:11211'].qsize(), 2) + finally: + memcached.MemcacheConnPool = orig_conn_pool if __name__ == '__main__': unittest.main()