From 7b4642567ae2c35dd0e8d07ea628a80d04ccc820 Mon Sep 17 00:00:00 2001 From: Jianjian Huo Date: Mon, 17 Apr 2023 19:08:32 +0100 Subject: [PATCH] Memcached: emit memcache timing metrics when exceptions raised Below new metrics will be added: memcached.[method].timeout.timing memcached.[method].conn_err.timing memcached.[method].errors.timing Also, MemcacheRing method and time spent in the function will be logged when Timeout or other exceptions are raised. Co-Authored-By: Alistair Coles Change-Id: I4d4f20b92b85255ac8bf66f2c830e691e64bbe47 --- swift/common/memcached.py | 92 +++++--- test/unit/common/test_memcached.py | 349 ++++++++++++++++++++++++----- 2 files changed, 358 insertions(+), 83 deletions(-) diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 22ec81c718..315ea9e205 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -48,7 +48,9 @@ import os import six import json import logging -import time +# the name of 'time' module is changed to 'tm', to avoid changing the +# signatures of member functions in this file. +import time as tm from bisect import bisect from eventlet.green import socket, ssl @@ -99,7 +101,7 @@ def sanitize_timeout(timeout): additional second), client beware. """ if timeout > (30 * 24 * 60 * 60): - timeout += time.time() + timeout += tm.time() return int(timeout) @@ -223,24 +225,38 @@ class MemcacheRing(object): def memcache_servers(self): return list(self._client_cache.keys()) - def _exception_occurred(self, server, e, key_prefix, action='talking', - sock=None, fp=None, got_connection=True): + def _exception_occurred(self, server, e, key_prefix, method, + conn_start_time, action='talking', sock=None, + fp=None, got_connection=True): if isinstance(e, Timeout): self.logger.error( "Timeout %(action)s to memcached: %(server)s" - ": with key_prefix %(key_prefix)s", - {'action': action, 'server': server, 'key_prefix': key_prefix}) + ": with key_prefix %(key_prefix)s, method %(method)s, " + "config_timeout %(config_timeout)s, time_spent %(time_spent)s", + {'action': action, 'server': server, + 'key_prefix': key_prefix, 'method': method, + 'config_timeout': e.seconds, + 'time_spent': tm.time() - conn_start_time}) + self.logger.timing_since( + 'memcached.' + method + '.timeout.timing', conn_start_time) elif isinstance(e, (socket.error, MemcacheConnectionError)): self.logger.error( "Error %(action)s to memcached: %(server)s: " - "with key_prefix %(key_prefix)s: %(err)s", - {'action': action, 'server': server, 'err': e, - 'key_prefix': key_prefix}) + "with key_prefix %(key_prefix)s, method %(method)s, " + "time_spent %(time_spent)s, %(err)s", + {'action': action, 'server': server, + 'key_prefix': key_prefix, 'method': method, + 'time_spent': tm.time() - conn_start_time, 'err': e}) + self.logger.timing_since( + 'memcached.' + method + '.conn_err.timing', conn_start_time) else: self.logger.exception("Error %(action)s to memcached: %(server)s" ": with key_prefix %(key_prefix)s", {'action': action, 'server': server, 'key_prefix': key_prefix}) + self.logger.timing_since( + 'memcached.' + method + '.errors.timing', conn_start_time) + try: if fp: fp.close() @@ -261,7 +277,7 @@ class MemcacheRing(object): if self._error_limit_time <= 0 or self._error_limit_duration <= 0: return - now = time.time() + now = tm.time() self._errors[server].append(now) if len(self._errors[server]) > self._error_limit_count: self._errors[server] = [err for err in self._errors[server] @@ -270,11 +286,12 @@ class MemcacheRing(object): self._error_limited[server] = now + self._error_limit_duration self.logger.error('Error limiting server %s', server) - def _get_conns(self, key_prefix, hash_key): + def _get_conns(self, method, key_prefix, hash_key): """ Retrieves a server conn from the pool, or connects a new one. Chooses the server based on a consistent hash of "key". + :param method: the name of memcache method. :param key_prefix: the prefix of user provided key. :param hash_key: the consistent hash of user key, or server key for set_multi and get_multi. @@ -289,7 +306,8 @@ class MemcacheRing(object): if server in served: continue served.append(server) - if self._error_limited[server] > time.time(): + pool_start_time = tm.time() + if self._error_limited[server] > pool_start_time: continue sock = None try: @@ -299,14 +317,15 @@ class MemcacheRing(object): yield server, fp, sock except MemcachePoolTimeout as e: self._exception_occurred( - server, e, key_prefix, action='getting a connection', - got_connection=False) + server, e, key_prefix, method, pool_start_time, + action='getting a connection', got_connection=False) except (Exception, Timeout) as e: # Typically a Timeout exception caught here is the one raised # by the create() method of this server's MemcacheConnPool # object. self._exception_occurred( - server, e, key_prefix, action='connecting', sock=sock) + server, e, key_prefix, method, pool_start_time, + action='connecting', sock=sock) if not any_yielded: self.logger.error('All memcached servers error-limited') @@ -346,7 +365,8 @@ class MemcacheRing(object): elif not isinstance(value, bytes): value = str(value).encode('utf-8') - for (server, fp, sock) in self._get_conns(key_prefix, hash_key): + for (server, fp, sock) in self._get_conns('set', key_prefix, hash_key): + conn_start_time = tm.time() try: with Timeout(self._io_timeout): sock.sendall(set_msg(hash_key, flags, timeout, value)) @@ -370,7 +390,8 @@ class MemcacheRing(object): return except (Exception, Timeout) as e: self._exception_occurred( - server, e, key_prefix, sock=sock, fp=fp) + server, e, key_prefix, 'set', conn_start_time, + sock=sock, fp=fp) if raise_on_error: raise MemcacheConnectionError( "No memcached connections succeeded.") @@ -389,7 +410,8 @@ class MemcacheRing(object): key_prefix = get_key_prefix(key) hash_key = md5hash(key) value = None - for (server, fp, sock) in self._get_conns(key_prefix, hash_key): + for (server, fp, sock) in self._get_conns('get', key_prefix, hash_key): + conn_start_time = tm.time() try: with Timeout(self._io_timeout): sock.sendall(b'get ' + hash_key + b'\r\n') @@ -412,7 +434,8 @@ class MemcacheRing(object): return value except (Exception, Timeout) as e: self._exception_occurred( - server, e, key_prefix, sock=sock, fp=fp) + server, e, key_prefix, 'get', conn_start_time, + sock=sock, fp=fp) if raise_on_error: raise MemcacheConnectionError( "No memcached connections succeeded.") @@ -442,7 +465,10 @@ class MemcacheRing(object): command = b'decr' delta = str(abs(int(delta))).encode('ascii') timeout = sanitize_timeout(time) - for (server, fp, sock) in self._get_conns(key_prefix, hash_key): + method = command.decode() + for (server, fp, sock) in self._get_conns(method, key_prefix, + hash_key): + conn_start_time = tm.time() try: with Timeout(self._io_timeout): sock.sendall(b' '.join([ @@ -474,8 +500,9 @@ class MemcacheRing(object): return ret except (Exception, Timeout) as e: self._exception_occurred( - server, e, key_prefix, sock=sock, fp=fp) - raise MemcacheConnectionError("No Memcached connections succeeded.") + server, e, key_prefix, method, conn_start_time, + sock=sock, fp=fp) + raise MemcacheConnectionError("No memcached connections succeeded.") @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW) def decr(self, key, delta=1, time=0): @@ -505,7 +532,9 @@ class MemcacheRing(object): key_prefix = get_key_prefix(key) hash_key = md5hash(key) server_key = md5hash(server_key) if server_key else hash_key - for (server, fp, sock) in self._get_conns(key_prefix, server_key): + for (server, fp, sock) in self._get_conns('delete', key_prefix, + server_key): + conn_start_time = tm.time() try: with Timeout(self._io_timeout): sock.sendall(b'delete ' + hash_key + b'\r\n') @@ -515,7 +544,8 @@ class MemcacheRing(object): return except (Exception, Timeout) as e: self._exception_occurred( - server, e, key_prefix, sock=sock, fp=fp) + server, e, key_prefix, 'delete', conn_start_time, + sock=sock, fp=fp) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) def set_multi(self, mapping, server_key, serialize=True, time=0, @@ -547,7 +577,9 @@ class MemcacheRing(object): value = json.dumps(value).encode('ascii') flags |= JSON_FLAG msg.append(set_msg(key, flags, timeout, value)) - for (server, fp, sock) in self._get_conns(key_prefix, hash_key): + for (server, fp, sock) in self._get_conns('set_multi', key_prefix, + hash_key): + conn_start_time = tm.time() try: with Timeout(self._io_timeout): sock.sendall(b''.join(msg)) @@ -558,7 +590,8 @@ class MemcacheRing(object): return except (Exception, Timeout) as e: self._exception_occurred( - server, e, key_prefix, sock=sock, fp=fp) + server, e, key_prefix, 'set_multi', conn_start_time, + sock=sock, fp=fp) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) def get_multi(self, keys, server_key): @@ -573,7 +606,9 @@ class MemcacheRing(object): key_prefix = get_key_prefix(server_key) server_key = md5hash(server_key) hash_keys = [md5hash(key) for key in keys] - for (server, fp, sock) in self._get_conns(key_prefix, server_key): + for (server, fp, sock) in self._get_conns('get_multi', key_prefix, + server_key): + conn_start_time = tm.time() try: with Timeout(self._io_timeout): sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n') @@ -604,7 +639,8 @@ class MemcacheRing(object): return values except (Exception, Timeout) as e: self._exception_occurred( - server, e, key_prefix, sock=sock, fp=fp) + server, e, key_prefix, 'get_multi', conn_start_time, + sock=sock, fp=fp) def load_memcache(conf, logger): diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index 7de13ff106..34968cc091 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -15,6 +15,7 @@ # limitations under the License. """Tests for swift.common.utils""" +import itertools from collections import defaultdict import errno import io @@ -235,7 +236,7 @@ class TestMemcached(unittest.TestCase): self.assertIs(client._client_cache[server]._tls_context, context) key = uuid4().hex.encode('ascii') - list(client._get_conns('test', key)) + list(client._get_conns('set', 'test', key)) context.wrap_socket.assert_called_once() def test_get_conns(self): @@ -257,7 +258,7 @@ class TestMemcached(unittest.TestCase): one = two = True while one or two: # Run until we match hosts one and two key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns('test', key): + for conn in memcache_client._get_conns('set', 'test', key): if 'b' not in getattr(conn[1], 'mode', ''): self.assertIsInstance(conn[1], ( io.RawIOBase, io.BufferedIOBase)) @@ -284,7 +285,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_socket], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns('test', key): + for conn in memcache_client._get_conns('set', 'test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) self.assertEqual(peer_socket, server_socket) @@ -306,7 +307,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_host], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns('test', key): + for conn in memcache_client._get_conns('set', 'test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) self.assertEqual(peer_socket, server_socket) @@ -335,7 +336,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_socket], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns('test', key): + for conn in memcache_client._get_conns('set', 'test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '%s:%s' % (peer_sockaddr[0], peer_sockaddr[1]) @@ -361,7 +362,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_socket], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns('test', key): + for conn in memcache_client._get_conns('set', 'test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) @@ -551,20 +552,26 @@ class TestMemcached(unittest.TestCase): [(mock2, mock2)]) memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool( [(mock1, mock1), (mock1, mock1)]) - memcache_client.set('some_key', [1, 2, 3]) + now = time.time() + with patch('time.time', return_value=now): + memcache_client.set('some_key', [1, 2, 3]) self.assertEqual(mock1.exploded, True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ]) self.logger.clear() mock1.exploded = False - self.assertEqual(memcache_client.get('some_key'), [1, 2, 3]) + now = time.time() + with patch('time.time', return_value=now): + self.assertEqual(memcache_client.get('some_key'), [1, 2, 3]) self.assertEqual(mock1.exploded, True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method get, time_spent 0.0, ' + '[Errno 32] Broken pipe', ]) # Check that we really did call create() twice self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks, @@ -581,28 +588,35 @@ class TestMemcached(unittest.TestCase): memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool( [(mock1, mock1)] * 12) - for _ in range(12): - memcache_client.set('some_key', [1, 2, 3]) + now = time.time() + with patch('time.time', return_value=now): + for _ in range(12): + memcache_client.set('some_key', [1, 2, 3]) # twelfth one skips .5 because of error limiting and goes straight # to .4 self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 11 + [ 'Error limiting server 1.2.3.5:11211' ]) self.logger.clear() mock2.should_explode = True - for _ in range(12): - memcache_client.set('some_key', [1, 2, 3]) + now = time.time() + with patch('time.time', return_value=now): + for _ in range(12): + memcache_client.set('some_key', [1, 2, 3]) # as we keep going, eventually .4 gets error limited, too self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 10 + [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', 'Error limiting server 1.2.3.4:11211', 'All memcached servers error-limited', ]) @@ -629,13 +643,16 @@ class TestMemcached(unittest.TestCase): memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool( [(mock1, mock1)] * 20) - for _ in range(20): - memcache_client.set('some_key', [1, 2, 3]) + now = time.time() + with patch('time.time', return_value=now): + for _ in range(20): + memcache_client.set('some_key', [1, 2, 3]) # twelfth one skips .5 because of error limiting and goes straight # to .4 self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 20) def test_error_raising(self): @@ -646,51 +663,64 @@ class TestMemcached(unittest.TestCase): [(mock1, mock1)] * 20) # expect exception when requested... - with self.assertRaises(MemcacheConnectionError): - memcache_client.set('some_key', [1, 2, 3], raise_on_error=True) + now = time.time() + with patch('time.time', return_value=now): + with self.assertRaises(MemcacheConnectionError): + memcache_client.set('some_key', [1, 2, 3], raise_on_error=True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ]) self.logger.clear() - with self.assertRaises(MemcacheConnectionError): - memcache_client.get('some_key', raise_on_error=True) + with patch('time.time', return_value=now): + with self.assertRaises(MemcacheConnectionError): + memcache_client.get('some_key', raise_on_error=True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method get, time_spent 0.0, ' + '[Errno 32] Broken pipe', ]) self.logger.clear() - with self.assertRaises(MemcacheConnectionError): - memcache_client.set( - 'shard-updating-v2/acc/container', [1, 2, 3], - raise_on_error=True) + with patch('time.time', return_value=now): + with self.assertRaises(MemcacheConnectionError): + memcache_client.set( + 'shard-updating-v2/acc/container', [1, 2, 3], + raise_on_error=True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe', + 'with key_prefix shard-updating-v2/acc, method set, ' + 'time_spent 0.0, [Errno 32] Broken pipe', ]) self.logger.clear() # ...but default is no exception - memcache_client.set('some_key', [1, 2, 3]) + with patch('time.time', return_value=now): + memcache_client.set('some_key', [1, 2, 3]) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ]) self.logger.clear() - memcache_client.get('some_key') + with patch('time.time', return_value=now): + memcache_client.get('some_key') self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method get, time_spent 0.0, ' + '[Errno 32] Broken pipe', ]) self.logger.clear() - memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3]) + with patch('time.time', return_value=now): + memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3]) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - 'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe', + 'with key_prefix shard-updating-v2/acc, method set, ' + 'time_spent 0.0, [Errno 32] Broken pipe', ]) def test_error_limiting_custom_config(self): @@ -704,53 +734,60 @@ class TestMemcached(unittest.TestCase): MockedMemcachePool([(mock1, mock1)] * num_calls) for n in range(num_calls): - with mock.patch.object(memcached.time, 'time', + with mock.patch.object(memcached.tm, 'time', return_value=time_step * n): memcache_client.set('some_key', [1, 2, 3]) # with default error_limit_time of 60, one call per 5 secs, twelfth one # triggers error limit - do_calls(5, 12) + do_calls(5.0, 12) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 10 + [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', 'Error limiting server 1.2.3.5:11211', 'All memcached servers error-limited', ]) # with default error_limit_time of 60, one call per 6 secs, error limit # is not triggered - do_calls(6, 20) + do_calls(6.0, 20) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 20) # with error_limit_time of 66, one call per 6 secs, twelfth one # triggers error limit - do_calls(6, 12, error_limit_time=66) + do_calls(6.0, 12, error_limit_time=66) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 10 + [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', 'Error limiting server 1.2.3.5:11211', 'All memcached servers error-limited', ]) # with error_limit_time of 70, one call per 6 secs, error_limit_count # of 11, 13th call triggers error limit - do_calls(6, 13, error_limit_time=70, error_limit_count=11) + do_calls(6.0, 13, error_limit_time=70, error_limit_count=11) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', ] * 11 + [ 'Error talking to memcached: 1.2.3.5:11211: ' - 'with key_prefix some_key: [Errno 32] Broken pipe', + 'with key_prefix some_key, method set, time_spent 0.0, ' + '[Errno 32] Broken pipe', 'Error limiting server 1.2.3.5:11211', 'All memcached servers error-limited', ]) @@ -984,10 +1021,13 @@ class TestMemcached(unittest.TestCase): self.assertEqual(pending['1.2.3.5'], 8) self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8) - self.assertEqual( - self.logger.get_lines_for_level('error'), - ['Timeout getting a connection to memcached: 1.2.3.5:11211' - ': with key_prefix key'] * 8) + error_logs = self.logger.get_lines_for_level('error') + self.assertEqual(len(error_logs), 8) + for each_log in error_logs: + self.assertIn( + 'Timeout getting a connection to memcached: 1.2.3.5:11211: ' + 'with key_prefix key', + each_log) self.assertEqual(served['1.2.3.5'], 2) self.assertEqual(pending['1.2.3.4'], 0) self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0) @@ -1017,7 +1057,7 @@ class TestMemcached(unittest.TestCase): mock_sock.connect = wait_connect memcache_client = memcached.MemcacheRing( - ['1.2.3.4:11211'], connect_timeout=0.1) + ['1.2.3.4:11211'], connect_timeout=0.1, logger=self.logger) # sanity self.assertEqual(1, len(memcache_client._client_cache)) @@ -1027,7 +1067,7 @@ class TestMemcached(unittest.TestCase): # try to get connect and no connection found # so it will result in StopIteration conn_generator = memcache_client._get_conns( - 'key', md5hash(b'key')) + 'set', 'key', md5hash(b'key')) with self.assertRaises(StopIteration): next(conn_generator) @@ -1133,6 +1173,205 @@ class TestMemcached(unittest.TestCase): self.assertEqual('memcached.delete.timing', last_stats[0][0]) self.assertEqual(last_stats[0][1], 7000.99) + def test_operations_timing_stats_with_incr_exception(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_add(key, flags, exptime, num_bytes, noreply=b''): + raise Exception('add failed') + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_add', handle_add): + mock_time.return_value = 4000.99 + with self.assertRaises(MemcacheConnectionError): + memcache_client.incr('incr_key', delta=5) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.incr.errors.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4000.99) + self.assertEqual('Error talking to memcached: 1.2.3.4:11211: ' + 'with key_prefix incr_key: ', + self.logger.get_lines_for_level('error')[0]) + + def test_operations_timing_stats_with_set_exception(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_set(key, flags, exptime, num_bytes, noreply=b''): + raise Exception('set failed') + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_set', handle_set): + mock_time.return_value = 4000.99 + with self.assertRaises(MemcacheConnectionError): + memcache_client.set( + 'set_key', [1, 2, 3], + raise_on_error=True) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.set.errors.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4000.99) + self.assertEqual('Error talking to memcached: 1.2.3.4:11211: ' + 'with key_prefix set_key: ', + self.logger.get_lines_for_level('error')[0]) + + def test_operations_timing_stats_with_get_exception(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_get(*keys): + raise Exception('get failed') + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_get', handle_get): + mock_time.return_value = 4000.99 + with self.assertRaises(MemcacheConnectionError): + memcache_client.get('get_key', raise_on_error=True) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.get.errors.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4000.99) + self.assertEqual('Error talking to memcached: 1.2.3.4:11211: ' + 'with key_prefix get_key: ', + self.logger.get_lines_for_level('error')[0]) + + def test_operations_timing_stats_with_get_error(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_get(*keys): + raise MemcacheConnectionError('failed to connect') + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_get', handle_get): + mock_time.return_value = 4000.99 + with self.assertRaises(MemcacheConnectionError): + memcache_client.get('get_key', raise_on_error=True) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.get.conn_err.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4000.99) + self.assertEqual('Error talking to memcached: 1.2.3.4:11211: ' + 'with key_prefix get_key, method get, ' + 'time_spent 0.0, failed to connect', + self.logger.get_lines_for_level('error')[0]) + + def test_operations_timing_stats_with_incr_timeout(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + io_timeout=0.01, + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_add(key, flags, exptime, num_bytes, noreply=b''): + sleep(0.05) + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_add', handle_add): + mock_time.side_effect = itertools.count(4000.99, 1.0) + with self.assertRaises(MemcacheConnectionError): + memcache_client.incr('nvratelimit/v2/wf/124593', delta=5) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.incr.timeout.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4002.99) + error_logs = self.logger.get_lines_for_level('error') + self.assertIn('Timeout talking to memcached: 1.2.3.4:11211: ', + error_logs[0]) + self.assertIn( + 'with key_prefix nvratelimit/v2/wf, ', error_logs[0]) + self.assertIn('method incr, ', error_logs[0]) + self.assertIn( + 'config_timeout 0.01, time_spent 1.0', error_logs[0]) + + def test_operations_timing_stats_with_set_timeout(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + io_timeout=0.01, + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_set(key, flags, exptime, num_bytes, noreply=b''): + sleep(0.05) + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_set', handle_set): + mock_time.side_effect = itertools.count(4000.99, 1.0) + with self.assertRaises(MemcacheConnectionError): + memcache_client.set( + 'shard-updating-v2/acc/container', [1, 2, 3], + raise_on_error=True) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.set.timeout.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4002.99) + error_logs = self.logger.get_lines_for_level('error') + self.assertIn('Timeout talking to memcached: 1.2.3.4:11211: ', + error_logs[0]) + self.assertIn( + 'with key_prefix shard-updating-v2/acc, ', error_logs[0]) + self.assertIn('method set, ', error_logs[0]) + self.assertIn( + 'config_timeout 0.01, time_spent 1.0', error_logs[0]) + + def test_operations_timing_stats_with_get_timeout(self): + memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'], + io_timeout=0.01, + logger=self.logger) + mock_memcache = MockMemcached() + memcache_client._client_cache[ + '1.2.3.4:11211'] = MockedMemcachePool( + [(mock_memcache, mock_memcache)] * 2) + + def handle_get(*keys): + sleep(0.05) + + with patch('time.time', ) as mock_time: + with mock.patch.object(mock_memcache, 'handle_get', handle_get): + mock_time.side_effect = itertools.count(4000.99, 1.0) + with self.assertRaises(MemcacheConnectionError): + memcache_client.get( + 'shard-updating-v2/acc/container', raise_on_error=True) + self.assertTrue(self.logger.log_dict['timing_since']) + last_stats = self.logger.log_dict['timing_since'][-1] + self.assertEqual('memcached.get.timeout.timing', + last_stats[0][0]) + self.assertEqual(last_stats[0][1], 4002.99) + error_logs = self.logger.get_lines_for_level('error') + self.assertIn('Timeout talking to memcached: 1.2.3.4:11211: ', + error_logs[0]) + self.assertIn( + 'with key_prefix shard-updating-v2/acc, ', error_logs[0]) + self.assertIn('method get, ', error_logs[0]) + self.assertIn( + 'config_timeout 0.01, time_spent 1.0', error_logs[0]) + class ExcConfigParser(object):