Do not re-set the members cache for watchers by default
The current code fills the group membership cache by default – without checking whether data is already there – which can lead to a race condition in the following situation:

    coord1.watch_join_group(x, cb1)
    # Cache is filled: contains {}
    coord2.join_group(x)
    coord1.watch_join_group(x, cb2)
    # Cache is re-filled: contains {coord2}
    coord1.run_watchers()
    # Calls nothing since no new join is seen, whereas
    # it should call cb1

Change-Id: I7376c9f320a37bb44282b52fa5b6fa3f1f86784b
This commit is contained in:
parent
3574b1310b
commit
5f905c86c9
|
@ -627,6 +627,16 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver):
|
|||
self._group_members = collections.defaultdict(set)
|
||||
self._joined_groups = set()
|
||||
|
||||
def _init_watch_group(self, group_id):
|
||||
members = self.get_members(group_id).get()
|
||||
if group_id not in self._group_members:
|
||||
self._group_members[group_id] = members
|
||||
|
||||
def watch_join_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
super(CoordinationDriverCachedRunWatchers, self).watch_join_group(
|
||||
group_id, callback)
|
||||
|
||||
def unwatch_join_group(self, group_id, callback):
|
||||
super(CoordinationDriverCachedRunWatchers, self).unwatch_join_group(
|
||||
group_id, callback)
|
||||
|
@ -635,6 +645,11 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver):
|
|||
group_id in self._group_members):
|
||||
del self._group_members[group_id]
|
||||
|
||||
def watch_leave_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
super(CoordinationDriverCachedRunWatchers, self).watch_leave_group(
|
||||
group_id, callback)
|
||||
|
||||
def unwatch_leave_group(self, group_id, callback):
|
||||
super(CoordinationDriverCachedRunWatchers, self).unwatch_leave_group(
|
||||
group_id, callback)
|
||||
|
|
|
@ -482,19 +482,6 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
fut = self._executor.submit(_do_get_groups)
|
||||
return FileFutureResult(fut)
|
||||
|
||||
def _init_watch_group(self, group_id):
|
||||
group_members_fut = self.get_members(group_id)
|
||||
group_members = group_members_fut.get(timeout=None)
|
||||
self._group_members[group_id].update(group_members)
|
||||
|
||||
def watch_join_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
return super(FileDriver, self).watch_join_group(group_id, callback)
|
||||
|
||||
def watch_leave_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
return super(FileDriver, self).watch_leave_group(group_id, callback)
|
||||
|
||||
@staticmethod
|
||||
def watch_elected_as_leader(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
|
|
@ -476,25 +476,6 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
self.leader_timeout,
|
||||
self.lock_timeout)
|
||||
|
||||
@_translate_failures
|
||||
def _init_watch_group(self, group_id):
|
||||
members = self.client.get(self._encode_group_id(group_id))
|
||||
if members is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
# Initialize with the current group member list
|
||||
if group_id not in self._group_members:
|
||||
self._group_members[group_id] = set(members.keys())
|
||||
|
||||
def watch_join_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
return super(MemcachedDriver, self).watch_join_group(
|
||||
group_id, callback)
|
||||
|
||||
def watch_leave_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
return super(MemcachedDriver, self).watch_leave_group(
|
||||
group_id, callback)
|
||||
|
||||
def get_lock(self, name):
|
||||
return MemcachedLock(self, name, self.lock_timeout)
|
||||
|
||||
|
|
|
@ -720,18 +720,6 @@ return 1
|
|||
|
||||
return RedisFutureResult(self._submit(_get_groups))
|
||||
|
||||
def _init_watch_group(self, group_id):
|
||||
members = self.get_members(group_id)
|
||||
self._group_members[group_id].update(members.get(timeout=None))
|
||||
|
||||
def watch_join_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
return super(RedisDriver, self).watch_join_group(group_id, callback)
|
||||
|
||||
def watch_leave_group(self, group_id, callback):
|
||||
self._init_watch_group(group_id)
|
||||
return super(RedisDriver, self).watch_leave_group(group_id, callback)
|
||||
|
||||
def _get_leader_lock(self, group_id):
|
||||
name = self._encode_group_leader(group_id)
|
||||
return self.get_lock(name)
|
||||
|
|
|
@ -475,7 +475,6 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
return client.KazooClient(**client_kwargs)
|
||||
|
||||
def _watch_group(self, group_id):
|
||||
get_members_req = self.get_members(group_id)
|
||||
|
||||
def on_children_change(children):
|
||||
# If we don't have any hook, stop watching
|
||||
|
@ -506,9 +505,6 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
|
||||
self._group_members[group_id] = children
|
||||
|
||||
# Initialize the current member list
|
||||
self._group_members[group_id] = get_members_req.get()
|
||||
|
||||
try:
|
||||
self._coord.ChildrenWatch(self._path_group(group_id),
|
||||
on_children_change)
|
||||
|
|
|
@ -387,6 +387,10 @@ class TestAPI(tests.TestCaseSkipNotImplemented):
|
|||
# we get an event.
|
||||
self._coord.watch_leave_group(self.group_id, self._set_event)
|
||||
|
||||
# Run watchers to be sure we initialize the member cache and we *know*
|
||||
# client2 is a member now
|
||||
self._coord.run_watchers()
|
||||
|
||||
time.sleep(3)
|
||||
self._coord.heartbeat()
|
||||
time.sleep(3)
|
||||
|
|
Loading…
Reference in New Issue