close socket on exceptions

Change-Id: I0dee7c109d32e6325845df9ba6e1fbf23a2d1b89
This commit is contained in:
David Goetz 2013-05-07 09:40:53 -07:00
parent d754b59cf8
commit f85cf2b827
2 changed files with 31 additions and 8 deletions

View File

@ -88,13 +88,26 @@ class MemcacheRing(object):
self._allow_pickle = allow_pickle
self._allow_unpickle = allow_unpickle or allow_pickle
def _exception_occurred(self, server, e, action='talking'):
def _exception_occurred(self, server, e, action='talking',
sock=None, fp=None):
if isinstance(e, socket.timeout):
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
{'action': action, 'server': server})
else:
logging.exception(_("Error %(action)s to memcached: %(server)s"),
{'action': action, 'server': server})
try:
if fp:
fp.close()
del fp
except Exception:
pass
try:
if sock:
sock.close()
del sock
except Exception:
pass
now = time.time()
self._errors[server].append(time.time())
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
@ -119,6 +132,7 @@ class MemcacheRing(object):
served.append(server)
if self._error_limited[server] > time.time():
continue
sock = None
try:
fp, sock = self._client_cache[server].pop()
yield server, fp, sock
@ -136,7 +150,8 @@ class MemcacheRing(object):
sock.settimeout(self._io_timeout)
yield server, sock.makefile(), sock
except Exception, e:
self._exception_occurred(server, e, 'connecting')
self._exception_occurred(
server, e, action='connecting', sock=sock)
def _return_conn(self, server, fp, sock):
""" Returns a server connection to the pool """
@ -182,7 +197,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
self._exception_occurred(server, e, sock=sock, fp=fp)
def get(self, key):
"""
@ -215,7 +230,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return value
except Exception, e:
self._exception_occurred(server, e)
self._exception_occurred(server, e, sock=sock, fp=fp)
def incr(self, key, delta=1, time=0, timeout=0):
"""
@ -267,7 +282,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return ret
except Exception, e:
self._exception_occurred(server, e)
self._exception_occurred(server, e, sock=sock, fp=fp)
raise MemcacheConnectionError("No Memcached connections succeeded.")
def decr(self, key, delta=1, time=0, timeout=0):
@ -304,7 +319,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
self._exception_occurred(server, e, sock=sock, fp=fp)
def set_multi(self, mapping, server_key, serialize=True, timeout=0,
time=0, min_compress_len=0):
@ -352,7 +367,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
self._exception_occurred(server, e, sock=sock, fp=fp)
def get_multi(self, keys, server_key):
"""
@ -393,4 +408,4 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return values
except Exception, e:
self._exception_occurred(server, e)
self._exception_occurred(server, e, sock=sock, fp=fp)

View File

@ -42,6 +42,8 @@ class ExplodingMockMemcached(object):
self.exploded = True
raise socket.error()
def close(self):
pass
class MockMemcached(object):
@ -52,6 +54,7 @@ class MockMemcached(object):
self.down = False
self.exc_on_delete = False
self.read_return_none = False
self.close_called = False
def sendall(self, string):
if self.down:
@ -130,6 +133,10 @@ class MockMemcached(object):
self.outbuf = self.outbuf[size:]
return response
def close(self):
self.close_called = True
pass
class TestMemcached(unittest.TestCase):
""" Tests for swift.common.memcached"""
@ -206,6 +213,7 @@ class TestMemcached(unittest.TestCase):
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.incr, 'some_key', delta=-15)
self.assertTrue(mock.close_called)
def test_incr_w_timeout(self):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])