From aff65242ff87b24d43d7a6ce2b1c33546363144b Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Mon, 2 Nov 2020 15:27:16 -0800 Subject: [PATCH] memcache: Make error-limiting values configurable Previously these were all hardcoded; let operators tweak them as needed. Significantly, this also allows operators to disable error-limiting entirely, which may be a useful protection in case proxies are configured with a single memcached server. Use error_suppression_limit and error_suppression_interval to mirror the option names used by the proxy-server to ratelimit backend Swift servers. Co-Authored-By: Alistair Coles Change-Id: Ife005cb8545dd966d7b0e34e5496a0354c003881 --- etc/memcache.conf-sample | 8 ++ etc/proxy-server.conf-sample | 9 ++ swift/common/memcached.py | 25 ++-- swift/common/middleware/memcache.py | 14 +- test/unit/common/middleware/test_memcache.py | 18 +++ test/unit/common/test_memcached.py | 129 ++++++++++++++++++- 6 files changed, 186 insertions(+), 17 deletions(-) diff --git a/etc/memcache.conf-sample b/etc/memcache.conf-sample index 813ecf9edb..1de13317d5 100644 --- a/etc/memcache.conf-sample +++ b/etc/memcache.conf-sample @@ -26,3 +26,11 @@ # tries = 3 # Timeout for read and writes # io_timeout = 2.0 +# +# How long without an error before a server's error count is reset. This will +# also be how long before a server is reenabled after suppression is triggered. +# Set to 0 to disable error-limiting. +# error_suppression_interval = 60.0 +# +# How many errors can accumulate before a server is temporarily ignored. +# error_suppression_limit = 10 diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 147a3c50e3..5cf7cb2aaf 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -160,6 +160,7 @@ use = egg:swift#proxy # # How long without an error before a node's error count is reset. This will # also be how long before a node is reenabled after suppression is triggered. +# Set to 0 to disable error-limiting. # error_suppression_interval = 60 # # How many errors can accumulate before a node is temporarily ignored. @@ -693,6 +694,14 @@ use = egg:swift#memcache # Sets the maximum number of connections to each memcached server per worker # memcache_max_connections = 2 # +# How long without an error before a server's error count is reset. This will +# also be how long before a server is reenabled after suppression is triggered. +# Set to 0 to disable error-limiting. +# error_suppression_interval = 60.0 +# +# How many errors can accumulate before a server is temporarily ignored. +# error_suppression_limit = 10 +# # More options documented in memcache.conf-sample [filter:ratelimit] diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 08da5c7bad..bc1216283a 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -72,8 +72,7 @@ TRY_COUNT = 3 # if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server # will be considered failed for ERROR_LIMIT_DURATION seconds. ERROR_LIMIT_COUNT = 10 -ERROR_LIMIT_TIME = 60 -ERROR_LIMIT_DURATION = 60 +ERROR_LIMIT_TIME = ERROR_LIMIT_DURATION = 60 def md5hash(key): @@ -160,10 +159,16 @@ class MemcacheRing(object): def __init__(self, servers, connect_timeout=CONN_TIMEOUT, io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT, tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False, - max_conns=2, logger=None): + max_conns=2, logger=None, + error_limit_count=ERROR_LIMIT_COUNT, + error_limit_time=ERROR_LIMIT_TIME, + error_limit_duration=ERROR_LIMIT_DURATION): self._ring = {} self._errors = dict(((serv, []) for serv in servers)) self._error_limited = dict(((serv, 0) for serv in servers)) + self._error_limit_count = error_limit_count + self._error_limit_time = error_limit_time + self._error_limit_duration = error_limit_duration for server in sorted(servers): for i in range(NODE_WEIGHT): self._ring[md5hash('%s-%s' % (server, i))] = server @@ -211,13 +216,17 @@ class MemcacheRing(object): # We need to return something to the pool # A new connection will be created the next time it is retrieved self._return_conn(server, None, None) + + if self._error_limit_time <= 0 or self._error_limit_duration <= 0: + return + now = time.time() - self._errors[server].append(time.time()) - if len(self._errors[server]) > ERROR_LIMIT_COUNT: + self._errors[server].append(now) + if len(self._errors[server]) > self._error_limit_count: self._errors[server] = [err for err in self._errors[server] - if err > now - ERROR_LIMIT_TIME] - if len(self._errors[server]) > ERROR_LIMIT_COUNT: - self._error_limited[server] = now + ERROR_LIMIT_DURATION + if err > now - self._error_limit_time] + if len(self._errors[server]) > self._error_limit_count: + self._error_limited[server] = now + self._error_limit_duration self.logger.error('Error limiting server %s', server) def _get_conns(self, key): diff --git a/swift/common/middleware/memcache.py b/swift/common/middleware/memcache.py index b5b9569a52..88df1dc815 100644 --- a/swift/common/middleware/memcache.py +++ b/swift/common/middleware/memcache.py @@ -17,8 +17,9 @@ import os from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError -from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT, - IO_TIMEOUT, TRY_COUNT) +from swift.common.memcached import ( + MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT, IO_TIMEOUT, TRY_COUNT, + ERROR_LIMIT_COUNT, ERROR_LIMIT_TIME) from swift.common.utils import get_logger @@ -86,6 +87,10 @@ class MemcacheMiddleware(object): 'pool_timeout', POOL_TIMEOUT)) tries = int(memcache_options.get('tries', TRY_COUNT)) io_timeout = float(memcache_options.get('io_timeout', IO_TIMEOUT)) + error_suppression_interval = float(memcache_options.get( + 'error_suppression_interval', ERROR_LIMIT_TIME)) + error_suppression_limit = float(memcache_options.get( + 'error_suppression_limit', ERROR_LIMIT_COUNT)) if not self.memcache_servers: self.memcache_servers = '127.0.0.1:11211' @@ -105,7 +110,10 @@ class MemcacheMiddleware(object): allow_pickle=(serialization_format == 0), allow_unpickle=(serialization_format <= 1), max_conns=max_conns, - logger=self.logger) + logger=self.logger, + error_limit_count=error_suppression_limit, + error_limit_time=error_suppression_interval, + error_limit_duration=error_suppression_interval) def __call__(self, env, start_response): env['swift.cache'] = self.memcache diff --git a/test/unit/common/middleware/test_memcache.py b/test/unit/common/middleware/test_memcache.py index cbe04d2fe1..65eb034a51 100644 --- a/test/unit/common/middleware/test_memcache.py +++ b/test/unit/common/middleware/test_memcache.py @@ -160,6 +160,16 @@ class TestCacheMiddleware(unittest.TestCase): self.assertEqual( app.memcache._client_cache['6.7.8.9:10'].max_size, 5) + def test_conf_inline_ratelimiting(self): + with mock.patch.object(memcache, 'ConfigParser', get_config_parser()): + app = memcache.MemcacheMiddleware( + FakeApp(), + {'error_suppression_limit': '5', + 'error_suppression_interval': '2.5'}) + self.assertEqual(app.memcache._error_limit_count, 5) + self.assertEqual(app.memcache._error_limit_time, 2.5) + self.assertEqual(app.memcache._error_limit_duration, 2.5) + def test_conf_extra_no_section(self): with mock.patch.object(memcache, 'ConfigParser', get_config_parser(section='foobar')): @@ -336,6 +346,9 @@ class TestCacheMiddleware(unittest.TestCase): # tries is limited to server count self.assertEqual(memcache_ring._tries, 4) self.assertEqual(memcache_ring._io_timeout, 1.0) + self.assertEqual(memcache_ring._error_limit_count, 10) + self.assertEqual(memcache_ring._error_limit_time, 60) + self.assertEqual(memcache_ring._error_limit_duration, 60) @with_tempdir def test_real_memcache_config(self, tempdir): @@ -363,6 +376,8 @@ class TestCacheMiddleware(unittest.TestCase): 10.0.0.4:11211 connect_timeout = 0.5 io_timeout = 1.0 + error_suppression_limit = 0 + error_suppression_interval = 1.5 """ memcache_config_path = os.path.join(tempdir, 'memcache.conf') with open(memcache_config_path, 'w') as f: @@ -376,6 +391,9 @@ class TestCacheMiddleware(unittest.TestCase): self.assertEqual(memcache_ring._tries, 3) # memcache conf options are defaults self.assertEqual(memcache_ring._io_timeout, 1.0) + self.assertEqual(memcache_ring._error_limit_count, 0) + self.assertEqual(memcache_ring._error_limit_time, 1.5) + self.assertEqual(memcache_ring._error_limit_duration, 1.5) if __name__ == '__main__': diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index 61ad6082da..7ae9e63066 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -50,19 +50,23 @@ class MockedMemcachePool(memcached.MemcacheConnPool): class ExplodingMockMemcached(object): + should_explode = True exploded = False def sendall(self, string): - self.exploded = True - raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE)) + if self.should_explode: + self.exploded = True + raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE)) def readline(self): - self.exploded = True - raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE)) + if self.should_explode: + self.exploded = True + raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE)) def read(self, size): - self.exploded = True - raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE)) + if self.should_explode: + self.exploded = True + raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE)) def close(self): pass @@ -512,6 +516,119 @@ class TestMemcached(unittest.TestCase): self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks, []) + def test_error_limiting(self): + memcache_client = memcached.MemcacheRing( + ['1.2.3.4:11211', '1.2.3.5:11211'], logger=self.logger) + mock1 = ExplodingMockMemcached() + mock2 = ExplodingMockMemcached() + mock2.should_explode = False + memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool( + [(mock2, mock2)] * 12) + memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool( + [(mock1, mock1)] * 12) + + for _ in range(12): + memcache_client.set('some_key', [1, 2, 3]) + # twelfth one skips .5 because of error limiting and goes straight + # to .4 + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + ] * 11 + [ + 'Error limiting server 1.2.3.5:11211' + ]) + self.logger.clear() + + mock2.should_explode = True + for _ in range(12): + memcache_client.set('some_key', [1, 2, 3]) + # as we keep going, eventually .4 gets error limited, too + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ] * 11 + [ + 'Error limiting server 1.2.3.4:11211' + ]) + self.logger.clear() + + # continued requests just keep bypassing memcache + for _ in range(12): + memcache_client.set('some_key', [1, 2, 3]) + self.assertEqual(self.logger.get_lines_for_level('error'), []) + + # and get()s are all a "cache miss" + self.assertIsNone(memcache_client.get('some_key')) + self.assertEqual(self.logger.get_lines_for_level('error'), []) + + def test_error_disabled(self): + memcache_client = memcached.MemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, error_limit_time=0) + mock1 = ExplodingMockMemcached() + memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool( + [(mock1, mock1)] * 20) + + for _ in range(20): + memcache_client.set('some_key', [1, 2, 3]) + # twelfth one skips .5 because of error limiting and goes straight + # to .4 + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ] * 20) + + def test_error_limiting_custom_config(self): + def do_calls(time_step, num_calls, **memcache_kwargs): + self.logger.clear() + memcache_client = memcached.MemcacheRing( + ['1.2.3.5:11211'], logger=self.logger, + **memcache_kwargs) + mock1 = ExplodingMockMemcached() + memcache_client._client_cache['1.2.3.5:11211'] = \ + MockedMemcachePool([(mock1, mock1)] * num_calls) + + for n in range(num_calls): + with mock.patch.object(memcached.time, 'time', + return_value=time_step * n): + memcache_client.set('some_key', [1, 2, 3]) + + # with default error_limit_time of 60, one call per 5 secs, twelfth one + # triggers error limit + do_calls(5, 12) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + ] * 11 + [ + 'Error limiting server 1.2.3.5:11211' + ]) + + # with default error_limit_time of 60, one call per 6 secs, error limit + # is not triggered + do_calls(6, 20) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + ] * 20) + + # with error_limit_time of 66, one call per 6 secs, twelfth one + # triggers error limit + do_calls(6, 12, error_limit_time=66) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + ] * 11 + [ + 'Error limiting server 1.2.3.5:11211' + ]) + + # with error_limit_time of 70, one call per 6 secs, error_limit_count + # of 11, 13th call triggers error limit + do_calls(6, 13, error_limit_time=70, error_limit_count=11) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + ] * 12 + [ + 'Error limiting server 1.2.3.5:11211' + ]) + def test_delete(self): memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], logger=self.logger)