Merge "memcached: Plumb logger into MemcacheRing"
This commit is contained in:
commit
3c0a448f06
@ -160,7 +160,7 @@ class MemcacheRing(object):
|
||||
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
||||
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
||||
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
||||
max_conns=2):
|
||||
max_conns=2, logger=None):
|
||||
self._ring = {}
|
||||
self._errors = dict(((serv, []) for serv in servers))
|
||||
self._error_limited = dict(((serv, 0) for serv in servers))
|
||||
@ -178,18 +178,23 @@ class MemcacheRing(object):
|
||||
self._pool_timeout = pool_timeout
|
||||
self._allow_pickle = allow_pickle
|
||||
self._allow_unpickle = allow_unpickle or allow_pickle
|
||||
if logger is None:
|
||||
self.logger = logging.getLogger()
|
||||
else:
|
||||
self.logger = logger
|
||||
|
||||
def _exception_occurred(self, server, e, action='talking',
|
||||
sock=None, fp=None, got_connection=True):
|
||||
if isinstance(e, Timeout):
|
||||
logging.error("Timeout %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
||||
logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
|
||||
{'action': action, 'server': server, 'err': e})
|
||||
else:
|
||||
logging.exception("Error %(action)s to memcached: %(server)s",
|
||||
self.logger.error("Timeout %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
||||
self.logger.error(
|
||||
"Error %(action)s to memcached: %(server)s: %(err)s",
|
||||
{'action': action, 'server': server, 'err': e})
|
||||
else:
|
||||
self.logger.exception("Error %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
try:
|
||||
if fp:
|
||||
fp.close()
|
||||
@ -213,7 +218,7 @@ class MemcacheRing(object):
|
||||
if err > now - ERROR_LIMIT_TIME]
|
||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
||||
self._error_limited[server] = now + ERROR_LIMIT_DURATION
|
||||
logging.error('Error limiting server %s', server)
|
||||
self.logger.error('Error limiting server %s', server)
|
||||
|
||||
def _get_conns(self, key):
|
||||
"""
|
||||
|
@ -19,6 +19,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
||||
|
||||
from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
|
||||
IO_TIMEOUT, TRY_COUNT)
|
||||
from swift.common.utils import get_logger
|
||||
|
||||
|
||||
class MemcacheMiddleware(object):
|
||||
@ -28,6 +29,7 @@ class MemcacheMiddleware(object):
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route='memcache')
|
||||
self.memcache_servers = conf.get('memcache_servers')
|
||||
serialization_format = conf.get('memcache_serialization_support')
|
||||
try:
|
||||
@ -102,7 +104,8 @@ class MemcacheMiddleware(object):
|
||||
io_timeout=io_timeout,
|
||||
allow_pickle=(serialization_format == 0),
|
||||
allow_unpickle=(serialization_format <= 1),
|
||||
max_conns=max_conns)
|
||||
max_conns=max_conns,
|
||||
logger=self.logger)
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
env['swift.cache'] = self.memcache
|
||||
|
@ -20,6 +20,7 @@ from collections import defaultdict
|
||||
import errno
|
||||
from hashlib import md5
|
||||
import io
|
||||
import logging
|
||||
import six
|
||||
import socket
|
||||
import time
|
||||
@ -184,9 +185,14 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.logger = debug_logger()
|
||||
patcher = mock.patch('swift.common.memcached.logging', self.logger)
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher.start()
|
||||
|
||||
def test_logger_kwarg(self):
|
||||
server_socket = '%s:%s' % ('[::1]', 11211)
|
||||
client = memcached.MemcacheRing([server_socket])
|
||||
self.assertIs(client.logger, logging.getLogger())
|
||||
|
||||
client = memcached.MemcacheRing([server_socket], logger=self.logger)
|
||||
self.assertIs(client.logger, self.logger)
|
||||
|
||||
def test_get_conns(self):
|
||||
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
@ -202,7 +208,8 @@ class TestMemcached(unittest.TestCase):
|
||||
sock2ipport = '%s:%s' % (sock2ip, memcached.DEFAULT_MEMCACHED_PORT)
|
||||
# We're deliberately using sock2ip (no port) here to test that the
|
||||
# default port is used.
|
||||
memcache_client = memcached.MemcacheRing([sock1ipport, sock2ip])
|
||||
memcache_client = memcached.MemcacheRing([sock1ipport, sock2ip],
|
||||
logger=self.logger)
|
||||
one = two = True
|
||||
while one or two: # Run until we match hosts one and two
|
||||
key = uuid4().hex.encode('ascii')
|
||||
@ -230,7 +237,8 @@ class TestMemcached(unittest.TestCase):
|
||||
sock.listen(1)
|
||||
sock_addr = sock.getsockname()
|
||||
server_socket = '[%s]:%s' % (sock_addr[0], sock_addr[1])
|
||||
memcache_client = memcached.MemcacheRing([server_socket])
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -251,7 +259,8 @@ class TestMemcached(unittest.TestCase):
|
||||
server_socket = '[%s]:%s' % (sock_addr[0], sock_addr[1])
|
||||
server_host = '[%s]' % sock_addr[0]
|
||||
memcached.DEFAULT_MEMCACHED_PORT = sock_addr[1]
|
||||
memcache_client = memcached.MemcacheRing([server_host])
|
||||
memcache_client = memcached.MemcacheRing([server_host],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -265,7 +274,7 @@ class TestMemcached(unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
# IPv6 address with missing [] is invalid
|
||||
server_socket = '%s:%s' % ('::1', 11211)
|
||||
memcached.MemcacheRing([server_socket])
|
||||
memcached.MemcacheRing([server_socket], logger=self.logger)
|
||||
|
||||
def test_get_conns_hostname(self):
|
||||
with patch('swift.common.memcached.socket.getaddrinfo') as addrinfo:
|
||||
@ -279,7 +288,8 @@ class TestMemcached(unittest.TestCase):
|
||||
addrinfo.return_value = [(socket.AF_INET,
|
||||
socket.SOCK_STREAM, 0, '',
|
||||
('127.0.0.1', sock_addr[1]))]
|
||||
memcache_client = memcached.MemcacheRing([server_socket])
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -304,7 +314,8 @@ class TestMemcached(unittest.TestCase):
|
||||
addrinfo.return_value = [(socket.AF_INET6,
|
||||
socket.SOCK_STREAM, 0, '',
|
||||
('::1', sock_addr[1]))]
|
||||
memcache_client = memcached.MemcacheRing([server_socket])
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -317,7 +328,8 @@ class TestMemcached(unittest.TestCase):
|
||||
sock.close()
|
||||
|
||||
def test_set_get_json(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -350,7 +362,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertAlmostEqual(float(cache_timeout), esttimeout, delta=1)
|
||||
|
||||
def test_get_failed_connection_mid_request(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -365,18 +378,17 @@ class TestMemcached(unittest.TestCase):
|
||||
# force the logging through the DebugLogger instead of the nose
|
||||
# handler. This will use stdout, so we can assert that no stack trace
|
||||
# is logged.
|
||||
logger = debug_logger()
|
||||
with patch("sys.stdout", fake_stdout),\
|
||||
patch('swift.common.memcached.logging', logger):
|
||||
with patch("sys.stdout", fake_stdout):
|
||||
mock.read_return_empty_str = True
|
||||
self.assertIsNone(memcache_client.get('some_key'))
|
||||
log_lines = logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Error talking to memcached', log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertNotIn("Traceback", fake_stdout.getvalue())
|
||||
|
||||
def test_incr(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -396,7 +408,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertTrue(mock.close_called)
|
||||
|
||||
def test_incr_failed_connection_mid_request(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -411,19 +424,18 @@ class TestMemcached(unittest.TestCase):
|
||||
# force the logging through the DebugLogger instead of the nose
|
||||
# handler. This will use stdout, so we can assert that no stack trace
|
||||
# is logged.
|
||||
logger = debug_logger()
|
||||
with patch("sys.stdout", fake_stdout), \
|
||||
patch('swift.common.memcached.logging', logger):
|
||||
with patch("sys.stdout", fake_stdout):
|
||||
mock.read_return_empty_str = True
|
||||
self.assertRaises(memcached.MemcacheConnectionError,
|
||||
memcache_client.incr, 'some_key', delta=1)
|
||||
log_lines = logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Error talking to memcached', log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertNotIn('Traceback', fake_stdout.getvalue())
|
||||
|
||||
def test_incr_w_timeout(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -455,7 +467,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual(mock.cache, {cache_key: (b'0', b'0', b'10')})
|
||||
|
||||
def test_decr(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -473,7 +486,7 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def test_retry(self):
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211', '1.2.3.5:11211'])
|
||||
['1.2.3.4:11211', '1.2.3.5:11211'], logger=self.logger)
|
||||
mock1 = ExplodingMockMemcached()
|
||||
mock2 = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
@ -500,7 +513,8 @@ class TestMemcached(unittest.TestCase):
|
||||
[])
|
||||
|
||||
def test_delete(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -510,7 +524,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertIsNone(memcache_client.get('some_key'))
|
||||
|
||||
def test_multi(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -560,7 +575,8 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def test_multi_delete(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||
'1.2.3.5:11211'])
|
||||
'1.2.3.5:11211'],
|
||||
logger=self.logger)
|
||||
mock1 = MockMemcached()
|
||||
mock2 = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
@ -598,7 +614,8 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def test_serialization(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
allow_pickle=True)
|
||||
allow_pickle=True,
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -643,7 +660,8 @@ class TestMemcached(unittest.TestCase):
|
||||
mock_sock.connect = wait_connect
|
||||
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
connect_timeout=10)
|
||||
connect_timeout=10,
|
||||
logger=self.logger)
|
||||
# sanity
|
||||
self.assertEqual(1, len(memcache_client._client_cache))
|
||||
for server, pool in memcache_client._client_cache.items():
|
||||
@ -702,7 +720,8 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||
'1.2.3.5:11211'],
|
||||
io_timeout=0.5,
|
||||
pool_timeout=0.1)
|
||||
pool_timeout=0.1,
|
||||
logger=self.logger)
|
||||
|
||||
# Hand out a couple slow connections to 1.2.3.5, leaving 1.2.3.4
|
||||
# fast. All ten (10) clients should try to talk to .5 first, and
|
||||
|
Loading…
x
Reference in New Issue
Block a user