Merge "Memcached: emit memcache timing metrics when exceptions raised"
This commit is contained in:
@@ -48,7 +48,9 @@ import os
|
||||
import six
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
# the name of 'time' module is changed to 'tm', to avoid changing the
|
||||
# signatures of member functions in this file.
|
||||
import time as tm
|
||||
from bisect import bisect
|
||||
|
||||
from eventlet.green import socket, ssl
|
||||
@@ -99,7 +101,7 @@ def sanitize_timeout(timeout):
|
||||
additional second), client beware.
|
||||
"""
|
||||
if timeout > (30 * 24 * 60 * 60):
|
||||
timeout += time.time()
|
||||
timeout += tm.time()
|
||||
return int(timeout)
|
||||
|
||||
|
||||
@@ -223,24 +225,38 @@ class MemcacheRing(object):
|
||||
def memcache_servers(self):
|
||||
return list(self._client_cache.keys())
|
||||
|
||||
def _exception_occurred(self, server, e, key_prefix, action='talking',
|
||||
sock=None, fp=None, got_connection=True):
|
||||
def _exception_occurred(self, server, e, key_prefix, method,
|
||||
conn_start_time, action='talking', sock=None,
|
||||
fp=None, got_connection=True):
|
||||
if isinstance(e, Timeout):
|
||||
self.logger.error(
|
||||
"Timeout %(action)s to memcached: %(server)s"
|
||||
": with key_prefix %(key_prefix)s",
|
||||
{'action': action, 'server': server, 'key_prefix': key_prefix})
|
||||
": with key_prefix %(key_prefix)s, method %(method)s, "
|
||||
"config_timeout %(config_timeout)s, time_spent %(time_spent)s",
|
||||
{'action': action, 'server': server,
|
||||
'key_prefix': key_prefix, 'method': method,
|
||||
'config_timeout': e.seconds,
|
||||
'time_spent': tm.time() - conn_start_time})
|
||||
self.logger.timing_since(
|
||||
'memcached.' + method + '.timeout.timing', conn_start_time)
|
||||
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
||||
self.logger.error(
|
||||
"Error %(action)s to memcached: %(server)s: "
|
||||
"with key_prefix %(key_prefix)s: %(err)s",
|
||||
{'action': action, 'server': server, 'err': e,
|
||||
'key_prefix': key_prefix})
|
||||
"with key_prefix %(key_prefix)s, method %(method)s, "
|
||||
"time_spent %(time_spent)s, %(err)s",
|
||||
{'action': action, 'server': server,
|
||||
'key_prefix': key_prefix, 'method': method,
|
||||
'time_spent': tm.time() - conn_start_time, 'err': e})
|
||||
self.logger.timing_since(
|
||||
'memcached.' + method + '.conn_err.timing', conn_start_time)
|
||||
else:
|
||||
self.logger.exception("Error %(action)s to memcached: %(server)s"
|
||||
": with key_prefix %(key_prefix)s",
|
||||
{'action': action, 'server': server,
|
||||
'key_prefix': key_prefix})
|
||||
self.logger.timing_since(
|
||||
'memcached.' + method + '.errors.timing', conn_start_time)
|
||||
|
||||
try:
|
||||
if fp:
|
||||
fp.close()
|
||||
@@ -261,7 +277,7 @@ class MemcacheRing(object):
|
||||
if self._error_limit_time <= 0 or self._error_limit_duration <= 0:
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
now = tm.time()
|
||||
self._errors[server].append(now)
|
||||
if len(self._errors[server]) > self._error_limit_count:
|
||||
self._errors[server] = [err for err in self._errors[server]
|
||||
@@ -270,11 +286,12 @@ class MemcacheRing(object):
|
||||
self._error_limited[server] = now + self._error_limit_duration
|
||||
self.logger.error('Error limiting server %s', server)
|
||||
|
||||
def _get_conns(self, key_prefix, hash_key):
|
||||
def _get_conns(self, method, key_prefix, hash_key):
|
||||
"""
|
||||
Retrieves a server conn from the pool, or connects a new one.
|
||||
Chooses the server based on a consistent hash of "key".
|
||||
|
||||
:param method: the name of memcache method.
|
||||
:param key_prefix: the prefix of user provided key.
|
||||
:param hash_key: the consistent hash of user key, or server key for
|
||||
set_multi and get_multi.
|
||||
@@ -289,7 +306,8 @@ class MemcacheRing(object):
|
||||
if server in served:
|
||||
continue
|
||||
served.append(server)
|
||||
if self._error_limited[server] > time.time():
|
||||
pool_start_time = tm.time()
|
||||
if self._error_limited[server] > pool_start_time:
|
||||
continue
|
||||
sock = None
|
||||
try:
|
||||
@@ -299,14 +317,15 @@ class MemcacheRing(object):
|
||||
yield server, fp, sock
|
||||
except MemcachePoolTimeout as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, action='getting a connection',
|
||||
got_connection=False)
|
||||
server, e, key_prefix, method, pool_start_time,
|
||||
action='getting a connection', got_connection=False)
|
||||
except (Exception, Timeout) as e:
|
||||
# Typically a Timeout exception caught here is the one raised
|
||||
# by the create() method of this server's MemcacheConnPool
|
||||
# object.
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, action='connecting', sock=sock)
|
||||
server, e, key_prefix, method, pool_start_time,
|
||||
action='connecting', sock=sock)
|
||||
if not any_yielded:
|
||||
self.logger.error('All memcached servers error-limited')
|
||||
|
||||
@@ -346,7 +365,8 @@ class MemcacheRing(object):
|
||||
elif not isinstance(value, bytes):
|
||||
value = str(value).encode('utf-8')
|
||||
|
||||
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||
for (server, fp, sock) in self._get_conns('set', key_prefix, hash_key):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
with Timeout(self._io_timeout):
|
||||
sock.sendall(set_msg(hash_key, flags, timeout, value))
|
||||
@@ -370,7 +390,8 @@ class MemcacheRing(object):
|
||||
return
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, sock=sock, fp=fp)
|
||||
server, e, key_prefix, 'set', conn_start_time,
|
||||
sock=sock, fp=fp)
|
||||
if raise_on_error:
|
||||
raise MemcacheConnectionError(
|
||||
"No memcached connections succeeded.")
|
||||
@@ -389,7 +410,8 @@ class MemcacheRing(object):
|
||||
key_prefix = get_key_prefix(key)
|
||||
hash_key = md5hash(key)
|
||||
value = None
|
||||
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||
for (server, fp, sock) in self._get_conns('get', key_prefix, hash_key):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
with Timeout(self._io_timeout):
|
||||
sock.sendall(b'get ' + hash_key + b'\r\n')
|
||||
@@ -412,7 +434,8 @@ class MemcacheRing(object):
|
||||
return value
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, sock=sock, fp=fp)
|
||||
server, e, key_prefix, 'get', conn_start_time,
|
||||
sock=sock, fp=fp)
|
||||
if raise_on_error:
|
||||
raise MemcacheConnectionError(
|
||||
"No memcached connections succeeded.")
|
||||
@@ -442,7 +465,10 @@ class MemcacheRing(object):
|
||||
command = b'decr'
|
||||
delta = str(abs(int(delta))).encode('ascii')
|
||||
timeout = sanitize_timeout(time)
|
||||
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||
method = command.decode()
|
||||
for (server, fp, sock) in self._get_conns(method, key_prefix,
|
||||
hash_key):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
with Timeout(self._io_timeout):
|
||||
sock.sendall(b' '.join([
|
||||
@@ -474,8 +500,9 @@ class MemcacheRing(object):
|
||||
return ret
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, sock=sock, fp=fp)
|
||||
raise MemcacheConnectionError("No Memcached connections succeeded.")
|
||||
server, e, key_prefix, method, conn_start_time,
|
||||
sock=sock, fp=fp)
|
||||
raise MemcacheConnectionError("No memcached connections succeeded.")
|
||||
|
||||
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
|
||||
def decr(self, key, delta=1, time=0):
|
||||
@@ -505,7 +532,9 @@ class MemcacheRing(object):
|
||||
key_prefix = get_key_prefix(key)
|
||||
hash_key = md5hash(key)
|
||||
server_key = md5hash(server_key) if server_key else hash_key
|
||||
for (server, fp, sock) in self._get_conns(key_prefix, server_key):
|
||||
for (server, fp, sock) in self._get_conns('delete', key_prefix,
|
||||
server_key):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
with Timeout(self._io_timeout):
|
||||
sock.sendall(b'delete ' + hash_key + b'\r\n')
|
||||
@@ -515,7 +544,8 @@ class MemcacheRing(object):
|
||||
return
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, sock=sock, fp=fp)
|
||||
server, e, key_prefix, 'delete', conn_start_time,
|
||||
sock=sock, fp=fp)
|
||||
|
||||
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||
def set_multi(self, mapping, server_key, serialize=True, time=0,
|
||||
@@ -547,7 +577,9 @@ class MemcacheRing(object):
|
||||
value = json.dumps(value).encode('ascii')
|
||||
flags |= JSON_FLAG
|
||||
msg.append(set_msg(key, flags, timeout, value))
|
||||
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||
for (server, fp, sock) in self._get_conns('set_multi', key_prefix,
|
||||
hash_key):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
with Timeout(self._io_timeout):
|
||||
sock.sendall(b''.join(msg))
|
||||
@@ -558,7 +590,8 @@ class MemcacheRing(object):
|
||||
return
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, sock=sock, fp=fp)
|
||||
server, e, key_prefix, 'set_multi', conn_start_time,
|
||||
sock=sock, fp=fp)
|
||||
|
||||
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||
def get_multi(self, keys, server_key):
|
||||
@@ -573,7 +606,9 @@ class MemcacheRing(object):
|
||||
key_prefix = get_key_prefix(server_key)
|
||||
server_key = md5hash(server_key)
|
||||
hash_keys = [md5hash(key) for key in keys]
|
||||
for (server, fp, sock) in self._get_conns(key_prefix, server_key):
|
||||
for (server, fp, sock) in self._get_conns('get_multi', key_prefix,
|
||||
server_key):
|
||||
conn_start_time = tm.time()
|
||||
try:
|
||||
with Timeout(self._io_timeout):
|
||||
sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n')
|
||||
@@ -604,7 +639,8 @@ class MemcacheRing(object):
|
||||
return values
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(
|
||||
server, e, key_prefix, sock=sock, fp=fp)
|
||||
server, e, key_prefix, 'get_multi', conn_start_time,
|
||||
sock=sock, fp=fp)
|
||||
|
||||
|
||||
def load_memcache(conf, logger):
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
"""Tests for swift.common.utils"""
|
||||
import itertools
|
||||
from collections import defaultdict
|
||||
import errno
|
||||
import io
|
||||
@@ -235,7 +236,7 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertIs(client._client_cache[server]._tls_context, context)
|
||||
|
||||
key = uuid4().hex.encode('ascii')
|
||||
list(client._get_conns('test', key))
|
||||
list(client._get_conns('set', 'test', key))
|
||||
context.wrap_socket.assert_called_once()
|
||||
|
||||
def test_get_conns(self):
|
||||
@@ -257,7 +258,7 @@ class TestMemcached(unittest.TestCase):
|
||||
one = two = True
|
||||
while one or two: # Run until we match hosts one and two
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns('test', key):
|
||||
for conn in memcache_client._get_conns('set', 'test', key):
|
||||
if 'b' not in getattr(conn[1], 'mode', ''):
|
||||
self.assertIsInstance(conn[1], (
|
||||
io.RawIOBase, io.BufferedIOBase))
|
||||
@@ -284,7 +285,7 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns('test', key):
|
||||
for conn in memcache_client._get_conns('set', 'test', key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
|
||||
self.assertEqual(peer_socket, server_socket)
|
||||
@@ -306,7 +307,7 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client = memcached.MemcacheRing([server_host],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns('test', key):
|
||||
for conn in memcache_client._get_conns('set', 'test', key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
|
||||
self.assertEqual(peer_socket, server_socket)
|
||||
@@ -335,7 +336,7 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns('test', key):
|
||||
for conn in memcache_client._get_conns('set', 'test', key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
peer_socket = '%s:%s' % (peer_sockaddr[0],
|
||||
peer_sockaddr[1])
|
||||
@@ -361,7 +362,7 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns('test', key):
|
||||
for conn in memcache_client._get_conns('set', 'test', key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
peer_socket = '[%s]:%s' % (peer_sockaddr[0],
|
||||
peer_sockaddr[1])
|
||||
@@ -551,20 +552,26 @@ class TestMemcached(unittest.TestCase):
|
||||
[(mock2, mock2)])
|
||||
memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool(
|
||||
[(mock1, mock1), (mock1, mock1)])
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
self.assertEqual(mock1.exploded, True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
|
||||
self.logger.clear()
|
||||
mock1.exploded = False
|
||||
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
|
||||
self.assertEqual(mock1.exploded, True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method get, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
# Check that we really did call create() twice
|
||||
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
|
||||
@@ -581,28 +588,35 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool(
|
||||
[(mock1, mock1)] * 12)
|
||||
|
||||
for _ in range(12):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
for _ in range(12):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
# twelfth one skips .5 because of error limiting and goes straight
|
||||
# to .4
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 11 + [
|
||||
'Error limiting server 1.2.3.5:11211'
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
mock2.should_explode = True
|
||||
for _ in range(12):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
for _ in range(12):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
# as we keep going, eventually .4 gets error limited, too
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 10 + [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.4:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
@@ -629,13 +643,16 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock1, mock1)] * 20)
|
||||
|
||||
for _ in range(20):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
for _ in range(20):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
# twelfth one skips .5 because of error limiting and goes straight
|
||||
# to .4
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 20)
|
||||
|
||||
def test_error_raising(self):
|
||||
@@ -646,51 +663,64 @@ class TestMemcached(unittest.TestCase):
|
||||
[(mock1, mock1)] * 20)
|
||||
|
||||
# expect exception when requested...
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
|
||||
now = time.time()
|
||||
with patch('time.time', return_value=now):
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.get('some_key', raise_on_error=True)
|
||||
with patch('time.time', return_value=now):
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.get('some_key', raise_on_error=True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method get, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set(
|
||||
'shard-updating-v2/acc/container', [1, 2, 3],
|
||||
raise_on_error=True)
|
||||
with patch('time.time', return_value=now):
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set(
|
||||
'shard-updating-v2/acc/container', [1, 2, 3],
|
||||
raise_on_error=True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe',
|
||||
'with key_prefix shard-updating-v2/acc, method set, '
|
||||
'time_spent 0.0, [Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
# ...but default is no exception
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
with patch('time.time', return_value=now):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
memcache_client.get('some_key')
|
||||
with patch('time.time', return_value=now):
|
||||
memcache_client.get('some_key')
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method get, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3])
|
||||
with patch('time.time', return_value=now):
|
||||
memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe',
|
||||
'with key_prefix shard-updating-v2/acc, method set, '
|
||||
'time_spent 0.0, [Errno 32] Broken pipe',
|
||||
])
|
||||
|
||||
def test_error_limiting_custom_config(self):
|
||||
@@ -704,53 +734,60 @@ class TestMemcached(unittest.TestCase):
|
||||
MockedMemcachePool([(mock1, mock1)] * num_calls)
|
||||
|
||||
for n in range(num_calls):
|
||||
with mock.patch.object(memcached.time, 'time',
|
||||
with mock.patch.object(memcached.tm, 'time',
|
||||
return_value=time_step * n):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
|
||||
# with default error_limit_time of 60, one call per 5 secs, twelfth one
|
||||
# triggers error limit
|
||||
do_calls(5, 12)
|
||||
do_calls(5.0, 12)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 10 + [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
|
||||
# with default error_limit_time of 60, one call per 6 secs, error limit
|
||||
# is not triggered
|
||||
do_calls(6, 20)
|
||||
do_calls(6.0, 20)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 20)
|
||||
|
||||
# with error_limit_time of 66, one call per 6 secs, twelfth one
|
||||
# triggers error limit
|
||||
do_calls(6, 12, error_limit_time=66)
|
||||
do_calls(6.0, 12, error_limit_time=66)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 10 + [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
|
||||
# with error_limit_time of 70, one call per 6 secs, error_limit_count
|
||||
# of 11, 13th call triggers error limit
|
||||
do_calls(6, 13, error_limit_time=70, error_limit_count=11)
|
||||
do_calls(6.0, 13, error_limit_time=70, error_limit_count=11)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 11 + [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||
'with key_prefix some_key, method set, time_spent 0.0, '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
@@ -984,10 +1021,13 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
self.assertEqual(pending['1.2.3.5'], 8)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('error'),
|
||||
['Timeout getting a connection to memcached: 1.2.3.5:11211'
|
||||
': with key_prefix key'] * 8)
|
||||
error_logs = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(len(error_logs), 8)
|
||||
for each_log in error_logs:
|
||||
self.assertIn(
|
||||
'Timeout getting a connection to memcached: 1.2.3.5:11211: '
|
||||
'with key_prefix key',
|
||||
each_log)
|
||||
self.assertEqual(served['1.2.3.5'], 2)
|
||||
self.assertEqual(pending['1.2.3.4'], 0)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
||||
@@ -1017,7 +1057,7 @@ class TestMemcached(unittest.TestCase):
|
||||
mock_sock.connect = wait_connect
|
||||
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211'], connect_timeout=0.1)
|
||||
['1.2.3.4:11211'], connect_timeout=0.1, logger=self.logger)
|
||||
|
||||
# sanity
|
||||
self.assertEqual(1, len(memcache_client._client_cache))
|
||||
@@ -1027,7 +1067,7 @@ class TestMemcached(unittest.TestCase):
|
||||
# try to get connect and no connection found
|
||||
# so it will result in StopIteration
|
||||
conn_generator = memcache_client._get_conns(
|
||||
'key', md5hash(b'key'))
|
||||
'set', 'key', md5hash(b'key'))
|
||||
with self.assertRaises(StopIteration):
|
||||
next(conn_generator)
|
||||
|
||||
@@ -1133,6 +1173,205 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual('memcached.delete.timing', last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 7000.99)
|
||||
|
||||
def test_operations_timing_stats_with_incr_exception(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_add(key, flags, exptime, num_bytes, noreply=b''):
|
||||
raise Exception('add failed')
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_add', handle_add):
|
||||
mock_time.return_value = 4000.99
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.incr('incr_key', delta=5)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.incr.errors.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4000.99)
|
||||
self.assertEqual('Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix incr_key: ',
|
||||
self.logger.get_lines_for_level('error')[0])
|
||||
|
||||
def test_operations_timing_stats_with_set_exception(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_set(key, flags, exptime, num_bytes, noreply=b''):
|
||||
raise Exception('set failed')
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_set', handle_set):
|
||||
mock_time.return_value = 4000.99
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set(
|
||||
'set_key', [1, 2, 3],
|
||||
raise_on_error=True)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.set.errors.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4000.99)
|
||||
self.assertEqual('Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix set_key: ',
|
||||
self.logger.get_lines_for_level('error')[0])
|
||||
|
||||
def test_operations_timing_stats_with_get_exception(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_get(*keys):
|
||||
raise Exception('get failed')
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_get', handle_get):
|
||||
mock_time.return_value = 4000.99
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.get('get_key', raise_on_error=True)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.get.errors.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4000.99)
|
||||
self.assertEqual('Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix get_key: ',
|
||||
self.logger.get_lines_for_level('error')[0])
|
||||
|
||||
def test_operations_timing_stats_with_get_error(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_get(*keys):
|
||||
raise MemcacheConnectionError('failed to connect')
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_get', handle_get):
|
||||
mock_time.return_value = 4000.99
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.get('get_key', raise_on_error=True)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.get.conn_err.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4000.99)
|
||||
self.assertEqual('Error talking to memcached: 1.2.3.4:11211: '
|
||||
'with key_prefix get_key, method get, '
|
||||
'time_spent 0.0, failed to connect',
|
||||
self.logger.get_lines_for_level('error')[0])
|
||||
|
||||
def test_operations_timing_stats_with_incr_timeout(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
io_timeout=0.01,
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_add(key, flags, exptime, num_bytes, noreply=b''):
|
||||
sleep(0.05)
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_add', handle_add):
|
||||
mock_time.side_effect = itertools.count(4000.99, 1.0)
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.incr('nvratelimit/v2/wf/124593', delta=5)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.incr.timeout.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4002.99)
|
||||
error_logs = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Timeout talking to memcached: 1.2.3.4:11211: ',
|
||||
error_logs[0])
|
||||
self.assertIn(
|
||||
'with key_prefix nvratelimit/v2/wf, ', error_logs[0])
|
||||
self.assertIn('method incr, ', error_logs[0])
|
||||
self.assertIn(
|
||||
'config_timeout 0.01, time_spent 1.0', error_logs[0])
|
||||
|
||||
def test_operations_timing_stats_with_set_timeout(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
io_timeout=0.01,
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_set(key, flags, exptime, num_bytes, noreply=b''):
|
||||
sleep(0.05)
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_set', handle_set):
|
||||
mock_time.side_effect = itertools.count(4000.99, 1.0)
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set(
|
||||
'shard-updating-v2/acc/container', [1, 2, 3],
|
||||
raise_on_error=True)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.set.timeout.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4002.99)
|
||||
error_logs = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Timeout talking to memcached: 1.2.3.4:11211: ',
|
||||
error_logs[0])
|
||||
self.assertIn(
|
||||
'with key_prefix shard-updating-v2/acc, ', error_logs[0])
|
||||
self.assertIn('method set, ', error_logs[0])
|
||||
self.assertIn(
|
||||
'config_timeout 0.01, time_spent 1.0', error_logs[0])
|
||||
|
||||
def test_operations_timing_stats_with_get_timeout(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
io_timeout=0.01,
|
||||
logger=self.logger)
|
||||
mock_memcache = MockMemcached()
|
||||
memcache_client._client_cache[
|
||||
'1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock_memcache, mock_memcache)] * 2)
|
||||
|
||||
def handle_get(*keys):
|
||||
sleep(0.05)
|
||||
|
||||
with patch('time.time', ) as mock_time:
|
||||
with mock.patch.object(mock_memcache, 'handle_get', handle_get):
|
||||
mock_time.side_effect = itertools.count(4000.99, 1.0)
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.get(
|
||||
'shard-updating-v2/acc/container', raise_on_error=True)
|
||||
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||
self.assertEqual('memcached.get.timeout.timing',
|
||||
last_stats[0][0])
|
||||
self.assertEqual(last_stats[0][1], 4002.99)
|
||||
error_logs = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Timeout talking to memcached: 1.2.3.4:11211: ',
|
||||
error_logs[0])
|
||||
self.assertIn(
|
||||
'with key_prefix shard-updating-v2/acc, ', error_logs[0])
|
||||
self.assertIn('method get, ', error_logs[0])
|
||||
self.assertIn(
|
||||
'config_timeout 0.01, time_spent 1.0', error_logs[0])
|
||||
|
||||
|
||||
class ExcConfigParser(object):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user