diff --git a/swift/common/memcached.py b/swift/common/memcached.py index fdcbc745ae..4b0d8ede00 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -50,10 +50,11 @@ import time from bisect import bisect from swift import gettext_ as _ from hashlib import md5 +from distutils.version import StrictVersion from eventlet.green import socket from eventlet.pools import Pool -from eventlet import Timeout +from eventlet import Timeout, __version__ as eventlet_version from swift.common.utils import json @@ -106,6 +107,14 @@ class MemcacheConnPool(Pool): Pool.__init__(self, max_size=size) self.server = server self._connect_timeout = connect_timeout + self._parent_class_getter = super(MemcacheConnPool, self).get + try: + # call the patched .get() if eventlet is older than 0.9.17 + if StrictVersion(eventlet_version) < StrictVersion('0.9.17'): + self._parent_class_getter = self._upstream_fixed_get + except ValueError: + # "invalid" version number or otherwise error parsing version + pass def create(self): if ':' in self.server: @@ -120,12 +129,37 @@ class MemcacheConnPool(Pool): return (sock.makefile(), sock) def get(self): - fp, sock = Pool.get(self) + fp, sock = self._parent_class_getter() if fp is None: # An error happened previously, so we need a new connection fp, sock = self.create() return fp, sock + # The following method is from eventlet post 0.9.16. This version + # properly keeps track of pool size accounting, and therefore doesn't + # let the pool grow without bound. This patched version is the result + # of commit f5e5b2bda7b442f0262ee1084deefcc5a1cc0694 in eventlet and is + # documented at https://bitbucket.org/eventlet/eventlet/issue/91 + def _upstream_fixed_get(self): + """Return an item from the pool, when one is available. This may + cause the calling greenthread to block. + """ + if self.free_items: + return self.free_items.popleft() + self.current_size += 1 + if self.current_size <= self.max_size: + try: + created = self.create() + # This was really supposed to be "except:" but ran afoul of the + # H201 check, which does not implement the "noqa" exception. Once + # that's fixed, the except here can be changed to "except: # noqa" + except (Exception, BaseException): + self.current_size -= 1 + raise + return created + self.current_size -= 1 # did not create + return self.channel.get() + class MemcacheRing(object): """ diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index 5db798334d..9169c67970 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -36,6 +36,8 @@ class MockedMemcachePool(memcached.MemcacheConnPool): def __init__(self, mocks): Pool.__init__(self, max_size=2) self.mocks = mocks + # setting this for the eventlet workaround in the MemcacheConnPool + self._parent_class_getter = super(memcached.MemcacheConnPool, self).get def create(self): return self.mocks.pop(0) @@ -363,6 +365,7 @@ class TestMemcached(unittest.TestCase): def wait_connect(addr): connected.append(addr) + sleep(0.1) # yield val = connections.get() if val is not None: errors.append(val) @@ -408,6 +411,68 @@ class TestMemcached(unittest.TestCase): connections.get_nowait() self.assertTrue(connections.empty()) + # Ensure we exercise the backported-for-pre-eventlet-version-0.9.17 get() + # code, even if the executing eventlet's version is already newer. + @patch.object(memcached, 'eventlet_version', '0.9.16') + def test_connection_pooling_pre_0_9_17(self): + with patch('swift.common.memcached.socket') as mock_module: + connected = [] + count = [0] + + def _slow_yielding_connector(addr): + count[0] += 1 + if count[0] % 3 == 0: + raise ValueError('whoops!') + sleep(0.1) + connected.append(addr) + + mock_module.socket.return_value.connect.side_effect = \ + _slow_yielding_connector + + # If POOL_SIZE is not small enough relative to USER_COUNT, the + # "free_items" business in the eventlet.pools.Pool will cause + # spurious failures below. I found these values to work well on a + # VM running in VirtualBox on a late 2013 Retina MacbookPro: + POOL_SIZE = 5 + USER_COUNT = 50 + + pool = memcached.MemcacheConnPool('1.2.3.4:11211', size=POOL_SIZE, + connect_timeout=10) + self.assertEqual(POOL_SIZE, pool.max_size) + + def _user(): + got = None + while not got: + try: + got = pool.get() + # This was really supposed to be "except:" but ran afoul + # of the H201 check, which does not implement the "noqa" + # exception. Once that's fixed, the except here can be + # changed to "except: # noqa" + except (Exception, BaseException): + pass + pool.put(got) + + # make a bunch of requests "at the same time" + p = GreenPool() + for i in range(USER_COUNT): + p.spawn(_user) + p.waitall() + + # If the except block after the "created = self.create()" call + # doesn't correctly decrement self.current_size, this test will + # fail by having some number less than POOL_SIZE connections (in my + # testing, anyway). + self.assertEqual(POOL_SIZE, len(connected)) + + # Subsequent requests should get and use the existing + # connections, not creating any more. + for i in range(USER_COUNT): + p.spawn(_user) + p.waitall() + + self.assertEqual(POOL_SIZE, len(connected)) + def test_connection_pool_timeout(self): orig_conn_pool = memcached.MemcacheConnPool try: