Merge "memcache: Make error-limiting values configurable"
This commit is contained in:
commit
2593f7f264
@ -26,3 +26,11 @@
|
|||||||
# tries = 3
|
# tries = 3
|
||||||
# Timeout for read and writes
|
# Timeout for read and writes
|
||||||
# io_timeout = 2.0
|
# io_timeout = 2.0
|
||||||
|
#
|
||||||
|
# How long without an error before a server's error count is reset. This will
|
||||||
|
# also be how long before a server is reenabled after suppression is triggered.
|
||||||
|
# Set to 0 to disable error-limiting.
|
||||||
|
# error_suppression_interval = 60.0
|
||||||
|
#
|
||||||
|
# How many errors can accumulate before a server is temporarily ignored.
|
||||||
|
# error_suppression_limit = 10
|
||||||
|
@ -160,6 +160,7 @@ use = egg:swift#proxy
|
|||||||
#
|
#
|
||||||
# How long without an error before a node's error count is reset. This will
|
# How long without an error before a node's error count is reset. This will
|
||||||
# also be how long before a node is reenabled after suppression is triggered.
|
# also be how long before a node is reenabled after suppression is triggered.
|
||||||
|
# Set to 0 to disable error-limiting.
|
||||||
# error_suppression_interval = 60
|
# error_suppression_interval = 60
|
||||||
#
|
#
|
||||||
# How many errors can accumulate before a node is temporarily ignored.
|
# How many errors can accumulate before a node is temporarily ignored.
|
||||||
@ -703,6 +704,14 @@ use = egg:swift#memcache
|
|||||||
# Sets the maximum number of connections to each memcached server per worker
|
# Sets the maximum number of connections to each memcached server per worker
|
||||||
# memcache_max_connections = 2
|
# memcache_max_connections = 2
|
||||||
#
|
#
|
||||||
|
# How long without an error before a server's error count is reset. This will
|
||||||
|
# also be how long before a server is reenabled after suppression is triggered.
|
||||||
|
# Set to 0 to disable error-limiting.
|
||||||
|
# error_suppression_interval = 60.0
|
||||||
|
#
|
||||||
|
# How many errors can accumulate before a server is temporarily ignored.
|
||||||
|
# error_suppression_limit = 10
|
||||||
|
#
|
||||||
# More options documented in memcache.conf-sample
|
# More options documented in memcache.conf-sample
|
||||||
|
|
||||||
[filter:ratelimit]
|
[filter:ratelimit]
|
||||||
|
@ -72,8 +72,7 @@ TRY_COUNT = 3
|
|||||||
# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server
|
# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server
|
||||||
# will be considered failed for ERROR_LIMIT_DURATION seconds.
|
# will be considered failed for ERROR_LIMIT_DURATION seconds.
|
||||||
ERROR_LIMIT_COUNT = 10
|
ERROR_LIMIT_COUNT = 10
|
||||||
ERROR_LIMIT_TIME = 60
|
ERROR_LIMIT_TIME = ERROR_LIMIT_DURATION = 60
|
||||||
ERROR_LIMIT_DURATION = 60
|
|
||||||
|
|
||||||
|
|
||||||
def md5hash(key):
|
def md5hash(key):
|
||||||
@ -160,10 +159,16 @@ class MemcacheRing(object):
|
|||||||
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
||||||
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
||||||
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
||||||
max_conns=2, logger=None):
|
max_conns=2, logger=None,
|
||||||
|
error_limit_count=ERROR_LIMIT_COUNT,
|
||||||
|
error_limit_time=ERROR_LIMIT_TIME,
|
||||||
|
error_limit_duration=ERROR_LIMIT_DURATION):
|
||||||
self._ring = {}
|
self._ring = {}
|
||||||
self._errors = dict(((serv, []) for serv in servers))
|
self._errors = dict(((serv, []) for serv in servers))
|
||||||
self._error_limited = dict(((serv, 0) for serv in servers))
|
self._error_limited = dict(((serv, 0) for serv in servers))
|
||||||
|
self._error_limit_count = error_limit_count
|
||||||
|
self._error_limit_time = error_limit_time
|
||||||
|
self._error_limit_duration = error_limit_duration
|
||||||
for server in sorted(servers):
|
for server in sorted(servers):
|
||||||
for i in range(NODE_WEIGHT):
|
for i in range(NODE_WEIGHT):
|
||||||
self._ring[md5hash('%s-%s' % (server, i))] = server
|
self._ring[md5hash('%s-%s' % (server, i))] = server
|
||||||
@ -211,13 +216,17 @@ class MemcacheRing(object):
|
|||||||
# We need to return something to the pool
|
# We need to return something to the pool
|
||||||
# A new connection will be created the next time it is retrieved
|
# A new connection will be created the next time it is retrieved
|
||||||
self._return_conn(server, None, None)
|
self._return_conn(server, None, None)
|
||||||
|
|
||||||
|
if self._error_limit_time <= 0 or self._error_limit_duration <= 0:
|
||||||
|
return
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
self._errors[server].append(time.time())
|
self._errors[server].append(now)
|
||||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
if len(self._errors[server]) > self._error_limit_count:
|
||||||
self._errors[server] = [err for err in self._errors[server]
|
self._errors[server] = [err for err in self._errors[server]
|
||||||
if err > now - ERROR_LIMIT_TIME]
|
if err > now - self._error_limit_time]
|
||||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
if len(self._errors[server]) > self._error_limit_count:
|
||||||
self._error_limited[server] = now + ERROR_LIMIT_DURATION
|
self._error_limited[server] = now + self._error_limit_duration
|
||||||
self.logger.error('Error limiting server %s', server)
|
self.logger.error('Error limiting server %s', server)
|
||||||
|
|
||||||
def _get_conns(self, key):
|
def _get_conns(self, key):
|
||||||
|
@ -17,8 +17,9 @@ import os
|
|||||||
|
|
||||||
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
||||||
|
|
||||||
from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
|
from swift.common.memcached import (
|
||||||
IO_TIMEOUT, TRY_COUNT)
|
MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT, IO_TIMEOUT, TRY_COUNT,
|
||||||
|
ERROR_LIMIT_COUNT, ERROR_LIMIT_TIME)
|
||||||
from swift.common.utils import get_logger
|
from swift.common.utils import get_logger
|
||||||
|
|
||||||
|
|
||||||
@ -86,6 +87,10 @@ class MemcacheMiddleware(object):
|
|||||||
'pool_timeout', POOL_TIMEOUT))
|
'pool_timeout', POOL_TIMEOUT))
|
||||||
tries = int(memcache_options.get('tries', TRY_COUNT))
|
tries = int(memcache_options.get('tries', TRY_COUNT))
|
||||||
io_timeout = float(memcache_options.get('io_timeout', IO_TIMEOUT))
|
io_timeout = float(memcache_options.get('io_timeout', IO_TIMEOUT))
|
||||||
|
error_suppression_interval = float(memcache_options.get(
|
||||||
|
'error_suppression_interval', ERROR_LIMIT_TIME))
|
||||||
|
error_suppression_limit = float(memcache_options.get(
|
||||||
|
'error_suppression_limit', ERROR_LIMIT_COUNT))
|
||||||
|
|
||||||
if not self.memcache_servers:
|
if not self.memcache_servers:
|
||||||
self.memcache_servers = '127.0.0.1:11211'
|
self.memcache_servers = '127.0.0.1:11211'
|
||||||
@ -105,7 +110,10 @@ class MemcacheMiddleware(object):
|
|||||||
allow_pickle=(serialization_format == 0),
|
allow_pickle=(serialization_format == 0),
|
||||||
allow_unpickle=(serialization_format <= 1),
|
allow_unpickle=(serialization_format <= 1),
|
||||||
max_conns=max_conns,
|
max_conns=max_conns,
|
||||||
logger=self.logger)
|
logger=self.logger,
|
||||||
|
error_limit_count=error_suppression_limit,
|
||||||
|
error_limit_time=error_suppression_interval,
|
||||||
|
error_limit_duration=error_suppression_interval)
|
||||||
|
|
||||||
def __call__(self, env, start_response):
|
def __call__(self, env, start_response):
|
||||||
env['swift.cache'] = self.memcache
|
env['swift.cache'] = self.memcache
|
||||||
|
@ -160,6 +160,16 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
app.memcache._client_cache['6.7.8.9:10'].max_size, 5)
|
app.memcache._client_cache['6.7.8.9:10'].max_size, 5)
|
||||||
|
|
||||||
|
def test_conf_inline_ratelimiting(self):
|
||||||
|
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
||||||
|
app = memcache.MemcacheMiddleware(
|
||||||
|
FakeApp(),
|
||||||
|
{'error_suppression_limit': '5',
|
||||||
|
'error_suppression_interval': '2.5'})
|
||||||
|
self.assertEqual(app.memcache._error_limit_count, 5)
|
||||||
|
self.assertEqual(app.memcache._error_limit_time, 2.5)
|
||||||
|
self.assertEqual(app.memcache._error_limit_duration, 2.5)
|
||||||
|
|
||||||
def test_conf_extra_no_section(self):
|
def test_conf_extra_no_section(self):
|
||||||
with mock.patch.object(memcache, 'ConfigParser',
|
with mock.patch.object(memcache, 'ConfigParser',
|
||||||
get_config_parser(section='foobar')):
|
get_config_parser(section='foobar')):
|
||||||
@ -336,6 +346,9 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
# tries is limited to server count
|
# tries is limited to server count
|
||||||
self.assertEqual(memcache_ring._tries, 4)
|
self.assertEqual(memcache_ring._tries, 4)
|
||||||
self.assertEqual(memcache_ring._io_timeout, 1.0)
|
self.assertEqual(memcache_ring._io_timeout, 1.0)
|
||||||
|
self.assertEqual(memcache_ring._error_limit_count, 10)
|
||||||
|
self.assertEqual(memcache_ring._error_limit_time, 60)
|
||||||
|
self.assertEqual(memcache_ring._error_limit_duration, 60)
|
||||||
|
|
||||||
@with_tempdir
|
@with_tempdir
|
||||||
def test_real_memcache_config(self, tempdir):
|
def test_real_memcache_config(self, tempdir):
|
||||||
@ -363,6 +376,8 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
10.0.0.4:11211
|
10.0.0.4:11211
|
||||||
connect_timeout = 0.5
|
connect_timeout = 0.5
|
||||||
io_timeout = 1.0
|
io_timeout = 1.0
|
||||||
|
error_suppression_limit = 0
|
||||||
|
error_suppression_interval = 1.5
|
||||||
"""
|
"""
|
||||||
memcache_config_path = os.path.join(tempdir, 'memcache.conf')
|
memcache_config_path = os.path.join(tempdir, 'memcache.conf')
|
||||||
with open(memcache_config_path, 'w') as f:
|
with open(memcache_config_path, 'w') as f:
|
||||||
@ -376,6 +391,9 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
self.assertEqual(memcache_ring._tries, 3)
|
self.assertEqual(memcache_ring._tries, 3)
|
||||||
# memcache conf options are defaults
|
# memcache conf options are defaults
|
||||||
self.assertEqual(memcache_ring._io_timeout, 1.0)
|
self.assertEqual(memcache_ring._io_timeout, 1.0)
|
||||||
|
self.assertEqual(memcache_ring._error_limit_count, 0)
|
||||||
|
self.assertEqual(memcache_ring._error_limit_time, 1.5)
|
||||||
|
self.assertEqual(memcache_ring._error_limit_duration, 1.5)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -50,19 +50,23 @@ class MockedMemcachePool(memcached.MemcacheConnPool):
|
|||||||
|
|
||||||
|
|
||||||
class ExplodingMockMemcached(object):
|
class ExplodingMockMemcached(object):
|
||||||
|
should_explode = True
|
||||||
exploded = False
|
exploded = False
|
||||||
|
|
||||||
def sendall(self, string):
|
def sendall(self, string):
|
||||||
self.exploded = True
|
if self.should_explode:
|
||||||
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
self.exploded = True
|
||||||
|
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
||||||
|
|
||||||
def readline(self):
|
def readline(self):
|
||||||
self.exploded = True
|
if self.should_explode:
|
||||||
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
self.exploded = True
|
||||||
|
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
||||||
|
|
||||||
def read(self, size):
|
def read(self, size):
|
||||||
self.exploded = True
|
if self.should_explode:
|
||||||
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
self.exploded = True
|
||||||
|
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
pass
|
pass
|
||||||
@ -512,6 +516,119 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
|
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
|
||||||
[])
|
[])
|
||||||
|
|
||||||
|
def test_error_limiting(self):
|
||||||
|
memcache_client = memcached.MemcacheRing(
|
||||||
|
['1.2.3.4:11211', '1.2.3.5:11211'], logger=self.logger)
|
||||||
|
mock1 = ExplodingMockMemcached()
|
||||||
|
mock2 = ExplodingMockMemcached()
|
||||||
|
mock2.should_explode = False
|
||||||
|
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||||
|
[(mock2, mock2)] * 12)
|
||||||
|
memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool(
|
||||||
|
[(mock1, mock1)] * 12)
|
||||||
|
|
||||||
|
for _ in range(12):
|
||||||
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
|
# twelfth one skips .5 because of error limiting and goes straight
|
||||||
|
# to .4
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 11 + [
|
||||||
|
'Error limiting server 1.2.3.5:11211'
|
||||||
|
])
|
||||||
|
self.logger.clear()
|
||||||
|
|
||||||
|
mock2.should_explode = True
|
||||||
|
for _ in range(12):
|
||||||
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
|
# as we keep going, eventually .4 gets error limited, too
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 11 + [
|
||||||
|
'Error limiting server 1.2.3.4:11211'
|
||||||
|
])
|
||||||
|
self.logger.clear()
|
||||||
|
|
||||||
|
# continued requests just keep bypassing memcache
|
||||||
|
for _ in range(12):
|
||||||
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [])
|
||||||
|
|
||||||
|
# and get()s are all a "cache miss"
|
||||||
|
self.assertIsNone(memcache_client.get('some_key'))
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [])
|
||||||
|
|
||||||
|
def test_error_disabled(self):
|
||||||
|
memcache_client = memcached.MemcacheRing(
|
||||||
|
['1.2.3.4:11211'], logger=self.logger, error_limit_time=0)
|
||||||
|
mock1 = ExplodingMockMemcached()
|
||||||
|
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||||
|
[(mock1, mock1)] * 20)
|
||||||
|
|
||||||
|
for _ in range(20):
|
||||||
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
|
# twelfth one skips .5 because of error limiting and goes straight
|
||||||
|
# to .4
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 20)
|
||||||
|
|
||||||
|
def test_error_limiting_custom_config(self):
|
||||||
|
def do_calls(time_step, num_calls, **memcache_kwargs):
|
||||||
|
self.logger.clear()
|
||||||
|
memcache_client = memcached.MemcacheRing(
|
||||||
|
['1.2.3.5:11211'], logger=self.logger,
|
||||||
|
**memcache_kwargs)
|
||||||
|
mock1 = ExplodingMockMemcached()
|
||||||
|
memcache_client._client_cache['1.2.3.5:11211'] = \
|
||||||
|
MockedMemcachePool([(mock1, mock1)] * num_calls)
|
||||||
|
|
||||||
|
for n in range(num_calls):
|
||||||
|
with mock.patch.object(memcached.time, 'time',
|
||||||
|
return_value=time_step * n):
|
||||||
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
|
|
||||||
|
# with default error_limit_time of 60, one call per 5 secs, twelfth one
|
||||||
|
# triggers error limit
|
||||||
|
do_calls(5, 12)
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 11 + [
|
||||||
|
'Error limiting server 1.2.3.5:11211'
|
||||||
|
])
|
||||||
|
|
||||||
|
# with default error_limit_time of 60, one call per 6 secs, error limit
|
||||||
|
# is not triggered
|
||||||
|
do_calls(6, 20)
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 20)
|
||||||
|
|
||||||
|
# with error_limit_time of 66, one call per 6 secs, twelfth one
|
||||||
|
# triggers error limit
|
||||||
|
do_calls(6, 12, error_limit_time=66)
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 11 + [
|
||||||
|
'Error limiting server 1.2.3.5:11211'
|
||||||
|
])
|
||||||
|
|
||||||
|
# with error_limit_time of 70, one call per 6 secs, error_limit_count
|
||||||
|
# of 11, 13th call triggers error limit
|
||||||
|
do_calls(6, 13, error_limit_time=70, error_limit_count=11)
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
|
'[Errno 32] Broken pipe',
|
||||||
|
] * 12 + [
|
||||||
|
'Error limiting server 1.2.3.5:11211'
|
||||||
|
])
|
||||||
|
|
||||||
def test_delete(self):
|
def test_delete(self):
|
||||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
|
Loading…
Reference in New Issue
Block a user