[zmq] Maintain several redis hosts
This patch makes it possible to maintain several redis hosts at once in order to increase driver's reliability and fault tolerance. Change-Id: Id6f63a4bb67a39340a74d16144c79028c7af245d
This commit is contained in:
parent
cb13e65bed
commit
cb3af2167f
@ -102,7 +102,7 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
|
|
||||||
self.matchmaker = driver.DriverManager(
|
self.matchmaker = driver.DriverManager(
|
||||||
'oslo.messaging.zmq.matchmaker',
|
'oslo.messaging.zmq.matchmaker',
|
||||||
self.get_matchmaker_backend(url),
|
self.get_matchmaker_backend(self.conf, url),
|
||||||
).driver(self.conf, url=url)
|
).driver(self.conf, url=url)
|
||||||
|
|
||||||
client_cls = zmq_client.ZmqClientProxy
|
client_cls = zmq_client.ZmqClientProxy
|
||||||
@ -124,12 +124,13 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||||
allowed_remote_exmods)
|
allowed_remote_exmods)
|
||||||
|
|
||||||
def get_matchmaker_backend(self, url):
|
@staticmethod
|
||||||
zmq_transport, p, matchmaker_backend = url.transport.partition('+')
|
def get_matchmaker_backend(conf, url):
|
||||||
|
zmq_transport, _, matchmaker_backend = url.transport.partition('+')
|
||||||
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
|
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
|
||||||
if not matchmaker_backend:
|
if not matchmaker_backend:
|
||||||
return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker
|
return conf.oslo_messaging_zmq.rpc_zmq_matchmaker
|
||||||
elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
|
if matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
|
||||||
raise rpc_common.RPCException(
|
raise rpc_common.RPCException(
|
||||||
_LE("Incorrect matchmaker backend name %(backend_name)s!"
|
_LE("Incorrect matchmaker backend name %(backend_name)s!"
|
||||||
"Available names are: %(available_names)s") %
|
"Available names are: %(available_names)s") %
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# Copyright 2016 Mirantis, Inc.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
@ -11,15 +12,21 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import functools
|
||||||
import logging
|
import logging
|
||||||
from retrying import retry
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_utils import importutils
|
from oslo_utils import importutils
|
||||||
|
from retrying import retry
|
||||||
|
import six
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._i18n import _LW, _LE
|
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||||
|
from oslo_messaging._i18n import _LE, _LI, _LW
|
||||||
|
|
||||||
redis = importutils.try_import('redis')
|
redis = importutils.try_import('redis')
|
||||||
redis_sentinel = importutils.try_import('redis.sentinel')
|
redis_sentinel = importutils.try_import('redis.sentinel')
|
||||||
@ -67,20 +74,46 @@ _PUBLISHERS_KEY = "PUBLISHERS"
|
|||||||
_ROUTERS_KEY = "ROUTERS"
|
_ROUTERS_KEY = "ROUTERS"
|
||||||
|
|
||||||
|
|
||||||
def redis_connection_warn(func):
|
def write_to_redis_connection_warn(func):
|
||||||
def func_wrapper(*args, **kwargs):
|
@functools.wraps(func)
|
||||||
try:
|
def func_wrapper(self, *args, **kwargs):
|
||||||
return func(*args, **kwargs)
|
# try to perform a write operation to all available hosts
|
||||||
except redis.ConnectionError:
|
success = False
|
||||||
LOG.warning(_LW("Redis is currently not available. "
|
for redis_instance in self._redis_instances:
|
||||||
"Messages are being sent to known targets using "
|
if not redis_instance._is_available:
|
||||||
"existing connections. But new nodes "
|
continue
|
||||||
"can not be discovered until Redis is up "
|
try:
|
||||||
"and running."))
|
func(self, redis_instance, *args, **kwargs)
|
||||||
|
success = True
|
||||||
|
except redis.ConnectionError:
|
||||||
|
LOG.warning(_LW("Redis host %s is not available now."),
|
||||||
|
redis_instance._address)
|
||||||
|
redis_instance._is_available = False
|
||||||
|
redis_instance._ready_from = float("inf")
|
||||||
|
if not success:
|
||||||
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
||||||
return func_wrapper
|
return func_wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def read_from_redis_connection_warn(func):
|
||||||
|
@functools.wraps(func)
|
||||||
|
def func_wrapper(self, *args, **kwargs):
|
||||||
|
# try to perform a read operation from any available and ready host
|
||||||
|
for redis_instance in self._redis_instances:
|
||||||
|
if not redis_instance._is_available \
|
||||||
|
or redis_instance._ready_from > time.time():
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
return func(self, redis_instance, *args, **kwargs)
|
||||||
|
except redis.ConnectionError:
|
||||||
|
LOG.warning(_LW("Redis host %s is not available now."),
|
||||||
|
redis_instance._address)
|
||||||
|
redis_instance._is_available = False
|
||||||
|
redis_instance._ready_from = float("inf")
|
||||||
|
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
||||||
|
return func_wrapper
|
||||||
|
|
||||||
|
|
||||||
def no_reraise(func):
|
def no_reraise(func):
|
||||||
def func_wrapper(*args, **kwargs):
|
def func_wrapper(*args, **kwargs):
|
||||||
try:
|
try:
|
||||||
@ -107,131 +140,84 @@ def retry_if_empty(hosts):
|
|||||||
return not hosts
|
return not hosts
|
||||||
|
|
||||||
|
|
||||||
class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
|
||||||
|
|
||||||
def __init__(self, conf, *args, **kwargs):
|
def __init__(self, conf, *args, **kwargs):
|
||||||
super(MatchmakerRedis, self).__init__(conf, *args, **kwargs)
|
|
||||||
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
|
|
||||||
if redis is None:
|
if redis is None:
|
||||||
raise ImportError(_LE("Redis package is not available!"))
|
raise ImportError(_LE("Redis package is not available!"))
|
||||||
|
|
||||||
self.sentinel_hosts = self._extract_sentinel_options()
|
super(MatchmakerRedisBase, self).__init__(conf, *args, **kwargs)
|
||||||
if not self.sentinel_hosts:
|
|
||||||
self.standalone_redis = self._extract_standalone_redis_options()
|
|
||||||
self._redis = redis.StrictRedis(
|
|
||||||
host=self.standalone_redis["host"],
|
|
||||||
port=self.standalone_redis["port"],
|
|
||||||
password=self.standalone_redis["password"]
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
|
|
||||||
sentinel = redis.sentinel.Sentinel(
|
|
||||||
sentinels=self.sentinel_hosts,
|
|
||||||
socket_timeout=socket_timeout
|
|
||||||
)
|
|
||||||
|
|
||||||
self._redis = sentinel.master_for(
|
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
|
||||||
self.conf.matchmaker_redis.sentinel_group_name,
|
|
||||||
socket_timeout=socket_timeout
|
|
||||||
)
|
|
||||||
|
|
||||||
def _extract_sentinel_options(self):
|
@abc.abstractmethod
|
||||||
if self.url and self.url.hosts:
|
def _sadd(self, key, value, expire):
|
||||||
if len(self.url.hosts) > 1:
|
pass
|
||||||
return [(host.hostname, host.port) for host in self.url.hosts]
|
|
||||||
elif self.conf.matchmaker_redis.sentinel_hosts:
|
|
||||||
s = self.conf.matchmaker_redis.sentinel_hosts
|
|
||||||
return [tuple(i.split(":")) for i in s]
|
|
||||||
|
|
||||||
def _extract_standalone_redis_options(self):
|
@abc.abstractmethod
|
||||||
if self.url and self.url.hosts:
|
def _srem(self, key, value):
|
||||||
redis_host = self.url.hosts[0]
|
pass
|
||||||
return {"host": redis_host.hostname,
|
|
||||||
"port": redis_host.port,
|
|
||||||
"password": redis_host.password}
|
|
||||||
else:
|
|
||||||
return {"host": self.conf.matchmaker_redis.host,
|
|
||||||
"port": self.conf.matchmaker_redis.port,
|
|
||||||
"password": self.conf.matchmaker_redis.password}
|
|
||||||
|
|
||||||
def _add_key_with_expire(self, key, value, expire):
|
@abc.abstractmethod
|
||||||
self._redis.sadd(key, value)
|
def _smembers(self, key):
|
||||||
if expire > 0:
|
pass
|
||||||
self._redis.expire(key, expire)
|
|
||||||
|
|
||||||
@no_reraise
|
@no_reraise
|
||||||
@redis_connection_warn
|
|
||||||
def register_publisher(self, hostname, expire=-1):
|
def register_publisher(self, hostname, expire=-1):
|
||||||
host_str = ",".join(hostname)
|
self._sadd(_PUBLISHERS_KEY, ','.join(hostname), expire)
|
||||||
self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire)
|
|
||||||
|
|
||||||
@no_reraise
|
@no_reraise
|
||||||
@redis_connection_warn
|
|
||||||
def unregister_publisher(self, hostname):
|
def unregister_publisher(self, hostname):
|
||||||
host_str = ",".join(hostname)
|
self._srem(_PUBLISHERS_KEY, ','.join(hostname))
|
||||||
self._redis.srem(_PUBLISHERS_KEY, host_str)
|
|
||||||
|
|
||||||
@empty_list_on_error
|
@empty_list_on_error
|
||||||
@redis_connection_warn
|
|
||||||
def get_publishers(self):
|
def get_publishers(self):
|
||||||
hosts = []
|
return [tuple(hostname.split(',')) for hostname
|
||||||
hosts.extend([tuple(host_str.split(","))
|
in self._smembers(_PUBLISHERS_KEY)]
|
||||||
for host_str in
|
|
||||||
self._get_hosts_by_key(_PUBLISHERS_KEY)])
|
|
||||||
return hosts
|
|
||||||
|
|
||||||
@no_reraise
|
@no_reraise
|
||||||
@redis_connection_warn
|
|
||||||
def register_router(self, hostname, expire=-1):
|
def register_router(self, hostname, expire=-1):
|
||||||
self._add_key_with_expire(_ROUTERS_KEY, hostname, expire)
|
self._sadd(_ROUTERS_KEY, hostname, expire)
|
||||||
|
|
||||||
@no_reraise
|
@no_reraise
|
||||||
@redis_connection_warn
|
|
||||||
def unregister_router(self, hostname):
|
def unregister_router(self, hostname):
|
||||||
self._redis.srem(_ROUTERS_KEY, hostname)
|
self._srem(_ROUTERS_KEY, hostname)
|
||||||
|
|
||||||
@empty_list_on_error
|
@empty_list_on_error
|
||||||
@redis_connection_warn
|
|
||||||
def get_routers(self):
|
def get_routers(self):
|
||||||
return self._get_hosts_by_key(_ROUTERS_KEY)
|
return self._smembers(_ROUTERS_KEY)
|
||||||
|
|
||||||
@redis_connection_warn
|
|
||||||
def get_hosts_by_key(self, key):
|
def get_hosts_by_key(self, key):
|
||||||
return self._get_hosts_by_key(key)
|
return self._smembers(key)
|
||||||
|
|
||||||
def _get_hosts_by_key(self, key):
|
|
||||||
return self._redis.smembers(key)
|
|
||||||
|
|
||||||
@redis_connection_warn
|
|
||||||
def register(self, target, hostname, listener_type, expire=-1):
|
def register(self, target, hostname, listener_type, expire=-1):
|
||||||
if target.server:
|
if target.server:
|
||||||
key = zmq_address.target_to_key(target, listener_type)
|
key = zmq_address.target_to_key(target, listener_type)
|
||||||
self._add_key_with_expire(key, hostname, expire)
|
self._sadd(key, hostname, expire)
|
||||||
|
|
||||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||||
self._add_key_with_expire(key, hostname, expire)
|
self._sadd(key, hostname, expire)
|
||||||
|
|
||||||
@no_reraise
|
@no_reraise
|
||||||
@redis_connection_warn
|
|
||||||
def unregister(self, target, hostname, listener_type):
|
def unregister(self, target, hostname, listener_type):
|
||||||
if target.server:
|
if target.server:
|
||||||
key = zmq_address.target_to_key(target, listener_type)
|
key = zmq_address.target_to_key(target, listener_type)
|
||||||
self._redis.srem(key, hostname)
|
self._srem(key, hostname)
|
||||||
|
|
||||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||||
self._redis.srem(key, hostname)
|
self._srem(key, hostname)
|
||||||
|
|
||||||
@redis_connection_warn
|
|
||||||
def get_hosts(self, target, listener_type):
|
def get_hosts(self, target, listener_type):
|
||||||
hosts = []
|
hosts = []
|
||||||
|
|
||||||
if target.server:
|
if target.server:
|
||||||
key = zmq_address.target_to_key(target, listener_type)
|
key = zmq_address.target_to_key(target, listener_type)
|
||||||
hosts.extend(self._get_hosts_by_key(key))
|
hosts.extend(self._smembers(key))
|
||||||
|
|
||||||
if not hosts:
|
if not hosts:
|
||||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||||
hosts.extend(self._get_hosts_by_key(key))
|
hosts.extend(self._smembers(key))
|
||||||
|
|
||||||
LOG.debug("[Redis] get_hosts for target %(target)s: %(hosts)s",
|
LOG.debug("[Redis] get_hosts for target %(target)s: %(hosts)s",
|
||||||
{"target": target, "hosts": hosts})
|
{"target": target, "hosts": hosts})
|
||||||
@ -241,10 +227,9 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
|
|||||||
def get_hosts_retry(self, target, listener_type):
|
def get_hosts_retry(self, target, listener_type):
|
||||||
return self._retry_method(target, listener_type, self.get_hosts)
|
return self._retry_method(target, listener_type, self.get_hosts)
|
||||||
|
|
||||||
@redis_connection_warn
|
|
||||||
def get_hosts_fanout(self, target, listener_type):
|
def get_hosts_fanout(self, target, listener_type):
|
||||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||||
hosts = list(self._get_hosts_by_key(key))
|
hosts = list(self._smembers(key))
|
||||||
|
|
||||||
LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s",
|
LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s",
|
||||||
{"target": target, "hosts": hosts})
|
{"target": target, "hosts": hosts})
|
||||||
@ -262,3 +247,154 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
|
|||||||
def _get_hosts_retry(target, listener_type):
|
def _get_hosts_retry(target, listener_type):
|
||||||
return method(target, listener_type)
|
return method(target, listener_type)
|
||||||
return _get_hosts_retry(target, listener_type)
|
return _get_hosts_retry(target, listener_type)
|
||||||
|
|
||||||
|
|
||||||
|
class MatchmakerRedis(MatchmakerRedisBase):
|
||||||
|
|
||||||
|
def __init__(self, conf, *args, **kwargs):
|
||||||
|
super(MatchmakerRedis, self).__init__(conf, *args, **kwargs)
|
||||||
|
|
||||||
|
self._redis_hosts = self._extract_redis_hosts()
|
||||||
|
|
||||||
|
self._redis_instances = [
|
||||||
|
redis.StrictRedis(host=redis_host["host"],
|
||||||
|
port=redis_host["port"],
|
||||||
|
password=redis_host["password"])
|
||||||
|
for redis_host in self._redis_hosts
|
||||||
|
]
|
||||||
|
|
||||||
|
for redis_host, redis_instance \
|
||||||
|
in six.moves.zip(self._redis_hosts, self._redis_instances):
|
||||||
|
address = "{host}:{port}".format(host=redis_host["host"],
|
||||||
|
port=redis_host["port"])
|
||||||
|
redis_instance._address = address
|
||||||
|
is_available = self._check_availability(redis_instance)
|
||||||
|
if is_available:
|
||||||
|
redis_instance._is_available = True
|
||||||
|
redis_instance._ready_from = time.time()
|
||||||
|
else:
|
||||||
|
LOG.warning(_LW("Redis host %s is not available now."),
|
||||||
|
address)
|
||||||
|
redis_instance._is_available = False
|
||||||
|
redis_instance._ready_from = float("inf")
|
||||||
|
|
||||||
|
# NOTE(gdavoian): store instances in a random order
|
||||||
|
# (for the sake of load balancing)
|
||||||
|
random.shuffle(self._redis_instances)
|
||||||
|
|
||||||
|
self._availability_updater = \
|
||||||
|
MatchmakerRedisAvailabilityUpdater(self.conf, self)
|
||||||
|
|
||||||
|
def _extract_redis_hosts(self):
|
||||||
|
if self.url and self.url.hosts:
|
||||||
|
return [{"host": redis_host.hostname,
|
||||||
|
"port": redis_host.port,
|
||||||
|
"password": redis_host.password}
|
||||||
|
for redis_host in self.url.hosts]
|
||||||
|
else:
|
||||||
|
# FIXME(gdavoian): remove the code below along with the
|
||||||
|
# corresponding deprecated options in the next release
|
||||||
|
return [{"host": self.conf.matchmaker_redis.host,
|
||||||
|
"port": self.conf.matchmaker_redis.port,
|
||||||
|
"password": self.conf.matchmaker_redis.password}]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _check_availability(redis_instance):
|
||||||
|
try:
|
||||||
|
redis_instance.ping()
|
||||||
|
return True
|
||||||
|
except redis.ConnectionError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
@write_to_redis_connection_warn
|
||||||
|
def _sadd(self, redis_instance, key, value, expire):
|
||||||
|
redis_instance.sadd(key, value)
|
||||||
|
if expire > 0:
|
||||||
|
redis_instance.expire(key, expire)
|
||||||
|
|
||||||
|
@write_to_redis_connection_warn
|
||||||
|
def _srem(self, redis_instance, key, value):
|
||||||
|
redis_instance.srem(key, value)
|
||||||
|
|
||||||
|
@read_from_redis_connection_warn
|
||||||
|
def _smembers(self, redis_instance, key):
|
||||||
|
return redis_instance.smembers(key)
|
||||||
|
|
||||||
|
|
||||||
|
class MatchmakerRedisAvailabilityUpdater(zmq_updater.UpdaterBase):
|
||||||
|
|
||||||
|
_MIN_SLEEP_FOR = 10
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker):
|
||||||
|
super(MatchmakerRedisAvailabilityUpdater, self).__init__(
|
||||||
|
conf, matchmaker, self._update_availability,
|
||||||
|
sleep_for=conf.oslo_messaging_zmq.zmq_target_update
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_availability(self):
|
||||||
|
fraction_of_available_instances = 0
|
||||||
|
for redis_instance in self.matchmaker._redis_instances:
|
||||||
|
if not redis_instance._is_available:
|
||||||
|
is_available = \
|
||||||
|
self.matchmaker._check_availability(redis_instance)
|
||||||
|
if is_available:
|
||||||
|
LOG.info(_LI("Redis host %s is available again."),
|
||||||
|
redis_instance._address)
|
||||||
|
fraction_of_available_instances += 1
|
||||||
|
# NOTE(gdavoian): mark an instance as available for
|
||||||
|
# writing to, but wait until all services register
|
||||||
|
# themselves in it for making the instance ready for
|
||||||
|
# reading from
|
||||||
|
redis_instance._is_available = True
|
||||||
|
redis_instance._ready_from = time.time() + \
|
||||||
|
self.conf.oslo_messaging_zmq.zmq_target_update
|
||||||
|
else:
|
||||||
|
fraction_of_available_instances += 1
|
||||||
|
fraction_of_available_instances /= \
|
||||||
|
float(len(self.matchmaker._redis_instances))
|
||||||
|
# NOTE(gdavoian): make the sleep time proportional to the number of
|
||||||
|
# currently available instances
|
||||||
|
self._sleep_for = max(self.conf.oslo_messaging_zmq.zmq_target_update *
|
||||||
|
fraction_of_available_instances,
|
||||||
|
self._MIN_SLEEP_FOR)
|
||||||
|
|
||||||
|
|
||||||
|
class MatchmakerSentinel(MatchmakerRedisBase):
|
||||||
|
|
||||||
|
def __init__(self, conf, *args, **kwargs):
|
||||||
|
super(MatchmakerSentinel, self).__init__(conf, *args, **kwargs)
|
||||||
|
|
||||||
|
self._sentinel_hosts = self._extract_sentinel_hosts()
|
||||||
|
|
||||||
|
socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
|
||||||
|
|
||||||
|
sentinel = redis_sentinel.Sentinel(
|
||||||
|
sentinels=self._sentinel_hosts,
|
||||||
|
socket_timeout=socket_timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
self._redis_instance = sentinel.master_for(
|
||||||
|
self.conf.matchmaker_redis.sentinel_group_name,
|
||||||
|
socket_timeout=socket_timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
def _extract_sentinel_hosts(self):
|
||||||
|
if self.url and self.url.hosts:
|
||||||
|
return [(sentinel_host.hostname, sentinel_host.port)
|
||||||
|
for sentinel_host in self.url.hosts]
|
||||||
|
elif self.conf.matchmaker_redis.sentinel_hosts:
|
||||||
|
return [tuple(sentinel_host.split(':')) for sentinel_host
|
||||||
|
in self.conf.matchmaker_redis.sentinel_hosts]
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _sadd(self, key, value, expire):
|
||||||
|
self._redis_instance.sadd(key, value)
|
||||||
|
if expire > 0:
|
||||||
|
self._redis_instance.expire(key, expire)
|
||||||
|
|
||||||
|
def _srem(self, key, value):
|
||||||
|
self._redis_instance.srem(key, value)
|
||||||
|
|
||||||
|
def _smembers(self, key):
|
||||||
|
return self._redis_instance.smembers(key)
|
||||||
|
@ -19,9 +19,11 @@ import socket
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import impl_zmq
|
||||||
from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
|
from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._i18n import _LI
|
from oslo_messaging._i18n import _LI
|
||||||
|
from oslo_messaging import transport
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -51,7 +53,10 @@ zmq_proxy_opts = [
|
|||||||
cfg.BoolOpt('ack_pub_sub', default=False,
|
cfg.BoolOpt('ack_pub_sub', default=False,
|
||||||
help='Use acknowledgements for notifying senders about '
|
help='Use acknowledgements for notifying senders about '
|
||||||
'receiving their fanout messages. '
|
'receiving their fanout messages. '
|
||||||
'The option is ignored if PUB/SUB is disabled.')
|
'The option is ignored if PUB/SUB is disabled.'),
|
||||||
|
|
||||||
|
cfg.StrOpt('url', default='zmq://127.0.0.1:6379/',
|
||||||
|
help='ZMQ-driver transport URL with additional configurations')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@ -79,6 +84,8 @@ def parse_command_line_args(conf):
|
|||||||
parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
|
parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
|
||||||
action='store_true',
|
action='store_true',
|
||||||
help='Acknowledge PUB/SUB messages')
|
help='Acknowledge PUB/SUB messages')
|
||||||
|
parser.add_argument('-u', '--url', dest='url', type=str,
|
||||||
|
help='Transport URL with configurations')
|
||||||
|
|
||||||
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
|
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
|
||||||
help='Turn on DEBUG logging level instead of INFO')
|
help='Turn on DEBUG logging level instead of INFO')
|
||||||
@ -108,6 +115,8 @@ def parse_command_line_args(conf):
|
|||||||
if args.ack_pub_sub:
|
if args.ack_pub_sub:
|
||||||
conf.set_override('ack_pub_sub', args.ack_pub_sub,
|
conf.set_override('ack_pub_sub', args.ack_pub_sub,
|
||||||
group='zmq_proxy_opts')
|
group='zmq_proxy_opts')
|
||||||
|
if args.url:
|
||||||
|
conf.set_override('url', args.url, group='zmq_proxy_opts')
|
||||||
|
|
||||||
|
|
||||||
class ZmqProxy(object):
|
class ZmqProxy(object):
|
||||||
@ -152,10 +161,13 @@ class ZmqProxy(object):
|
|||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
super(ZmqProxy, self).__init__()
|
super(ZmqProxy, self).__init__()
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
|
url = transport.TransportURL.parse(
|
||||||
|
self.conf, url=self.conf.zmq_proxy_opts.url
|
||||||
|
)
|
||||||
self.matchmaker = driver.DriverManager(
|
self.matchmaker = driver.DriverManager(
|
||||||
'oslo.messaging.zmq.matchmaker',
|
'oslo.messaging.zmq.matchmaker',
|
||||||
self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
|
impl_zmq.ZmqDriver.get_matchmaker_backend(self.conf, url)
|
||||||
).driver(self.conf)
|
).driver(self.conf, url=url)
|
||||||
self.context = zmq.Context()
|
self.context = zmq.Context()
|
||||||
self.proxy = self._choose_proxy_implementation()
|
self.proxy = self._choose_proxy_implementation()
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ from oslo_messaging._drivers import base
|
|||||||
from oslo_messaging import server
|
from oslo_messaging import server
|
||||||
|
|
||||||
|
|
||||||
MATCHMAKER_BACKENDS = ('redis', 'dummy')
|
MATCHMAKER_BACKENDS = ('redis', 'sentinel', 'dummy')
|
||||||
MATCHMAKER_DEFAULT = 'redis'
|
MATCHMAKER_DEFAULT = 'redis'
|
||||||
|
|
||||||
|
|
||||||
|
@ -12,12 +12,9 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from fixtures._fixtures import timeout
|
|
||||||
import inspect
|
import inspect
|
||||||
import retrying
|
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
import testscenarios
|
import testscenarios
|
||||||
import testtools
|
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
@ -31,8 +28,7 @@ def redis_available():
|
|||||||
if not redis:
|
if not redis:
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
c = redis.StrictRedis(socket_timeout=1)
|
redis.StrictRedis(socket_timeout=1).ping()
|
||||||
c.ping()
|
|
||||||
return True
|
return True
|
||||||
except redis.exceptions.ConnectionError:
|
except redis.exceptions.ConnectionError:
|
||||||
return False
|
return False
|
||||||
@ -41,7 +37,6 @@ def redis_available():
|
|||||||
load_tests = testscenarios.load_tests_apply_scenarios
|
load_tests = testscenarios.load_tests_apply_scenarios
|
||||||
|
|
||||||
|
|
||||||
@testtools.skipIf(not redis_available(), "redis unavailable")
|
|
||||||
class TestImplMatchmaker(test_utils.BaseTestCase):
|
class TestImplMatchmaker(test_utils.BaseTestCase):
|
||||||
|
|
||||||
scenarios = [
|
scenarios = [
|
||||||
@ -52,13 +47,18 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestImplMatchmaker, self).setUp()
|
super(TestImplMatchmaker, self).setUp()
|
||||||
|
|
||||||
|
if self.rpc_zmq_matchmaker == "redis":
|
||||||
|
if not redis_available():
|
||||||
|
self.skipTest("redis unavailable")
|
||||||
|
|
||||||
self.test_matcher = driver.DriverManager(
|
self.test_matcher = driver.DriverManager(
|
||||||
'oslo.messaging.zmq.matchmaker',
|
'oslo.messaging.zmq.matchmaker',
|
||||||
self.rpc_zmq_matchmaker,
|
self.rpc_zmq_matchmaker,
|
||||||
).driver(self.conf)
|
).driver(self.conf)
|
||||||
|
|
||||||
if self.rpc_zmq_matchmaker == "redis":
|
if self.rpc_zmq_matchmaker == "redis":
|
||||||
self.addCleanup(self.test_matcher._redis.flushdb)
|
for redis_instance in self.test_matcher._redis_instances:
|
||||||
|
self.addCleanup(redis_instance.flushdb)
|
||||||
|
|
||||||
self.target = oslo_messaging.Target(topic="test_topic")
|
self.target = oslo_messaging.Target(topic="test_topic")
|
||||||
self.host1 = b"test_host1"
|
self.host1 = b"test_host1"
|
||||||
@ -77,7 +77,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
|||||||
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
|
||||||
[self.host1, self.host2])
|
[self.host1, self.host2])
|
||||||
|
|
||||||
def test_register_unsibscribe(self):
|
def test_register_unregister(self):
|
||||||
self.test_matcher.register(self.target, self.host1, "test")
|
self.test_matcher.register(self.target, self.host1, "test")
|
||||||
self.test_matcher.register(self.target, self.host2, "test")
|
self.test_matcher.register(self.target, self.host2, "test")
|
||||||
|
|
||||||
@ -95,12 +95,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
def test_get_hosts_wrong_topic(self):
|
def test_get_hosts_wrong_topic(self):
|
||||||
target = oslo_messaging.Target(topic="no_such_topic")
|
target = oslo_messaging.Target(topic="no_such_topic")
|
||||||
hosts = []
|
self.assertEqual([], self.test_matcher.get_hosts(target, "test"))
|
||||||
try:
|
|
||||||
hosts = self.test_matcher.get_hosts(target, "test")
|
|
||||||
except (timeout.TimeoutException, retrying.RetryError):
|
|
||||||
pass
|
|
||||||
self.assertEqual([], hosts)
|
|
||||||
|
|
||||||
def test_handle_redis_package_error(self):
|
def test_handle_redis_package_error(self):
|
||||||
if self.rpc_zmq_matchmaker == "redis":
|
if self.rpc_zmq_matchmaker == "redis":
|
||||||
@ -108,10 +103,10 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
|
|||||||
module = inspect.getmodule(self.test_matcher)
|
module = inspect.getmodule(self.test_matcher)
|
||||||
redis_package = module.redis
|
redis_package = module.redis
|
||||||
|
|
||||||
# 'redis' variable is set None, when importing package is failed
|
# 'redis' variable is set to None, when package importing is failed
|
||||||
module.redis = None
|
module.redis = None
|
||||||
self.assertRaises(ImportError, self.test_matcher.__init__,
|
self.assertRaises(ImportError, self.test_matcher.__init__,
|
||||||
self.conf)
|
self.conf)
|
||||||
|
|
||||||
# retrieve 'redis' variable wihch is set originally
|
# retrieve 'redis' variable which is set originally
|
||||||
module.redis = redis_package
|
module.redis = redis_package
|
||||||
|
@ -22,7 +22,6 @@ from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
|
|||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
|
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
@ -44,7 +43,7 @@ class TestZmqTransportUrl(test_utils.BaseTestCase):
|
|||||||
driver.matchmaker.__class__)
|
driver.matchmaker.__class__)
|
||||||
self.assertEqual('zmq', driver.matchmaker.url.transport)
|
self.assertEqual('zmq', driver.matchmaker.url.transport)
|
||||||
|
|
||||||
def test_error_name(self):
|
def test_error_url(self):
|
||||||
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
|
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
|
||||||
|
|
||||||
def test_dummy_url(self):
|
def test_dummy_url(self):
|
||||||
@ -59,30 +58,44 @@ class TestZmqTransportUrl(test_utils.BaseTestCase):
|
|||||||
driver.matchmaker.__class__)
|
driver.matchmaker.__class__)
|
||||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||||
|
|
||||||
def test_redis_url_no_creds(self):
|
def test_sentinel_url(self):
|
||||||
driver, url = self.setup_url("zmq+redis://host:65123/")
|
driver, url = self.setup_url("zmq+sentinel:///")
|
||||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
|
||||||
driver.matchmaker.__class__)
|
driver.matchmaker.__class__)
|
||||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
|
||||||
self.assertEqual("host", driver.matchmaker.standalone_redis["host"])
|
|
||||||
self.assertEqual(65123, driver.matchmaker.standalone_redis["port"])
|
|
||||||
|
|
||||||
def test_redis_url_no_port(self):
|
def test_host_with_credentials_url(self):
|
||||||
driver, url = self.setup_url("zmq+redis://:p12@host:65123/")
|
driver, url = self.setup_url("zmq://:password@host:60000/")
|
||||||
|
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
||||||
|
driver.matchmaker.__class__)
|
||||||
|
self.assertEqual('zmq', driver.matchmaker.url.transport)
|
||||||
|
self.assertEqual(
|
||||||
|
[{"host": "host", "port": 60000, "password": "password"}],
|
||||||
|
driver.matchmaker._redis_hosts
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_redis_multiple_hosts_url(self):
|
||||||
|
driver, url = self.setup_url(
|
||||||
|
"zmq+redis://host1:60001,host2:60002,host3:60003/"
|
||||||
|
)
|
||||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
||||||
driver.matchmaker.__class__)
|
driver.matchmaker.__class__)
|
||||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
||||||
self.assertEqual("host", driver.matchmaker.standalone_redis["host"])
|
self.assertEqual(
|
||||||
self.assertEqual(65123, driver.matchmaker.standalone_redis["port"])
|
[{"host": "host1", "port": 60001, "password": None},
|
||||||
self.assertEqual("p12", driver.matchmaker.standalone_redis["password"])
|
{"host": "host2", "port": 60002, "password": None},
|
||||||
|
{"host": "host3", "port": 60003, "password": None}],
|
||||||
|
driver.matchmaker._redis_hosts
|
||||||
|
)
|
||||||
|
|
||||||
def test_sentinel_multiple_hosts_url(self):
|
def test_sentinel_multiple_hosts_url(self):
|
||||||
driver, url = self.setup_url(
|
driver, url = self.setup_url(
|
||||||
"zmq+redis://sentinel1:20001,sentinel2:20001,sentinel3:20001/")
|
"zmq+sentinel://host1:20001,host2:20002,host3:20003/"
|
||||||
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
|
)
|
||||||
|
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
|
||||||
driver.matchmaker.__class__)
|
driver.matchmaker.__class__)
|
||||||
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
|
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
|
||||||
self.assertEqual(3, len(driver.matchmaker.sentinel_hosts))
|
self.assertEqual(
|
||||||
expected = [("sentinel1", 20001), ("sentinel2", 20001),
|
[("host1", 20001), ("host2", 20002), ("host3", 20003)],
|
||||||
("sentinel3", 20001)]
|
driver.matchmaker._sentinel_hosts
|
||||||
self.assertEqual(expected, driver.matchmaker.sentinel_hosts)
|
)
|
||||||
|
@ -76,6 +76,7 @@ oslo.messaging.zmq.matchmaker =
|
|||||||
# Matchmakers for ZeroMQ
|
# Matchmakers for ZeroMQ
|
||||||
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
|
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
|
||||||
redis = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerRedis
|
redis = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerRedis
|
||||||
|
sentinel = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerSentinel
|
||||||
|
|
||||||
oslo.config.opts =
|
oslo.config.opts =
|
||||||
oslo.messaging = oslo_messaging.opts:list_opts
|
oslo.messaging = oslo_messaging.opts:list_opts
|
||||||
|
Loading…
Reference in New Issue
Block a user