diff --git a/tooz/coordination.py b/tooz/coordination.py index 4a0ebbc..55ffcce 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -215,7 +215,6 @@ class Heart(object): return self._finished.is_set() -@six.add_metaclass(abc.ABCMeta) class CoordinationDriver(object): requires_beating = False @@ -233,20 +232,19 @@ class CoordinationDriver(object): """ def __init__(self): + super(CoordinationDriver, self).__init__() self._started = False self._hooks_join_group = collections.defaultdict(Hooks) self._hooks_leave_group = collections.defaultdict(Hooks) self._hooks_elected_leader = collections.defaultdict(Hooks) - # A cache for group members - self._group_members = collections.defaultdict(set) self.requires_beating = ( CoordinationDriver.heartbeat != self.__class__.heartbeat ) self.heart = Heart(self) def _has_hooks_for_group(self, group_id): - return (len(self._hooks_join_group[group_id]) + - len(self._hooks_leave_group[group_id])) + return (group_id in self._hooks_join_group or + group_id in self._hooks_leave_group) @staticmethod def run_watchers(timeout=None): @@ -262,7 +260,6 @@ class CoordinationDriver(object): """Try to leader elect this coordinator & activate hooks on success.""" raise tooz.NotImplemented - @abc.abstractmethod def watch_join_group(self, group_id, callback): """Call a function when group_id sees a new member joined. @@ -275,7 +272,6 @@ class CoordinationDriver(object): """ self._hooks_join_group[group_id].append(callback) - @abc.abstractmethod def unwatch_join_group(self, group_id, callback): """Stop executing a function when a group_id sees a new member joined. @@ -284,15 +280,17 @@ class CoordinationDriver(object): this group """ try: + # Check if group_id is in hooks to avoid creating a default empty + # entry in hooks list. + if group_id not in self._hooks_join_group: + raise ValueError self._hooks_join_group[group_id].remove(callback) except ValueError: raise WatchCallbackNotFound(group_id, callback) - if (not self._has_hooks_for_group(group_id) and - group_id in self._group_members): - del self._group_members[group_id] + if not self._hooks_join_group[group_id]: + del self._hooks_join_group[group_id] - @abc.abstractmethod def watch_leave_group(self, group_id, callback): """Call a function when group_id sees a new member leaving. @@ -306,7 +304,6 @@ class CoordinationDriver(object): """ self._hooks_leave_group[group_id].append(callback) - @abc.abstractmethod def unwatch_leave_group(self, group_id, callback): """Stop executing a function when a group_id sees a new member leaving. @@ -315,18 +312,17 @@ class CoordinationDriver(object): this group """ try: + # Check if group_id is in hooks to avoid creating a default empty + # entry in hooks list. + if group_id not in self._hooks_leave_group: + raise ValueError self._hooks_leave_group[group_id].remove(callback) except ValueError: raise WatchCallbackNotFound(group_id, callback) - if (not self._has_hooks_for_group(group_id) and - group_id in self._group_members): - del self._group_members[group_id] - if not self._hooks_leave_group[group_id]: del self._hooks_leave_group[group_id] - @abc.abstractmethod def watch_elected_as_leader(self, group_id, callback): """Call a function when member gets elected as leader. @@ -340,7 +336,6 @@ class CoordinationDriver(object): """ self._hooks_elected_leader[group_id].append(callback) - @abc.abstractmethod def unwatch_elected_as_leader(self, group_id, callback): """Call a function when member gets elected as leader. @@ -606,8 +601,36 @@ class CoordAsyncResult(object): """Returns True if the task is done, False otherwise.""" -class _RunWatchersMixin(object): - """Mixin to share the *mostly* common ``run_watchers`` implementation.""" +class CoordinationDriverCachedRunWatchers(CoordinationDriver): + """Coordination driver with a `run_watchers` implementation. + + This implementation of `run_watchers` is based on a cache of the group + members between each run of `run_watchers` that is being updated between + each run. + + """ + + def __init__(self): + super(CoordinationDriverCachedRunWatchers, self).__init__() + # A cache for group members + self._group_members = collections.defaultdict(set) + self._joined_groups = set() + + def unwatch_join_group(self, group_id, callback): + super(CoordinationDriverCachedRunWatchers, self).unwatch_join_group( + group_id, callback) + + if (not self._has_hooks_for_group(group_id) and + group_id in self._group_members): + del self._group_members[group_id] + + def unwatch_leave_group(self, group_id, callback): + super(CoordinationDriverCachedRunWatchers, self).unwatch_leave_group( + group_id, callback) + + if (not self._has_hooks_for_group(group_id) and + group_id in self._group_members): + del self._group_members[group_id] def run_watchers(self, timeout=None): with timeutils.StopWatch(duration=timeout) as w: diff --git a/tooz/drivers/consul.py b/tooz/drivers/consul.py index 051c713..ac69805 100644 --- a/tooz/drivers/consul.py +++ b/tooz/drivers/consul.py @@ -19,7 +19,6 @@ from __future__ import absolute_import import consul from oslo_utils import encodeutils -import tooz from tooz import _retry from tooz import coordination from tooz import locking @@ -160,27 +159,3 @@ class ConsulDriver(coordination.CoordinationDriver): for arg in args: pieces.append(encodeutils.safe_decode(arg)) return u"/".join(pieces) - - @staticmethod - def watch_join_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_join_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def watch_leave_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_leave_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def watch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented diff --git a/tooz/drivers/etcd.py b/tooz/drivers/etcd.py index bdfa35c..34bfa35 100644 --- a/tooz/drivers/etcd.py +++ b/tooz/drivers/etcd.py @@ -21,7 +21,6 @@ from oslo_utils import timeutils import requests import six -import tooz from tooz import coordination from tooz import locking from tooz import utils @@ -227,27 +226,3 @@ class EtcdDriver(coordination.CoordinationDriver): for lock in self._acquired_locks: lock.heartbeat() return self.lock_timeout - - @staticmethod - def watch_join_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_join_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def watch_leave_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_leave_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def watch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented diff --git a/tooz/drivers/file.py b/tooz/drivers/file.py index 63c4e70..803aa76 100644 --- a/tooz/drivers/file.py +++ b/tooz/drivers/file.py @@ -163,8 +163,7 @@ class FileLock(locking.Lock): LOG.warning("Unreleased lock %s garbage collected", self.name) -class FileDriver(coordination._RunWatchersMixin, - coordination.CoordinationDriver): +class FileDriver(coordination.CoordinationDriverCachedRunWatchers): """A file based driver. This driver uses files and directories (and associated file locks) to @@ -493,16 +492,10 @@ class FileDriver(coordination._RunWatchersMixin, self._init_watch_group(group_id) return super(FileDriver, self).watch_join_group(group_id, callback) - def unwatch_join_group(self, group_id, callback): - return super(FileDriver, self).unwatch_join_group(group_id, callback) - def watch_leave_group(self, group_id, callback): self._init_watch_group(group_id) return super(FileDriver, self).watch_leave_group(group_id, callback) - def unwatch_leave_group(self, group_id, callback): - return super(FileDriver, self).unwatch_leave_group(group_id, callback) - @staticmethod def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py index 2698be5..d9caf12 100644 --- a/tooz/drivers/ipc.py +++ b/tooz/drivers/ipc.py @@ -222,6 +222,16 @@ class IPCDriver(coordination.CoordinationDriver): return IPCFutureResult(self._executor.submit(_delete_group)) + def watch_join_group(self, group_id, callback): + # Check the group exist + self.get_members(group_id).get() + super(IPCDriver, self).watch_join_group(group_id, callback) + + def watch_leave_group(self, group_id, callback): + # Check the group exist + self.get_members(group_id).get() + super(IPCDriver, self).watch_leave_group(group_id, callback) + def _get_groups_handler(self): with self._lock: return self._read_group_list() @@ -234,30 +244,6 @@ class IPCDriver(coordination.CoordinationDriver): def get_lock(name): return IPCLock(name) - @staticmethod - def watch_join_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_join_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def watch_leave_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_leave_group(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def watch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented - - @staticmethod - def unwatch_elected_as_leader(group_id, callback): - raise tooz.NotImplemented - class IPCFutureResult(coordination.CoordAsyncResult): """IPC asynchronous result that references a future.""" diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 74f5434..0d3a92b 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import errno import logging import socket @@ -170,8 +169,7 @@ class MemcachedLock(locking.Lock): return self in self.coord._acquired_locks -class MemcachedDriver(coordination._RunWatchersMixin, - coordination.CoordinationDriver): +class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): """A `memcached`_ based driver. This driver users `memcached`_ concepts to provide the coordination driver @@ -222,7 +220,6 @@ class MemcachedDriver(coordination._RunWatchersMixin, options = utils.collapse(options) self._options = options self._member_id = member_id - self._joined_groups = set() self._executor = utils.ProxyExecutor.build("Memcached", options) self.host = (parsed_url.hostname or "localhost", parsed_url.port or 11211) @@ -268,7 +265,6 @@ class MemcachedDriver(coordination._RunWatchersMixin, # Run heartbeat here because pymemcache use a lazy connection # method and only connect once you do an operation. self.heartbeat() - self._group_members = collections.defaultdict(set) self._executor.start() @_translate_failures @@ -495,27 +491,11 @@ class MemcachedDriver(coordination._RunWatchersMixin, return super(MemcachedDriver, self).watch_join_group( group_id, callback) - def unwatch_join_group(self, group_id, callback): - return super(MemcachedDriver, self).unwatch_join_group( - group_id, callback) - def watch_leave_group(self, group_id, callback): self._init_watch_group(group_id) return super(MemcachedDriver, self).watch_leave_group( group_id, callback) - def unwatch_leave_group(self, group_id, callback): - return super(MemcachedDriver, self).unwatch_leave_group( - group_id, callback) - - def watch_elected_as_leader(self, group_id, callback): - return super(MemcachedDriver, self).watch_elected_as_leader( - group_id, callback) - - def unwatch_elected_as_leader(self, group_id, callback): - return super(MemcachedDriver, self).unwatch_elected_as_leader( - group_id, callback) - def get_lock(self, name): return MemcachedLock(self, name, self.lock_timeout) diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index c7f6e14..814b833 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -106,8 +106,7 @@ class RedisLock(locking.Lock): return self in self._coord._acquired_locks -class RedisDriver(coordination._RunWatchersMixin, - coordination.CoordinationDriver): +class RedisDriver(coordination.CoordinationDriverCachedRunWatchers): """Redis provides a few nice benefits that act as a poormans zookeeper. It **is** fully functional and implements all of the coordination @@ -730,24 +729,10 @@ return 1 self._init_watch_group(group_id) return super(RedisDriver, self).watch_join_group(group_id, callback) - def unwatch_join_group(self, group_id, callback): - return super(RedisDriver, self).unwatch_join_group(group_id, callback) - def watch_leave_group(self, group_id, callback): self._init_watch_group(group_id) return super(RedisDriver, self).watch_leave_group(group_id, callback) - def unwatch_leave_group(self, group_id, callback): - return super(RedisDriver, self).unwatch_leave_group(group_id, callback) - - def watch_elected_as_leader(self, group_id, callback): - return super(RedisDriver, self).watch_elected_as_leader( - group_id, callback) - - def unwatch_elected_as_leader(self, group_id, callback): - return super(RedisDriver, self).unwatch_elected_as_leader( - group_id, callback) - def _get_leader_lock(self, group_id): name = self._encode_group_leader(group_id) return self.get_lock(name) diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 144cbc3..00a4f05 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -75,22 +75,73 @@ class ZooKeeperLock(locking.Lock): return self._lock.is_acquired -class BaseZooKeeperDriver(coordination.CoordinationDriver): - """Initialize the zookeeper driver. +class KazooDriver(coordination.CoordinationDriverCachedRunWatchers): + """This driver uses the `kazoo`_ client against real `zookeeper`_ servers. - :param timeout: connection timeout to wait when first connecting to the - zookeeper server + It **is** fully functional and implements all of the coordination + driver API(s). It stores data into `zookeeper`_ using znodes + and `msgpack`_ encoded values. + + To configure the client to your liking a subset of the options defined at + http://kazoo.readthedocs.org/en/latest/api/client.html + will be extracted from the coordinator url (or any provided options), + so that a specific coordinator can be created that will work for you. + + Currently the following options will be proxied to the contained client: + + ================ =============================== ==================== + Name Source Default + ================ =============================== ==================== + hosts url netloc + 'hosts' option key localhost:2181 + timeout 'timeout' options key 10.0 (kazoo default) + connection_retry 'connection_retry' options key None + command_retry 'command_retry' options key None + randomize_hosts 'randomize_hosts' options key True + ================ =============================== ==================== + + .. _kazoo: http://kazoo.readthedocs.org/ + .. _zookeeper: http://zookeeper.apache.org/ + .. _msgpack: http://msgpack.org/ """ #: Default namespace when none is provided. TOOZ_NAMESPACE = b"tooz" + HANDLERS = { + 'threading': threading_handler.SequentialThreadingHandler, + } + + if eventlet_handler: + HANDLERS['eventlet'] = eventlet_handler.SequentialEventletHandler + + """ + Restricted immutable dict of handler 'kinds' -> handler classes that + this driver can accept via 'handler' option key (the expected value for + this option is one of the keys in this dictionary). + """ + + CHARACTERISTICS = ( + coordination.Characteristics.NON_TIMEOUT_BASED, + coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS, + coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES, + coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS, + # Writes *always* go through a single leader process, but it may + # take a while for those writes to propagate to followers (and = + # during this time clients can read older values)... + coordination.Characteristics.SEQUENTIAL, + ) + """ + Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable + enum member(s) that can be used to interogate how this driver works. + """ + def __init__(self, member_id, parsed_url, options): - super(BaseZooKeeperDriver, self).__init__() + super(KazooDriver, self).__init__() options = utils.collapse(options, exclude=['hosts']) - self._options = options self._member_id = member_id self.timeout = int(options.get('timeout', '10')) self._namespace = options.get('namespace', self.TOOZ_NAMESPACE) + self._coord = self._make_client(parsed_url, options) + self._timeout_exception = self._coord.handler.timeout_exception def _start(self): try: @@ -107,7 +158,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): coordination.raise_with_cause(coordination.ToozError, "Operational error: %s" % e_msg, cause=e) - self._group_members = collections.defaultdict(set) self._watchers = collections.deque() self._leader_locks = {} @@ -397,70 +447,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): cleaned_args.append(arg) return paths.join(*cleaned_args) - -class KazooDriver(BaseZooKeeperDriver): - """This driver uses the `kazoo`_ client against real `zookeeper`_ servers. - - It **is** fully functional and implements all of the coordination - driver API(s). It stores data into `zookeeper`_ using znodes - and `msgpack`_ encoded values. - - To configure the client to your liking a subset of the options defined at - http://kazoo.readthedocs.org/en/latest/api/client.html - will be extracted from the coordinator url (or any provided options), - so that a specific coordinator can be created that will work for you. - - Currently the following options will be proxied to the contained client: - - ================ =============================== ==================== - Name Source Default - ================ =============================== ==================== - hosts url netloc + 'hosts' option key localhost:2181 - timeout 'timeout' options key 10.0 (kazoo default) - connection_retry 'connection_retry' options key None - command_retry 'command_retry' options key None - randomize_hosts 'randomize_hosts' options key True - ================ =============================== ==================== - - .. _kazoo: http://kazoo.readthedocs.org/ - .. _zookeeper: http://zookeeper.apache.org/ - .. _msgpack: http://msgpack.org/ - """ - - HANDLERS = { - 'threading': threading_handler.SequentialThreadingHandler, - } - - if eventlet_handler: - HANDLERS['eventlet'] = eventlet_handler.SequentialEventletHandler - - """ - Restricted immutable dict of handler 'kinds' -> handler classes that - this driver can accept via 'handler' option key (the expected value for - this option is one of the keys in this dictionary). - """ - - CHARACTERISTICS = ( - coordination.Characteristics.NON_TIMEOUT_BASED, - coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS, - coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES, - coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS, - # Writes *always* go through a single leader process, but it may - # take a while for those writes to propagate to followers (and = - # during this time clients can read older values)... - coordination.Characteristics.SEQUENTIAL, - ) - """ - Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable - enum member(s) that can be used to interogate how this driver works. - """ - - def __init__(self, member_id, parsed_url, options): - super(KazooDriver, self).__init__(member_id, parsed_url, options) - self._coord = self._make_client(parsed_url, self._options) - self._member_id = member_id - self._timeout_exception = self._coord.handler.timeout_exception - def _make_client(self, parsed_url, options): # Creates a kazoo client, # See: https://github.com/python-zk/kazoo/blob/2.2.1/kazoo/client.py @@ -537,8 +523,7 @@ class KazooDriver(BaseZooKeeperDriver): # Add the hook before starting watching to avoid race conditions # as the watching executor can be in a thread - super(BaseZooKeeperDriver, self).watch_join_group( - group_id, callback) + super(KazooDriver, self).watch_join_group(group_id, callback) if not already_being_watched: try: @@ -548,10 +533,6 @@ class KazooDriver(BaseZooKeeperDriver): self.unwatch_join_group(group_id, callback) raise - def unwatch_join_group(self, group_id, callback): - return super(BaseZooKeeperDriver, self).unwatch_join_group( - group_id, callback) - def watch_leave_group(self, group_id, callback): # Check if we already have hooks for this group_id, if not, start # watching it. @@ -559,8 +540,7 @@ class KazooDriver(BaseZooKeeperDriver): # Add the hook before starting watching to avoid race conditions # as the watching executor can be in a thread - super(BaseZooKeeperDriver, self).watch_leave_group( - group_id, callback) + super(KazooDriver, self).watch_leave_group(group_id, callback) if not already_being_watched: try: @@ -570,18 +550,6 @@ class KazooDriver(BaseZooKeeperDriver): self.unwatch_leave_group(group_id, callback) raise - def unwatch_leave_group(self, group_id, callback): - return super(BaseZooKeeperDriver, self).unwatch_leave_group( - group_id, callback) - - def watch_elected_as_leader(self, group_id, callback): - return super(BaseZooKeeperDriver, self).watch_elected_as_leader( - group_id, callback) - - def unwatch_elected_as_leader(self, group_id, callback): - return super(BaseZooKeeperDriver, self).unwatch_elected_as_leader( - group_id, callback) - def stand_down_group_leader(self, group_id): if group_id in self._leader_locks: self._leader_locks[group_id].release() diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 2d44474..fa59819 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -552,7 +552,7 @@ class TestAPI(tests.TestCaseSkipNotImplemented): # client now no longer believes its part of the group. if (hasattr(self._coord, '_joined_groups') and (self._coord.run_watchers - == tooz.coordination._RunWatchersMixin.run_watchers)): + == tooz.coordination.CoordinationDriverCachedRunWatchers.run_watchers)): # noqa self.assertIn(self.group_id, self._coord._joined_groups) self._coord.run_watchers() self.assertNotIn(self.group_id, self._coord._joined_groups) diff --git a/tooz/tests/test_memcache.py b/tooz/tests/test_memcache.py index 787a96b..07bb815 100644 --- a/tooz/tests/test_memcache.py +++ b/tooz/tests/test_memcache.py @@ -70,8 +70,9 @@ class TestMemcacheDriverFailures(testcase.TestCase): mock_client.set.side_effect = socket.timeout('timed-out') self.assertRaises(coordination.ToozConnectionError, coord.heartbeat) - @mock.patch('tooz.coordination._RunWatchersMixin.run_watchers', - autospec=True) + @mock.patch( + 'tooz.coordination.CoordinationDriverCachedRunWatchers.run_watchers', + autospec=True) @mock.patch('pymemcache.client.PooledClient') def test_client_run_watchers_mixin(self, mock_client_cls, mock_run_watchers):