Mutualize executor code in a mixin class

Change-Id: I364b0ca5a6361a818a084bbcb83c9e7855cb3583
This commit is contained in:
Julien Danjou 2017-06-06 18:28:41 +02:00
parent 413527c181
commit 13e23e3087
12 changed files with 60 additions and 50 deletions

View File

@ -241,7 +241,7 @@ class CoordinationDriver(object):
enum member(s) that can be used to interogate how this driver works. enum member(s) that can be used to interogate how this driver works.
""" """
def __init__(self, member_id): def __init__(self, member_id, parsed_url, options):
super(CoordinationDriver, self).__init__() super(CoordinationDriver, self).__init__()
self._member_id = member_id self._member_id = member_id
self._started = False self._started = False
@ -672,6 +672,26 @@ class CoordinatorResult(CoordAsyncResult):
return self._fut.done() return self._fut.done()
class CoordinationDriverWithExecutor(CoordinationDriver):
EXCLUDE_OPTIONS = None
def __init__(self, member_id, parsed_url, options):
self._options = utils.collapse(options, exclude=self.EXCLUDE_OPTIONS)
self._executor = utils.ProxyExecutor.build(
self.__class__.__name__, self._options)
super(CoordinationDriverWithExecutor, self).__init__(
member_id, parsed_url, options)
def start(self, start_heart=False):
self._executor.start()
super(CoordinationDriverWithExecutor, self).start(start_heart)
def stop(self):
super(CoordinationDriverWithExecutor, self).stop()
self._executor.stop()
class CoordinationDriverCachedRunWatchers(CoordinationDriver): class CoordinationDriverCachedRunWatchers(CoordinationDriver):
"""Coordination driver with a `run_watchers` implementation. """Coordination driver with a `run_watchers` implementation.
@ -681,8 +701,9 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver):
""" """
def __init__(self, member_id): def __init__(self, member_id, parsed_url, options):
super(CoordinationDriverCachedRunWatchers, self).__init__(member_id) super(CoordinationDriverCachedRunWatchers, self).__init__(
member_id, parsed_url, options)
# A cache for group members # A cache for group members
self._group_members = collections.defaultdict(set) self._group_members = collections.defaultdict(set)
self._joined_groups = set() self._joined_groups = set()

View File

@ -110,7 +110,7 @@ class ConsulDriver(coordination.CoordinationDriver):
DEFAULT_PORT = 8500 DEFAULT_PORT = 8500
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(ConsulDriver, self).__init__(member_id) super(ConsulDriver, self).__init__(member_id, parsed_url, options)
options = utils.collapse(options) options = utils.collapse(options)
self._host = parsed_url.hostname self._host = parsed_url.hostname
self._port = parsed_url.port or self.DEFAULT_PORT self._port = parsed_url.port or self.DEFAULT_PORT

View File

@ -216,7 +216,7 @@ class EtcdDriver(coordination.CoordinationDriver):
lock_encoder_cls = utils.Base64LockEncoder lock_encoder_cls = utils.Base64LockEncoder
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(EtcdDriver, self).__init__(member_id) super(EtcdDriver, self).__init__(member_id, parsed_url, options)
host = parsed_url.hostname or self.DEFAULT_HOST host = parsed_url.hostname or self.DEFAULT_HOST
port = parsed_url.port or self.DEFAULT_PORT port = parsed_url.port or self.DEFAULT_PORT
options = utils.collapse(options) options = utils.collapse(options)

View File

@ -141,7 +141,7 @@ class Etcd3Driver(coordination.CoordinationDriver):
DEFAULT_PORT = 2379 DEFAULT_PORT = 2379
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(Etcd3Driver, self).__init__(member_id) super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
host = parsed_url.hostname or self.DEFAULT_HOST host = parsed_url.hostname or self.DEFAULT_HOST
port = parsed_url.port or self.DEFAULT_PORT port = parsed_url.port or self.DEFAULT_PORT
options = utils.collapse(options) options = utils.collapse(options)

View File

@ -162,7 +162,7 @@ class Etcd3Driver(coordination.CoordinationDriver):
DEFAULT_PORT = 2379 DEFAULT_PORT = 2379
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(Etcd3Driver, self).__init__(member_id) super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
host = parsed_url.hostname or self.DEFAULT_HOST host = parsed_url.hostname or self.DEFAULT_HOST
port = parsed_url.port or self.DEFAULT_PORT port = parsed_url.port or self.DEFAULT_PORT
options = utils.collapse(options) options = utils.collapse(options)

View File

@ -177,7 +177,8 @@ class FileLock(locking.Lock):
LOG.warning("Unreleased lock %s garbage collected", self.name) LOG.warning("Unreleased lock %s garbage collected", self.name)
class FileDriver(coordination.CoordinationDriverCachedRunWatchers): class FileDriver(coordination.CoordinationDriverCachedRunWatchers,
coordination.CoordinationDriverWithExecutor):
"""A file based driver. """A file based driver.
This driver uses files and directories (and associated file locks) to This driver uses files and directories (and associated file locks) to
@ -219,9 +220,8 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
"""Initialize the file driver.""" """Initialize the file driver."""
super(FileDriver, self).__init__(member_id) super(FileDriver, self).__init__(member_id, parsed_url, options)
self._dir = self._normalize_path(parsed_url.path) self._dir = self._normalize_path(parsed_url.path)
self._executor = utils.ProxyExecutor.build("File", options)
self._group_dir = os.path.join(self._dir, 'groups') self._group_dir = os.path.join(self._dir, 'groups')
self._tmpdir = os.path.join(self._dir, 'tmp') self._tmpdir = os.path.join(self._dir, 'tmp')
self._driver_lock_path = os.path.join(self._dir, '.driver_lock') self._driver_lock_path = os.path.join(self._dir, '.driver_lock')
@ -231,7 +231,6 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
self._reserved_paths = list(self._reserved_dirs) self._reserved_paths = list(self._reserved_dirs)
self._reserved_paths.append(self._driver_lock_path) self._reserved_paths.append(self._driver_lock_path)
self._safe_member_id = self._make_filesystem_safe(member_id) self._safe_member_id = self._make_filesystem_safe(member_id)
self._options = utils.collapse(options)
self._timeout = int(self._options.get('timeout', 10)) self._timeout = int(self._options.get('timeout', 10))
@staticmethod @staticmethod
@ -266,15 +265,12 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
return hashlib.new(cls.HASH_ROUTINE, item).hexdigest() return hashlib.new(cls.HASH_ROUTINE, item).hexdigest()
def _start(self): def _start(self):
super(FileDriver, self)._start()
for a_dir in self._reserved_dirs: for a_dir in self._reserved_dirs:
try: try:
fileutils.ensure_tree(a_dir) fileutils.ensure_tree(a_dir)
except OSError as e: except OSError as e:
raise coordination.ToozConnectionError(e) raise coordination.ToozConnectionError(e)
self._executor.start()
def _stop(self):
self._executor.stop()
def _update_group_metadata(self, path, group_id): def _update_group_metadata(self, path, group_id):
details = { details = {

View File

@ -134,7 +134,7 @@ class IPCLock(locking.Lock):
return False return False
class IPCDriver(coordination.CoordinationDriver): class IPCDriver(coordination.CoordinationDriverWithExecutor):
"""A `IPC`_ based driver. """A `IPC`_ based driver.
This driver uses `IPC`_ concepts to provide the coordination driver This driver uses `IPC`_ concepts to provide the coordination driver
@ -165,21 +165,16 @@ class IPCDriver(coordination.CoordinationDriver):
_GROUP_PROJECT = "_TOOZ_INTERNAL" _GROUP_PROJECT = "_TOOZ_INTERNAL"
_INTERNAL_LOCK_NAME = "TOOZ_INTERNAL_LOCK" _INTERNAL_LOCK_NAME = "TOOZ_INTERNAL_LOCK"
def __init__(self, member_id, parsed_url, options):
"""Initialize the IPC driver."""
super(IPCDriver, self).__init__(member_id)
self._executor = utils.ProxyExecutor.build("IPC", options)
def _start(self): def _start(self):
super(IPCDriver, self)._start()
self._group_list = sysv_ipc.SharedMemory( self._group_list = sysv_ipc.SharedMemory(
ftok(self._GROUP_LIST_KEY, self._GROUP_PROJECT), ftok(self._GROUP_LIST_KEY, self._GROUP_PROJECT),
sysv_ipc.IPC_CREAT, sysv_ipc.IPC_CREAT,
size=self._SEGMENT_SIZE) size=self._SEGMENT_SIZE)
self._lock = self.get_lock(self._INTERNAL_LOCK_NAME) self._lock = self.get_lock(self._INTERNAL_LOCK_NAME)
self._executor.start()
def _stop(self): def _stop(self):
self._executor.stop() super(IPCDriver, self)._stop()
try: try:
self._group_list.detach() self._group_list.detach()
self._group_list.remove() self._group_list.remove()

View File

@ -179,7 +179,8 @@ class MemcachedLock(locking.Lock):
return self in self.coord._acquired_locks return self in self.coord._acquired_locks
class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers,
coordination.CoordinationDriverWithExecutor):
"""A `memcached`_ based driver. """A `memcached`_ based driver.
This driver users `memcached`_ concepts to provide the coordination driver This driver users `memcached`_ concepts to provide the coordination driver
@ -226,21 +227,18 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers):
STILL_ALIVE = b"It's alive!" STILL_ALIVE = b"It's alive!"
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(MemcachedDriver, self).__init__(member_id) super(MemcachedDriver, self).__init__(member_id, parsed_url, options)
options = utils.collapse(options)
self._options = options
self._executor = utils.ProxyExecutor.build("Memcached", options)
self.host = (parsed_url.hostname or "localhost", self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211) parsed_url.port or 11211)
default_timeout = options.get('timeout', self.DEFAULT_TIMEOUT) default_timeout = self._options.get('timeout', self.DEFAULT_TIMEOUT)
self.timeout = int(default_timeout) self.timeout = int(default_timeout)
self.membership_timeout = int(options.get( self.membership_timeout = int(self._options.get(
'membership_timeout', default_timeout)) 'membership_timeout', default_timeout))
self.lock_timeout = int(options.get( self.lock_timeout = int(self._options.get(
'lock_timeout', default_timeout)) 'lock_timeout', default_timeout))
self.leader_timeout = int(options.get( self.leader_timeout = int(self._options.get(
'leader_timeout', default_timeout)) 'leader_timeout', default_timeout))
max_pool_size = options.get('max_pool_size', None) max_pool_size = self._options.get('max_pool_size', None)
if max_pool_size is not None: if max_pool_size is not None:
self.max_pool_size = int(max_pool_size) self.max_pool_size = int(max_pool_size)
else: else:
@ -264,6 +262,7 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers):
@_translate_failures @_translate_failures
def _start(self): def _start(self):
super(MemcachedDriver, self)._start()
self.client = pymemcache_client.PooledClient( self.client = pymemcache_client.PooledClient(
self.host, self.host,
serializer=self._msgpack_serializer, serializer=self._msgpack_serializer,
@ -274,14 +273,13 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers):
# Run heartbeat here because pymemcache use a lazy connection # Run heartbeat here because pymemcache use a lazy connection
# method and only connect once you do an operation. # method and only connect once you do an operation.
self.heartbeat() self.heartbeat()
self._executor.start()
@_translate_failures @_translate_failures
def _stop(self): def _stop(self):
super(MemcachedDriver, self)._stop()
for lock in list(self._acquired_locks): for lock in list(self._acquired_locks):
lock.release() lock.release()
self.client.delete(self._encode_member_id(self._member_id)) self.client.delete(self._encode_member_id(self._member_id))
self._executor.stop()
self.client.close() self.client.close()
def _encode_group_id(self, group_id): def _encode_group_id(self, group_id):

View File

@ -130,7 +130,7 @@ class MySQLDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
"""Initialize the MySQL driver.""" """Initialize the MySQL driver."""
super(MySQLDriver, self).__init__(member_id) super(MySQLDriver, self).__init__(member_id, parsed_url, options)
self._parsed_url = parsed_url self._parsed_url = parsed_url
self._options = utils.collapse(options) self._options = utils.collapse(options)

View File

@ -189,7 +189,7 @@ class PostgresDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
"""Initialize the PostgreSQL driver.""" """Initialize the PostgreSQL driver."""
super(PostgresDriver, self).__init__(member_id) super(PostgresDriver, self).__init__(member_id, parsed_url, options)
self._parsed_url = parsed_url self._parsed_url = parsed_url
self._options = utils.collapse(options) self._options = utils.collapse(options)

View File

@ -115,7 +115,8 @@ class RedisLock(locking.Lock):
return self in self._coord._acquired_locks return self in self._coord._acquired_locks
class RedisDriver(coordination.CoordinationDriverCachedRunWatchers): class RedisDriver(coordination.CoordinationDriverCachedRunWatchers,
coordination.CoordinationDriverWithExecutor):
"""Redis provides a few nice benefits that act as a poormans zookeeper. """Redis provides a few nice benefits that act as a poormans zookeeper.
It **is** fully functional and implements all of the coordination It **is** fully functional and implements all of the coordination
@ -323,26 +324,25 @@ return 1
.. _Lua: http://www.lua.org .. _Lua: http://www.lua.org
""" """
EXCLUDE_OPTIONS = CLIENT_LIST_ARGS
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(RedisDriver, self).__init__(member_id) super(RedisDriver, self).__init__(member_id, parsed_url, options)
options = utils.collapse(options, exclude=self.CLIENT_LIST_ARGS)
self._parsed_url = parsed_url self._parsed_url = parsed_url
self._options = options self._encoding = self._options.get('encoding', self.DEFAULT_ENCODING)
self._encoding = options.get('encoding', self.DEFAULT_ENCODING) timeout = self._options.get('timeout', self.CLIENT_DEFAULT_SOCKET_TO)
timeout = options.get('timeout', self.CLIENT_DEFAULT_SOCKET_TO)
self.timeout = int(timeout) self.timeout = int(timeout)
self.membership_timeout = float(options.get( self.membership_timeout = float(self._options.get(
'membership_timeout', timeout)) 'membership_timeout', timeout))
lock_timeout = options.get('lock_timeout', self.timeout) lock_timeout = self._options.get('lock_timeout', self.timeout)
self.lock_timeout = int(lock_timeout) self.lock_timeout = int(lock_timeout)
namespace = options.get('namespace', self.DEFAULT_NAMESPACE) namespace = self._options.get('namespace', self.DEFAULT_NAMESPACE)
self._namespace = utils.to_binary(namespace, encoding=self._encoding) self._namespace = utils.to_binary(namespace, encoding=self._encoding)
self._group_prefix = self._namespace + b"_group" self._group_prefix = self._namespace + b"_group"
self._beat_prefix = self._namespace + b"_beats" self._beat_prefix = self._namespace + b"_beats"
self._groups = self._namespace + b"_groups" self._groups = self._namespace + b"_groups"
self._client = None self._client = None
self._acquired_locks = set() self._acquired_locks = set()
self._executor = utils.ProxyExecutor.build("Redis", options)
self._started = False self._started = False
self._server_info = {} self._server_info = {}
self._scripts = {} self._scripts = {}
@ -429,7 +429,7 @@ return 1
return redis.StrictRedis(**kwargs) return redis.StrictRedis(**kwargs)
def _start(self): def _start(self):
self._executor.start() super(RedisDriver, self)._start()
try: try:
self._client = self._make_client(self._parsed_url, self._options, self._client = self._make_client(self._parsed_url, self._options,
self.timeout) self.timeout)
@ -522,7 +522,7 @@ return 1
lock.release() lock.release()
except tooz.ToozError: except tooz.ToozError:
LOG.warning("Unable to release lock '%s'", lock, exc_info=True) LOG.warning("Unable to release lock '%s'", lock, exc_info=True)
self._executor.stop() super(RedisDriver, self)._stop()
if self._client is not None: if self._client is not None:
# Make sure we no longer exist... # Make sure we no longer exist...
beat_id = self._encode_beat_id(self._member_id) beat_id = self._encode_beat_id(self._member_id)

View File

@ -136,7 +136,7 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers):
""" """
def __init__(self, member_id, parsed_url, options): def __init__(self, member_id, parsed_url, options):
super(KazooDriver, self).__init__(member_id) super(KazooDriver, self).__init__(member_id, parsed_url, options)
options = utils.collapse(options, exclude=['hosts']) options = utils.collapse(options, exclude=['hosts'])
self.timeout = int(options.get('timeout', '10')) self.timeout = int(options.get('timeout', '10'))
self._namespace = options.get('namespace', self.TOOZ_NAMESPACE) self._namespace = options.get('namespace', self.TOOZ_NAMESPACE)