Ensure that we correctly expire (and cleanup) redis members

When a member has not heartbeated in a given amount of time,
it should no longer be present in any of its associated groups.
To accomplish this, each member needs a heartbeat key of its own,
and the group membership code must fetch, check, and clean up
members that are no longer alive.

Fixes bug 1386684

Change-Id: I89a8343ea9317c0522a6132ea065905b16b27b0d
This commit is contained in:
Joshua Harlow 2014-10-29 12:27:15 -07:00
parent 3a470225f9
commit 0e193c2ae1
3 changed files with 64 additions and 6 deletions

View File

@ -27,6 +27,8 @@ import redis
from redis import exceptions
from redis import lock as redis_locks
import six
from six.moves import map as compat_map
from six.moves import zip as compat_zip
import tooz
from tooz import coordination
@ -199,12 +201,15 @@ class RedisDriver(coordination.CoordinationDriver):
self._encoding = encoding[-1]
timeout = options.get('timeout', [self._CLIENT_DEFAULT_SOCKET_TO])
self.timeout = int(timeout[-1])
self.membership_timeout = int(options.get(
'membership_timeout', timeout)[-1])
lock_timeout = options.get('lock_timeout', [self.timeout])
self.lock_timeout = int(lock_timeout[-1])
namespace = options.get('namespace', ['_tooz'])[-1]
self._namespace = self._to_binary(namespace)
self._group_prefix = self._namespace + b"_group"
self._leader_prefix = self._namespace + b"_leader"
self._beat_prefix = self._namespace + b"_beats"
self._groups = self._namespace + b"_groups"
self._client = None
self._member_id = self._to_binary(member_id)
@ -298,6 +303,10 @@ class RedisDriver(coordination.CoordinationDriver):
self.heartbeat()
self._started = True
def _encode_beat_id(self, member_id):
    """Build the heartbeat key name for ``member_id``.

    The key is the beat prefix followed by the binary form of the member
    id, joined together with the namespace separator.
    """
    beat_parts = (self._beat_prefix, self._to_binary(member_id))
    return self._NAMESPACE_SEP.join(beat_parts)
def _encode_member_id(self, member_id):
member_id = self._to_binary(member_id)
if member_id == self._GROUP_EXISTS:
@ -318,7 +327,9 @@ class RedisDriver(coordination.CoordinationDriver):
def heartbeat(self):
with _translate_failures():
self._client.ping()
self._client.setex(self._encode_beat_id(self._member_id),
time=self.membership_timeout,
value=b"Not dead!")
for lock in self._acquired_locks:
try:
lock.heartbeat()
@ -344,6 +355,16 @@ class RedisDriver(coordination.CoordinationDriver):
self._executor.shutdown(wait=True)
self._executor = None
if self._client is not None:
# Make sure we no longer exist...
beat_id = self._encode_beat_id(self._member_id)
try:
# NOTE(harlowja): this will delete nothing if the key doesn't
# exist in the first place, which is fine/expected/desired...
with _translate_failures():
self._client.delete(beat_id)
except coordination.ToozError:
LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
exc_info=True)
self._client = None
self._server_info = {}
self._started = False
@ -415,12 +436,30 @@ class RedisDriver(coordination.CoordinationDriver):
def _get_members(p):
    """Return the live members of the group, pruning expired ones.

    ``p`` is the object handed to this callback by
    ``self._client.transaction`` (presumably a redis pipeline watching
    ``encoded_group`` -- confirm against the redis client in use).

    A member counts as gone when its heartbeat key has no value; such
    members are removed from the group hash before the survivors are
    returned.

    :raises coordination.GroupNotCreated: if the group key does not exist
    """
    if not p.exists(encoded_group):
        raise coordination.GroupNotCreated(group_id)
    potential_members = []
    for m in p.hkeys(encoded_group):
        m = self._decode_member_id(m)
        # Skip the marker entry that only records that the group exists.
        if m != self._GROUP_EXISTS:
            potential_members.append(m)
    if not potential_members:
        return []
    # Ok now we need to see which members have passed away...
    gone_members = set()
    member_values = p.mget(compat_map(self._encode_beat_id,
                                      potential_members))
    for (potential_member, value) in compat_zip(potential_members,
                                                member_values):
        # Always preserve self (just in case we haven't heartbeated
        # while this call was being made...), this does *not* prevent
        # another client from removing this though...
        if potential_member == self._member_id:
            continue
        if not value:
            gone_members.add(potential_member)
    # Trash all the members that no longer are with us... RIP...
    for m in gone_members:
        p.hdel(encoded_group, self._encode_member_id(m))
    return [m for m in potential_members if m not in gone_members]
return RedisFutureResult(self._submit(self._client.transaction,
_get_members, encoded_group,

View File

@ -35,6 +35,13 @@ class TestAPI(testscenarios.TestWithScenarios,
('redis', {'url': 'redis://localhost:6379?timeout=5'}),
]
# Only certain drivers support the timeout behavior exercised by the tests
# here; this lists the URL prefixes of the driver types that do support it.
timeout_capable = [
'memcached://',
'redis://',
]
def assertRaisesAny(self, exc_classes, callable_obj, *args, **kwargs):
checkers = [matchers.MatchesException(exc_class)
for exc_class in exc_classes]
@ -42,6 +49,16 @@ class TestAPI(testscenarios.TestWithScenarios,
callable_obj = testcase.Nullary(callable_obj, *args, **kwargs)
self.assertThat(callable_obj, matcher)
def skipIfNotSupported(self, supported):
    """Skip the running test unless ``self.url`` matches a supported prefix.

    :param supported: iterable of URL prefixes (such as ``'redis://'``)
                      naming the driver types the test applies to
    """
    # Materialize once so a one-shot iterable is still intact when the
    # skip message is built, and so ``startswith`` can check every
    # prefix in a single call.
    prefixes = tuple(supported)
    if not self.url.startswith(prefixes):
        self.skipTest("This test only works with %s types for now"
                      % list(prefixes))
def setUp(self):
super(TestAPI, self).setUp()
self.group_id = self._get_random_uuid()
@ -214,8 +231,7 @@ class TestAPI(testscenarios.TestWithScenarios,
self.assertTrue(member_id_test2 not in members_ids)
def test_timeout(self):
if not self.url.startswith('memcached://'):
self.skipTest("This test only works with memcached for now")
self.skipIfNotSupported(self.timeout_capable)
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)

View File

@ -50,3 +50,6 @@ commands =
ignore = H405,E126
exclude=.venv,.git,.tox,dist,*egg,*.egg-info,build,examples,doc
show-source = True
[hacking]
import_exceptions = six.moves