From 13e23e30877a43a203234e9370e23e720451092e Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 6 Jun 2017 18:28:41 +0200 Subject: [PATCH] Mutualize executor code in a mixin class Change-Id: I364b0ca5a6361a818a084bbcb83c9e7855cb3583 --- tooz/coordination.py | 27 ++++++++++++++++++++++++--- tooz/drivers/consul.py | 2 +- tooz/drivers/etcd.py | 2 +- tooz/drivers/etcd3.py | 2 +- tooz/drivers/etcd3gw.py | 2 +- tooz/drivers/file.py | 12 ++++-------- tooz/drivers/ipc.py | 11 +++-------- tooz/drivers/memcached.py | 22 ++++++++++------------ tooz/drivers/mysql.py | 2 +- tooz/drivers/pgsql.py | 2 +- tooz/drivers/redis.py | 24 ++++++++++++------------ tooz/drivers/zookeeper.py | 2 +- 12 files changed, 60 insertions(+), 50 deletions(-) diff --git a/tooz/coordination.py b/tooz/coordination.py index 1130098..02bb24f 100755 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -241,7 +241,7 @@ class CoordinationDriver(object): enum member(s) that can be used to interogate how this driver works. """ - def __init__(self, member_id): + def __init__(self, member_id, parsed_url, options): super(CoordinationDriver, self).__init__() self._member_id = member_id self._started = False @@ -672,6 +672,26 @@ class CoordinatorResult(CoordAsyncResult): return self._fut.done() +class CoordinationDriverWithExecutor(CoordinationDriver): + + EXCLUDE_OPTIONS = None + + def __init__(self, member_id, parsed_url, options): + self._options = utils.collapse(options, exclude=self.EXCLUDE_OPTIONS) + self._executor = utils.ProxyExecutor.build( + self.__class__.__name__, self._options) + super(CoordinationDriverWithExecutor, self).__init__( + member_id, parsed_url, options) + + def start(self, start_heart=False): + self._executor.start() + super(CoordinationDriverWithExecutor, self).start(start_heart) + + def stop(self): + super(CoordinationDriverWithExecutor, self).stop() + self._executor.stop() + + class CoordinationDriverCachedRunWatchers(CoordinationDriver): """Coordination driver with a `run_watchers` implementation. @@ -681,8 +701,9 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver): """ - def __init__(self, member_id): - super(CoordinationDriverCachedRunWatchers, self).__init__(member_id) + def __init__(self, member_id, parsed_url, options): + super(CoordinationDriverCachedRunWatchers, self).__init__( + member_id, parsed_url, options) # A cache for group members self._group_members = collections.defaultdict(set) self._joined_groups = set() diff --git a/tooz/drivers/consul.py b/tooz/drivers/consul.py index 2f81ec0..f9fd559 100644 --- a/tooz/drivers/consul.py +++ b/tooz/drivers/consul.py @@ -110,7 +110,7 @@ class ConsulDriver(coordination.CoordinationDriver): DEFAULT_PORT = 8500 def __init__(self, member_id, parsed_url, options): - super(ConsulDriver, self).__init__(member_id) + super(ConsulDriver, self).__init__(member_id, parsed_url, options) options = utils.collapse(options) self._host = parsed_url.hostname self._port = parsed_url.port or self.DEFAULT_PORT diff --git a/tooz/drivers/etcd.py b/tooz/drivers/etcd.py index 69b0647..1745d62 100644 --- a/tooz/drivers/etcd.py +++ b/tooz/drivers/etcd.py @@ -216,7 +216,7 @@ class EtcdDriver(coordination.CoordinationDriver): lock_encoder_cls = utils.Base64LockEncoder def __init__(self, member_id, parsed_url, options): - super(EtcdDriver, self).__init__(member_id) + super(EtcdDriver, self).__init__(member_id, parsed_url, options) host = parsed_url.hostname or self.DEFAULT_HOST port = parsed_url.port or self.DEFAULT_PORT options = utils.collapse(options) diff --git a/tooz/drivers/etcd3.py b/tooz/drivers/etcd3.py index 137c0cc..8a747bd 100644 --- a/tooz/drivers/etcd3.py +++ b/tooz/drivers/etcd3.py @@ -141,7 +141,7 @@ class Etcd3Driver(coordination.CoordinationDriver): DEFAULT_PORT = 2379 def __init__(self, member_id, parsed_url, options): - super(Etcd3Driver, self).__init__(member_id) + super(Etcd3Driver, self).__init__(member_id, parsed_url, options) host = parsed_url.hostname or self.DEFAULT_HOST port = parsed_url.port or self.DEFAULT_PORT options = utils.collapse(options) diff --git a/tooz/drivers/etcd3gw.py b/tooz/drivers/etcd3gw.py index c73a854..1370f1c 100644 --- a/tooz/drivers/etcd3gw.py +++ b/tooz/drivers/etcd3gw.py @@ -162,7 +162,7 @@ class Etcd3Driver(coordination.CoordinationDriver): DEFAULT_PORT = 2379 def __init__(self, member_id, parsed_url, options): - super(Etcd3Driver, self).__init__(member_id) + super(Etcd3Driver, self).__init__(member_id, parsed_url, options) host = parsed_url.hostname or self.DEFAULT_HOST port = parsed_url.port or self.DEFAULT_PORT options = utils.collapse(options) diff --git a/tooz/drivers/file.py b/tooz/drivers/file.py index a0797b8..aaef57b 100644 --- a/tooz/drivers/file.py +++ b/tooz/drivers/file.py @@ -177,7 +177,8 @@ class FileLock(locking.Lock): LOG.warning("Unreleased lock %s garbage collected", self.name) -class FileDriver(coordination.CoordinationDriverCachedRunWatchers): +class FileDriver(coordination.CoordinationDriverCachedRunWatchers, + coordination.CoordinationDriverWithExecutor): """A file based driver. This driver uses files and directories (and associated file locks) to @@ -219,9 +220,8 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): def __init__(self, member_id, parsed_url, options): """Initialize the file driver.""" - super(FileDriver, self).__init__(member_id) + super(FileDriver, self).__init__(member_id, parsed_url, options) self._dir = self._normalize_path(parsed_url.path) - self._executor = utils.ProxyExecutor.build("File", options) self._group_dir = os.path.join(self._dir, 'groups') self._tmpdir = os.path.join(self._dir, 'tmp') self._driver_lock_path = os.path.join(self._dir, '.driver_lock') @@ -231,7 +231,6 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): self._reserved_paths = list(self._reserved_dirs) self._reserved_paths.append(self._driver_lock_path) self._safe_member_id = self._make_filesystem_safe(member_id) - self._options = utils.collapse(options) self._timeout = int(self._options.get('timeout', 10)) @staticmethod @@ -266,15 +265,12 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): return hashlib.new(cls.HASH_ROUTINE, item).hexdigest() def _start(self): + super(FileDriver, self)._start() for a_dir in self._reserved_dirs: try: fileutils.ensure_tree(a_dir) except OSError as e: raise coordination.ToozConnectionError(e) - self._executor.start() - - def _stop(self): - self._executor.stop() def _update_group_metadata(self, path, group_id): details = { diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py index 76b02dc..f5d67a3 100644 --- a/tooz/drivers/ipc.py +++ b/tooz/drivers/ipc.py @@ -134,7 +134,7 @@ class IPCLock(locking.Lock): return False -class IPCDriver(coordination.CoordinationDriver): +class IPCDriver(coordination.CoordinationDriverWithExecutor): """A `IPC`_ based driver. This driver uses `IPC`_ concepts to provide the coordination driver @@ -165,21 +165,16 @@ class IPCDriver(coordination.CoordinationDriver): _GROUP_PROJECT = "_TOOZ_INTERNAL" _INTERNAL_LOCK_NAME = "TOOZ_INTERNAL_LOCK" - def __init__(self, member_id, parsed_url, options): - """Initialize the IPC driver.""" - super(IPCDriver, self).__init__(member_id) - self._executor = utils.ProxyExecutor.build("IPC", options) - def _start(self): + super(IPCDriver, self)._start() self._group_list = sysv_ipc.SharedMemory( ftok(self._GROUP_LIST_KEY, self._GROUP_PROJECT), sysv_ipc.IPC_CREAT, size=self._SEGMENT_SIZE) self._lock = self.get_lock(self._INTERNAL_LOCK_NAME) - self._executor.start() def _stop(self): - self._executor.stop() + super(IPCDriver, self)._stop() try: self._group_list.detach() self._group_list.remove() diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index f25f503..f2dd108 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -179,7 +179,8 @@ class MemcachedLock(locking.Lock): return self in self.coord._acquired_locks -class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): +class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers, + coordination.CoordinationDriverWithExecutor): """A `memcached`_ based driver. This driver users `memcached`_ concepts to provide the coordination driver @@ -226,21 +227,18 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): STILL_ALIVE = b"It's alive!" def __init__(self, member_id, parsed_url, options): - super(MemcachedDriver, self).__init__(member_id) - options = utils.collapse(options) - self._options = options - self._executor = utils.ProxyExecutor.build("Memcached", options) + super(MemcachedDriver, self).__init__(member_id, parsed_url, options) self.host = (parsed_url.hostname or "localhost", parsed_url.port or 11211) - default_timeout = options.get('timeout', self.DEFAULT_TIMEOUT) + default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT) self.timeout = int(default_timeout) - self.membership_timeout = int(options.get( + self.membership_timeout = int(self._options.get( 'membership_timeout', default_timeout)) - self.lock_timeout = int(options.get( + self.lock_timeout = int(self._options.get( 'lock_timeout', default_timeout)) - self.leader_timeout = int(options.get( + self.leader_timeout = int(self._options.get( 'leader_timeout', default_timeout)) - max_pool_size = options.get('max_pool_size', None) + max_pool_size = self._options.get('max_pool_size', None) if max_pool_size is not None: self.max_pool_size = int(max_pool_size) else: @@ -264,6 +262,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): @_translate_failures def _start(self): + super(MemcachedDriver, self)._start() self.client = pymemcache_client.PooledClient( self.host, serializer=self._msgpack_serializer, @@ -274,14 +273,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): # Run heartbeat here because pymemcache use a lazy connection # method and only connect once you do an operation. self.heartbeat() - self._executor.start() @_translate_failures def _stop(self): + super(MemcachedDriver, self)._stop() for lock in list(self._acquired_locks): lock.release() self.client.delete(self._encode_member_id(self._member_id)) - self._executor.stop() self.client.close() def _encode_group_id(self, group_id): diff --git a/tooz/drivers/mysql.py b/tooz/drivers/mysql.py index 72853d7..cbeea8d 100644 --- a/tooz/drivers/mysql.py +++ b/tooz/drivers/mysql.py @@ -130,7 +130,7 @@ class MySQLDriver(coordination.CoordinationDriver): def __init__(self, member_id, parsed_url, options): """Initialize the MySQL driver.""" - super(MySQLDriver, self).__init__(member_id) + super(MySQLDriver, self).__init__(member_id, parsed_url, options) self._parsed_url = parsed_url self._options = utils.collapse(options) diff --git a/tooz/drivers/pgsql.py b/tooz/drivers/pgsql.py index ee92db1..dbcd803 100644 --- a/tooz/drivers/pgsql.py +++ b/tooz/drivers/pgsql.py @@ -189,7 +189,7 @@ class PostgresDriver(coordination.CoordinationDriver): def __init__(self, member_id, parsed_url, options): """Initialize the PostgreSQL driver.""" - super(PostgresDriver, self).__init__(member_id) + super(PostgresDriver, self).__init__(member_id, parsed_url, options) self._parsed_url = parsed_url self._options = utils.collapse(options) diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 06854e9..c2cf824 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -115,7 +115,8 @@ class RedisLock(locking.Lock): return self in self._coord._acquired_locks -class RedisDriver(coordination.CoordinationDriverCachedRunWatchers): +class RedisDriver(coordination.CoordinationDriverCachedRunWatchers, + coordination.CoordinationDriverWithExecutor): """Redis provides a few nice benefits that act as a poormans zookeeper. It **is** fully functional and implements all of the coordination @@ -323,26 +324,25 @@ return 1 .. _Lua: http://www.lua.org """ + EXCLUDE_OPTIONS = CLIENT_LIST_ARGS + def __init__(self, member_id, parsed_url, options): - super(RedisDriver, self).__init__(member_id) - options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS) + super(RedisDriver, self).__init__(member_id, parsed_url, options) self._parsed_url = parsed_url - self._options = options - self._encoding = options.get('encoding', self.DEFAULT_ENCODING) - timeout = options.get('timeout', self.CLIENT_DEFAULT_SOCKET_TO) + self._encoding = self._options.get('encoding', self.DEFAULT_ENCODING) + timeout = self._options.get('timeout', self.CLIENT_DEFAULT_SOCKET_TO) self.timeout = int(timeout) - self.membership_timeout = float(options.get( + self.membership_timeout = float(self._options.get( 'membership_timeout', timeout)) - lock_timeout = options.get('lock_timeout', self.timeout) + lock_timeout = self._options.get('lock_timeout', self.timeout) self.lock_timeout = int(lock_timeout) - namespace = options.get('namespace', self.DEFAULT_NAMESPACE) + namespace = self._options.get('namespace', self.DEFAULT_NAMESPACE) self._namespace = utils.to_binary(namespace, encoding=self._encoding) self._group_prefix = self._namespace + b"_group" self._beat_prefix = self._namespace + b"_beats" self._groups = self._namespace + b"_groups" self._client = None self._acquired_locks = set() - self._executor = utils.ProxyExecutor.build("Redis", options) self._started = False self._server_info = {} self._scripts = {} @@ -429,7 +429,7 @@ return 1 return redis.StrictRedis(**kwargs) def _start(self): - self._executor.start() + super(RedisDriver, self)._start() try: self._client = self._make_client(self._parsed_url, self._options, self.timeout) @@ -522,7 +522,7 @@ return 1 lock.release() except tooz.ToozError: LOG.warning("Unable to release lock '%s'", lock, exc_info=True) - self._executor.stop() + super(RedisDriver, self)._stop() if self._client is not None: # Make sure we no longer exist... beat_id = self._encode_beat_id(self._member_id) diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 97a6bdd..52f151e 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -136,7 +136,7 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers): """ def __init__(self, member_id, parsed_url, options): - super(KazooDriver, self).__init__(member_id) + super(KazooDriver, self).__init__(member_id, parsed_url, options) options = utils.collapse(options, exclude=['hosts']) self.timeout = int(options.get('timeout', '10')) self._namespace = options.get('namespace', self.TOOZ_NAMESPACE)