Share most of the `run_watchers` code via a common mixin

Change-Id: Ib8e1398c05635d0c698c03760c1bbb547a0747ae
This commit is contained in:
Joshua Harlow 2015-06-12 16:37:46 -07:00
parent ae289f1f97
commit 61e86b0e40
4 changed files with 49 additions and 104 deletions

View File

@ -19,6 +19,7 @@ import collections
from oslo_utils import excutils
from oslo_utils import netutils
from oslo_utils import timeutils
import six
from stevedore import driver
@ -344,6 +345,39 @@ class CoordAsyncResult(object):
"""Returns True if the task is done, False otherwise."""
class _RunWatchersMixin(object):
"""Mixin to share the *mostly* common ``run_watchers`` implementation."""
def run_watchers(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w:
known_groups = self.get_groups().get(
timeout=w.leftover(return_none=True))
result = []
for group_id in known_groups:
try:
group_members_fut = self.get_members(group_id)
group_members = group_members_fut.get(
timeout=w.leftover(return_none=True))
except GroupNotCreated:
group_members = set()
else:
group_members = set(group_members)
if (group_id in self._joined_groups and
self._member_id not in group_members):
self._joined_groups.discard(group_id)
old_group_members = self._group_members.get(group_id, set())
for member_id in (group_members - old_group_members):
result.extend(
self._hooks_join_group[group_id].run(
MemberJoinedGroup(group_id, member_id)))
for member_id in (old_group_members - group_members):
result.extend(
self._hooks_leave_group[group_id].run(
MemberLeftGroup(group_id, member_id)))
self._group_members[group_id] = group_members
return result
def get_coordinator(backend_url, member_id, **kwargs):
"""Initialize and load the backend.

View File

@ -107,7 +107,8 @@ class FileLock(locking.Lock):
LOG.warn("Unreleased lock %s garbage collected", self.name)
class FileDriver(coordination.CoordinationDriver):
class FileDriver(coordination.CoordinationDriver,
coordination._RunWatchersMixin):
"""A file based driver.
This driver uses files and directories (and associated file locks) to
@ -398,38 +399,6 @@ class FileDriver(coordination.CoordinationDriver):
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
def run_watchers(self, timeout=None):
w = timeutils.StopWatch(duration=timeout)
w.start()
leftover_timeout = w.leftover(return_none=True)
known_groups = self.get_groups().get(timeout=leftover_timeout)
result = []
for group_id in known_groups:
leftover_timeout = w.leftover(return_none=True)
try:
group_members_fut = self.get_members(group_id)
group_members = group_members_fut.get(timeout=leftover_timeout)
except coordination.GroupNotCreated:
group_members = set()
else:
group_members = set(group_members)
if (group_id in self._joined_groups and
self._member_id not in group_members):
self._joined_groups.discard(group_id)
old_group_members = self._group_members.get(group_id, set())
for member_id in (group_members - old_group_members):
result.extend(
self._hooks_join_group[group_id].run(
coordination.MemberJoinedGroup(group_id,
member_id)))
for member_id in (old_group_members - group_members):
result.extend(
self._hooks_leave_group[group_id].run(
coordination.MemberLeftGroup(group_id,
member_id)))
self._group_members[group_id] = group_members
return result
class FileFutureResult(coordination.CoordAsyncResult):
"""File asynchronous result that references a future."""

View File

@ -20,7 +20,6 @@ import logging
import socket
from concurrent import futures
from oslo_utils import timeutils
from pymemcache import client as pymemcache_client
import six
@ -115,7 +114,8 @@ class MemcachedLock(locking.Lock):
return self.coord.client.get(self.name)
class MemcachedDriver(coordination.CoordinationDriver):
class MemcachedDriver(coordination.CoordinationDriver,
coordination._RunWatchersMixin):
"""A `memcached`_ based driver.
This driver users `memcached`_ concepts to provide the coordination driver
@ -150,7 +150,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
options = utils.collapse(options)
self._options = options
self._member_id = member_id
self._groups = set()
self._joined_groups = set()
self._executor = None
self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211)
@ -204,7 +204,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
for lock in list(self._acquired_locks):
lock.release()
self.client.delete(self._encode_member_id(self._member_id))
for g in list(self._groups):
for g in list(self._joined_groups):
try:
self.leave_group(g).get()
except (coordination.MemberNotJoined,
@ -298,7 +298,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
if not self.client.cas(encoded_group, group_members, cas):
# It changed, let's try again
raise _retry.Retry
self._groups.add(group_id)
self._joined_groups.add(group_id)
return MemcachedFutureResult(self._executor.submit(_join_group))
@ -317,7 +317,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
if not self.client.cas(encoded_group, group_members, cas):
# It changed, let's try again
raise _retry.Retry
self._groups.discard(group_id)
self._joined_groups.discard(group_id)
return MemcachedFutureResult(self._executor.submit(_leave_group))
@ -460,37 +460,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
self.leader_timeout)
@_translate_failures
def run_watchers(self, timeout=None):
w = timeutils.StopWatch(duration=timeout)
w.start()
leftover_timeout = w.leftover(return_none=True)
known_groups = self.get_groups().get(timeout=leftover_timeout)
result = []
for group_id in known_groups:
leftover_timeout = w.leftover(return_none=True)
try:
group_members_fut = self.get_members(group_id)
group_members = group_members_fut.get(timeout=leftover_timeout)
except coordination.GroupNotCreated:
group_members = set()
else:
group_members = set(group_members)
old_group_members = self._group_members[group_id]
for member_id in (group_members - old_group_members):
result.extend(
self._hooks_join_group[group_id].run(
coordination.MemberJoinedGroup(group_id,
member_id)))
for member_id in (old_group_members - group_members):
result.extend(
self._hooks_leave_group[group_id].run(
coordination.MemberLeftGroup(group_id,
member_id)))
self._group_members[group_id] = group_members
def _run_leadership(self):
for group_id, hooks in six.iteritems(self._hooks_elected_leader):
# Try to grab the lock, if that fails, that means someone has it
# already.
@ -501,6 +471,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
group_id,
self._member_id))
def run_watchers(self, timeout=None):
result = super(MemcachedDriver, self).run_watchers(timeout=timeout)
self._run_leadership()
return result

View File

@ -98,7 +98,8 @@ class RedisLock(locking.Lock):
self._lock.extend(self._lock.timeout)
class RedisDriver(coordination.CoordinationDriver):
class RedisDriver(coordination.CoordinationDriver,
coordination._RunWatchersMixin):
"""Redis provides a few nice benefits that act as a poormans zookeeper.
It **is** fully functional and implements all of the coordination
@ -677,39 +678,7 @@ class RedisDriver(coordination.CoordinationDriver):
def run_watchers(self, timeout=None):
w = timeutils.StopWatch(duration=timeout)
w.start()
result = []
leftover_timeout = w.leftover(return_none=True)
known_groups = self.get_groups().get(timeout=leftover_timeout)
for group_id in known_groups:
leftover_timeout = w.leftover(return_none=True)
try:
group_members_fut = self.get_members(group_id)
group_members = group_members_fut.get(timeout=leftover_timeout)
except coordination.GroupNotCreated:
group_members = set()
else:
group_members = set(group_members)
# I was booted out...
#
# TODO(harlowja): perhaps we should have a way to notify
# watchers that this has happened (the below mechanism will
# also do this, but it might be better to have a separate
# way when 'self' membership is lost)?
if (group_id in self._joined_groups and
self._member_id not in group_members):
self._joined_groups.discard(group_id)
old_group_members = self._group_members.get(group_id, set())
for member_id in (group_members - old_group_members):
result.extend(
self._hooks_join_group[group_id].run(
coordination.MemberJoinedGroup(group_id,
member_id)))
for member_id in (old_group_members - group_members):
result.extend(
self._hooks_leave_group[group_id].run(
coordination.MemberLeftGroup(group_id,
member_id)))
self._group_members[group_id] = group_members
result = super(RedisDriver, self).run_watchers(timeout=timeout)
self._run_leadership(w)
return result