Allow specifying a kazoo async handler 'kind'
In situations where the built-in (and default) kazoo
threading async handler does not work (which sometimes
appears to happen under eventlet) allow for specifying
a different handler (ie the 'eventlet' one) that should
work better under those scenarios.
Closes-bug: #1512001
Change-Id: Iec5e39928b223a3ffca0b9b5b4d0fd61abaa0f2b
(cherry-picked from commit 9c5cb6fd0b
)
This commit is contained in:
parent
8a01682bb5
commit
4251e1ea72
|
@ -16,3 +16,6 @@ coverage>=3.6
|
|||
psycopg2
|
||||
PyMySQL>=0.6.2 # MIT License
|
||||
sysv-ipc>=0.6.8 # BSD License
|
||||
|
||||
# Ensure that the eventlet executor continues to operate...
|
||||
eventlet!=0.17.0,>=0.16.1
|
|
@ -19,8 +19,12 @@ import copy
|
|||
|
||||
from kazoo import client
|
||||
from kazoo import exceptions
|
||||
from kazoo.handlers import eventlet as eventlet_handler
|
||||
from kazoo.handlers import threading as threading_handler
|
||||
from kazoo.protocol import paths
|
||||
from oslo_utils import strutils
|
||||
import six
|
||||
from six.moves import filter as compat_filter
|
||||
|
||||
from tooz import coordination
|
||||
from tooz.drivers import _retry
|
||||
|
@ -73,6 +77,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(BaseZooKeeperDriver, self).__init__()
|
||||
options = utils.collapse(options, exclude=['hosts'])
|
||||
self._options = options
|
||||
self._member_id = member_id
|
||||
self.timeout = int(options.get('timeout', ['10'])[-1])
|
||||
|
||||
|
@ -291,15 +297,49 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
class KazooDriver(BaseZooKeeperDriver):
|
||||
"""The driver using the Kazoo client against real ZooKeeper servers."""
|
||||
|
||||
HANDLERS = {
|
||||
'eventlet': eventlet_handler.SequentialEventletHandler,
|
||||
'threading': threading_handler.SequentialThreadingHandler,
|
||||
}
|
||||
"""
|
||||
Restricted immutable dict of handler 'kinds' -> handler classes that
|
||||
this driver can accept via 'handler' option key (the expected value for
|
||||
this option is one of the keys in this dictionary).
|
||||
"""
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(KazooDriver, self).__init__(member_id, parsed_url, options)
|
||||
self._coord = self._make_client(parsed_url, options)
|
||||
self._coord = self._make_client(parsed_url, self._options)
|
||||
self._member_id = member_id
|
||||
self._timeout_exception = self._coord.handler.timeout_exception
|
||||
|
||||
@classmethod
|
||||
def _make_client(cls, parsed_url, options):
|
||||
return client.KazooClient(hosts=parsed_url.netloc)
|
||||
def _make_client(self, parsed_url, options):
|
||||
# Creates a kazoo client,
|
||||
# See: https://github.com/python-zk/kazoo/blob/1.3.1/kazoo/client.py
|
||||
# for what options a client takes...
|
||||
maybe_hosts = [parsed_url.netloc] + list(options.get('hosts', []))
|
||||
hosts = list(compat_filter(None, maybe_hosts))
|
||||
if not hosts:
|
||||
hosts = ['localhost:2181']
|
||||
randomize_hosts = options.get('randomize_hosts', True)
|
||||
client_kwargs = {
|
||||
'hosts': ",".join(hosts),
|
||||
'timeout': float(options.get('timeout', self.timeout)),
|
||||
'connection_retry': options.get('connection_retry'),
|
||||
'command_retry': options.get('command_retry'),
|
||||
'randomize_hosts': strutils.bool_from_string(randomize_hosts),
|
||||
}
|
||||
handler_kind = options.get('handler')
|
||||
if handler_kind:
|
||||
try:
|
||||
handler_cls = self.HANDLERS[handler_kind]
|
||||
except KeyError:
|
||||
raise ValueError("Unknown handler '%s' requested"
|
||||
" valid handlers are %s"
|
||||
% (handler_kind,
|
||||
sorted(self.HANDLERS.keys())))
|
||||
client_kwargs['handler'] = handler_cls()
|
||||
return client.KazooClient(**client_kwargs)
|
||||
|
||||
def _watch_group(self, group_id):
|
||||
get_members_req = self.get_members(group_id)
|
||||
|
|
|
@ -58,3 +58,31 @@ def loads(blob, excp_cls=coordination.ToozError):
|
|||
return msgpack.unpackb(blob, encoding='utf-8')
|
||||
except (msgpack.UnpackException, ValueError) as e:
|
||||
raise excp_cls(exception_message(e))
|
||||
|
||||
|
||||
def collapse(config, exclude=None, item_selector=None):
|
||||
"""Collapses config with keys and **list/tuple** values.
|
||||
NOTE(harlowja): The last item/index from the list/tuple value is selected
|
||||
be default as the new value (values that are not lists/tuples are left
|
||||
alone). If the list/tuple value is empty (zero length), then no value
|
||||
is set.
|
||||
"""
|
||||
if not isinstance(config, dict):
|
||||
raise TypeError("Unexpected config type, dict expected")
|
||||
if not config:
|
||||
return {}
|
||||
if exclude is None:
|
||||
exclude = set()
|
||||
if item_selector is None:
|
||||
item_selector = lambda items: items[-1]
|
||||
collapsed = {}
|
||||
for (k, v) in six.iteritems(config):
|
||||
if isinstance(v, (tuple, list)):
|
||||
if k in exclude:
|
||||
collapsed[k] = v
|
||||
else:
|
||||
if len(v):
|
||||
collapsed[k] = item_selector(v)
|
||||
else:
|
||||
collapsed[k] = v
|
||||
return collapsed
|
||||
|
|
Loading…
Reference in New Issue