Ensure close socket for memcached if got timeout
Currently MemcacheRing attempts to close socket when Timeout occured at creating connection in MemcacheConnPool. However unfortunately it could not because if Timeout happened, the sock variable is not assigned and None value (initial assignment) will be fed into _exception_occured method. This patch added a unit test to confirm the socket.close() called and fixes the behavior with wrapping try/except in MemcacheConnPool.create to call close() method. Change-Id: I7959ab96682f4a459a99c4f92463e1280372f6ef
This commit is contained in:
parent
27478f07c3
commit
439fbbdc7b
|
@ -140,11 +140,15 @@ class MemcacheConnPool(Pool):
|
|||
family, socktype, proto, canonname, sockaddr = addrs[0]
|
||||
sock = socket.socket(family, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
with Timeout(self._connect_timeout):
|
||||
sock.connect(sockaddr)
|
||||
if self._tls_context:
|
||||
sock = self._tls_context.wrap_socket(sock,
|
||||
server_hostname=self.host)
|
||||
try:
|
||||
with Timeout(self._connect_timeout):
|
||||
sock.connect(sockaddr)
|
||||
if self._tls_context:
|
||||
sock = self._tls_context.wrap_socket(sock,
|
||||
server_hostname=self.host)
|
||||
except (Exception, Timeout):
|
||||
sock.close()
|
||||
raise
|
||||
return (sock.makefile('rwb'), sock)
|
||||
|
||||
def get(self):
|
||||
|
@ -237,6 +241,8 @@ class MemcacheRing(object):
|
|||
"""
|
||||
Retrieves a server conn from the pool, or connects a new one.
|
||||
Chooses the server based on a consistent hash of "key".
|
||||
|
||||
:return: generator to serve memcached connection
|
||||
"""
|
||||
pos = bisect(self._sorted, key)
|
||||
served = []
|
||||
|
|
|
@ -912,6 +912,41 @@ class TestMemcached(unittest.TestCase):
|
|||
self.assertEqual(connections['1.2.3.5'].qsize(), 2)
|
||||
self.assertEqual(connections['1.2.3.4'].qsize(), 2)
|
||||
|
||||
def test_connection_slow_connect(self):
|
||||
with patch('swift.common.memcached.socket') as mock_module:
|
||||
def mock_getaddrinfo(host, port, family=socket.AF_INET,
|
||||
socktype=socket.SOCK_STREAM, proto=0,
|
||||
flags=0):
|
||||
return [(family, socktype, proto, '', (host, port))]
|
||||
|
||||
mock_module.getaddrinfo = mock_getaddrinfo
|
||||
|
||||
# patch socket, stub socket.socket, mock sock
|
||||
mock_sock = mock_module.socket.return_value
|
||||
|
||||
def wait_connect(addr):
|
||||
# slow connect gives Timeout Exception
|
||||
sleep(1)
|
||||
|
||||
# patch connect method
|
||||
mock_sock.connect = wait_connect
|
||||
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211'], connect_timeout=0.1)
|
||||
|
||||
# sanity
|
||||
self.assertEqual(1, len(memcache_client._client_cache))
|
||||
for server, pool in memcache_client._client_cache.items():
|
||||
self.assertEqual(2, pool.max_size)
|
||||
|
||||
# try to get connect and no connection found
|
||||
# so it will result in StopIteration
|
||||
conn_generator = memcache_client._get_conns(b'key')
|
||||
with self.assertRaises(StopIteration):
|
||||
next(conn_generator)
|
||||
|
||||
self.assertEqual(1, mock_sock.close.call_count)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue