diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 90c2c20bc..7ab43fb31 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -102,7 +102,7 @@ class ZmqDriver(base.BaseDriver): self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', - self.get_matchmaker_backend(url), + self.get_matchmaker_backend(self.conf, url), ).driver(self.conf, url=url) client_cls = zmq_client.ZmqClientProxy @@ -124,12 +124,13 @@ class ZmqDriver(base.BaseDriver): super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) - def get_matchmaker_backend(self, url): - zmq_transport, p, matchmaker_backend = url.transport.partition('+') + @staticmethod + def get_matchmaker_backend(conf, url): + zmq_transport, _, matchmaker_backend = url.transport.partition('+') assert zmq_transport == 'zmq', "Needs to be zmq for this transport!" if not matchmaker_backend: - return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker - elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS: + return conf.oslo_messaging_zmq.rpc_zmq_matchmaker + if matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS: raise rpc_common.RPCException( _LE("Incorrect matchmaker backend name %(backend_name)s!" "Available names are: %(available_names)s") % diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py index a1aedf296..196f87f03 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py @@ -1,3 +1,4 @@ +# Copyright 2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -11,15 +12,21 @@ # License for the specific language governing permissions and limitations # under the License. +import abc +import functools import logging -from retrying import retry +import random +import time from oslo_config import cfg from oslo_utils import importutils +from retrying import retry +import six from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._i18n import _LW, _LE +from oslo_messaging._drivers.zmq_driver import zmq_updater +from oslo_messaging._i18n import _LE, _LI, _LW redis = importutils.try_import('redis') redis_sentinel = importutils.try_import('redis.sentinel') @@ -67,20 +74,46 @@ _PUBLISHERS_KEY = "PUBLISHERS" _ROUTERS_KEY = "ROUTERS" -def redis_connection_warn(func): - def func_wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except redis.ConnectionError: - LOG.warning(_LW("Redis is currently not available. " - "Messages are being sent to known targets using " - "existing connections. But new nodes " - "can not be discovered until Redis is up " - "and running.")) +def write_to_redis_connection_warn(func): + @functools.wraps(func) + def func_wrapper(self, *args, **kwargs): + # try to perform a write operation to all available hosts + success = False + for redis_instance in self._redis_instances: + if not redis_instance._is_available: + continue + try: + func(self, redis_instance, *args, **kwargs) + success = True + except redis.ConnectionError: + LOG.warning(_LW("Redis host %s is not available now."), + redis_instance._address) + redis_instance._is_available = False + redis_instance._ready_from = float("inf") + if not success: raise zmq_matchmaker_base.MatchmakerUnavailable() return func_wrapper +def read_from_redis_connection_warn(func): + @functools.wraps(func) + def func_wrapper(self, *args, **kwargs): + # try to perform a read operation from any available and ready host + for redis_instance in self._redis_instances: + if not redis_instance._is_available \ + or redis_instance._ready_from > time.time(): + continue + try: + return func(self, redis_instance, *args, **kwargs) + except redis.ConnectionError: + LOG.warning(_LW("Redis host %s is not available now."), + redis_instance._address) + redis_instance._is_available = False + redis_instance._ready_from = float("inf") + raise zmq_matchmaker_base.MatchmakerUnavailable() + return func_wrapper + + def no_reraise(func): def func_wrapper(*args, **kwargs): try: @@ -107,131 +140,84 @@ def retry_if_empty(hosts): return not hosts -class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): +@six.add_metaclass(abc.ABCMeta) +class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase): def __init__(self, conf, *args, **kwargs): - super(MatchmakerRedis, self).__init__(conf, *args, **kwargs) - self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis") if redis is None: raise ImportError(_LE("Redis package is not available!")) - self.sentinel_hosts = self._extract_sentinel_options() - if not self.sentinel_hosts: - self.standalone_redis = self._extract_standalone_redis_options() - self._redis = redis.StrictRedis( - host=self.standalone_redis["host"], - port=self.standalone_redis["port"], - password=self.standalone_redis["password"] - ) - else: - socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000. - sentinel = redis.sentinel.Sentinel( - sentinels=self.sentinel_hosts, - socket_timeout=socket_timeout - ) + super(MatchmakerRedisBase, self).__init__(conf, *args, **kwargs) - self._redis = sentinel.master_for( - self.conf.matchmaker_redis.sentinel_group_name, - socket_timeout=socket_timeout - ) + self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis") - def _extract_sentinel_options(self): - if self.url and self.url.hosts: - if len(self.url.hosts) > 1: - return [(host.hostname, host.port) for host in self.url.hosts] - elif self.conf.matchmaker_redis.sentinel_hosts: - s = self.conf.matchmaker_redis.sentinel_hosts - return [tuple(i.split(":")) for i in s] + @abc.abstractmethod + def _sadd(self, key, value, expire): + pass - def _extract_standalone_redis_options(self): - if self.url and self.url.hosts: - redis_host = self.url.hosts[0] - return {"host": redis_host.hostname, - "port": redis_host.port, - "password": redis_host.password} - else: - return {"host": self.conf.matchmaker_redis.host, - "port": self.conf.matchmaker_redis.port, - "password": self.conf.matchmaker_redis.password} + @abc.abstractmethod + def _srem(self, key, value): + pass - def _add_key_with_expire(self, key, value, expire): - self._redis.sadd(key, value) - if expire > 0: - self._redis.expire(key, expire) + @abc.abstractmethod + def _smembers(self, key): + pass @no_reraise - @redis_connection_warn def register_publisher(self, hostname, expire=-1): - host_str = ",".join(hostname) - self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire) + self._sadd(_PUBLISHERS_KEY, ','.join(hostname), expire) @no_reraise - @redis_connection_warn def unregister_publisher(self, hostname): - host_str = ",".join(hostname) - self._redis.srem(_PUBLISHERS_KEY, host_str) + self._srem(_PUBLISHERS_KEY, ','.join(hostname)) @empty_list_on_error - @redis_connection_warn def get_publishers(self): - hosts = [] - hosts.extend([tuple(host_str.split(",")) - for host_str in - self._get_hosts_by_key(_PUBLISHERS_KEY)]) - return hosts + return [tuple(hostname.split(',')) for hostname + in self._smembers(_PUBLISHERS_KEY)] @no_reraise - @redis_connection_warn def register_router(self, hostname, expire=-1): - self._add_key_with_expire(_ROUTERS_KEY, hostname, expire) + self._sadd(_ROUTERS_KEY, hostname, expire) @no_reraise - @redis_connection_warn def unregister_router(self, hostname): - self._redis.srem(_ROUTERS_KEY, hostname) + self._srem(_ROUTERS_KEY, hostname) @empty_list_on_error - @redis_connection_warn def get_routers(self): - return self._get_hosts_by_key(_ROUTERS_KEY) + return self._smembers(_ROUTERS_KEY) - @redis_connection_warn def get_hosts_by_key(self, key): - return self._get_hosts_by_key(key) + return self._smembers(key) - def _get_hosts_by_key(self, key): - return self._redis.smembers(key) - - @redis_connection_warn def register(self, target, hostname, listener_type, expire=-1): if target.server: key = zmq_address.target_to_key(target, listener_type) - self._add_key_with_expire(key, hostname, expire) + self._sadd(key, hostname, expire) key = zmq_address.prefix_str(target.topic, listener_type) - self._add_key_with_expire(key, hostname, expire) + self._sadd(key, hostname, expire) @no_reraise - @redis_connection_warn def unregister(self, target, hostname, listener_type): if target.server: key = zmq_address.target_to_key(target, listener_type) - self._redis.srem(key, hostname) + self._srem(key, hostname) key = zmq_address.prefix_str(target.topic, listener_type) - self._redis.srem(key, hostname) + self._srem(key, hostname) - @redis_connection_warn def get_hosts(self, target, listener_type): hosts = [] if target.server: key = zmq_address.target_to_key(target, listener_type) - hosts.extend(self._get_hosts_by_key(key)) + hosts.extend(self._smembers(key)) if not hosts: key = zmq_address.prefix_str(target.topic, listener_type) - hosts.extend(self._get_hosts_by_key(key)) + hosts.extend(self._smembers(key)) LOG.debug("[Redis] get_hosts for target %(target)s: %(hosts)s", {"target": target, "hosts": hosts}) @@ -241,10 +227,9 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): def get_hosts_retry(self, target, listener_type): return self._retry_method(target, listener_type, self.get_hosts) - @redis_connection_warn def get_hosts_fanout(self, target, listener_type): key = zmq_address.prefix_str(target.topic, listener_type) - hosts = list(self._get_hosts_by_key(key)) + hosts = list(self._smembers(key)) LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s", {"target": target, "hosts": hosts}) @@ -262,3 +247,154 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): def _get_hosts_retry(target, listener_type): return method(target, listener_type) return _get_hosts_retry(target, listener_type) + + +class MatchmakerRedis(MatchmakerRedisBase): + + def __init__(self, conf, *args, **kwargs): + super(MatchmakerRedis, self).__init__(conf, *args, **kwargs) + + self._redis_hosts = self._extract_redis_hosts() + + self._redis_instances = [ + redis.StrictRedis(host=redis_host["host"], + port=redis_host["port"], + password=redis_host["password"]) + for redis_host in self._redis_hosts + ] + + for redis_host, redis_instance \ + in six.moves.zip(self._redis_hosts, self._redis_instances): + address = "{host}:{port}".format(host=redis_host["host"], + port=redis_host["port"]) + redis_instance._address = address + is_available = self._check_availability(redis_instance) + if is_available: + redis_instance._is_available = True + redis_instance._ready_from = time.time() + else: + LOG.warning(_LW("Redis host %s is not available now."), + address) + redis_instance._is_available = False + redis_instance._ready_from = float("inf") + + # NOTE(gdavoian): store instances in a random order + # (for the sake of load balancing) + random.shuffle(self._redis_instances) + + self._availability_updater = \ + MatchmakerRedisAvailabilityUpdater(self.conf, self) + + def _extract_redis_hosts(self): + if self.url and self.url.hosts: + return [{"host": redis_host.hostname, + "port": redis_host.port, + "password": redis_host.password} + for redis_host in self.url.hosts] + else: + # FIXME(gdavoian): remove the code below along with the + # corresponding deprecated options in the next release + return [{"host": self.conf.matchmaker_redis.host, + "port": self.conf.matchmaker_redis.port, + "password": self.conf.matchmaker_redis.password}] + + @staticmethod + def _check_availability(redis_instance): + try: + redis_instance.ping() + return True + except redis.ConnectionError: + return False + + @write_to_redis_connection_warn + def _sadd(self, redis_instance, key, value, expire): + redis_instance.sadd(key, value) + if expire > 0: + redis_instance.expire(key, expire) + + @write_to_redis_connection_warn + def _srem(self, redis_instance, key, value): + redis_instance.srem(key, value) + + @read_from_redis_connection_warn + def _smembers(self, redis_instance, key): + return redis_instance.smembers(key) + + +class MatchmakerRedisAvailabilityUpdater(zmq_updater.UpdaterBase): + + _MIN_SLEEP_FOR = 10 + + def __init__(self, conf, matchmaker): + super(MatchmakerRedisAvailabilityUpdater, self).__init__( + conf, matchmaker, self._update_availability, + sleep_for=conf.oslo_messaging_zmq.zmq_target_update + ) + + def _update_availability(self): + fraction_of_available_instances = 0 + for redis_instance in self.matchmaker._redis_instances: + if not redis_instance._is_available: + is_available = \ + self.matchmaker._check_availability(redis_instance) + if is_available: + LOG.info(_LI("Redis host %s is available again."), + redis_instance._address) + fraction_of_available_instances += 1 + # NOTE(gdavoian): mark an instance as available for + # writing to, but wait until all services register + # themselves in it for making the instance ready for + # reading from + redis_instance._is_available = True + redis_instance._ready_from = time.time() + \ + self.conf.oslo_messaging_zmq.zmq_target_update + else: + fraction_of_available_instances += 1 + fraction_of_available_instances /= \ + float(len(self.matchmaker._redis_instances)) + # NOTE(gdavoian): make the sleep time proportional to the number of + # currently available instances + self._sleep_for = max(self.conf.oslo_messaging_zmq.zmq_target_update * + fraction_of_available_instances, + self._MIN_SLEEP_FOR) + + +class MatchmakerSentinel(MatchmakerRedisBase): + + def __init__(self, conf, *args, **kwargs): + super(MatchmakerSentinel, self).__init__(conf, *args, **kwargs) + + self._sentinel_hosts = self._extract_sentinel_hosts() + + socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000. + + sentinel = redis_sentinel.Sentinel( + sentinels=self._sentinel_hosts, + socket_timeout=socket_timeout + ) + + self._redis_instance = sentinel.master_for( + self.conf.matchmaker_redis.sentinel_group_name, + socket_timeout=socket_timeout + ) + + def _extract_sentinel_hosts(self): + if self.url and self.url.hosts: + return [(sentinel_host.hostname, sentinel_host.port) + for sentinel_host in self.url.hosts] + elif self.conf.matchmaker_redis.sentinel_hosts: + return [tuple(sentinel_host.split(':')) for sentinel_host + in self.conf.matchmaker_redis.sentinel_hosts] + else: + return [] + + def _sadd(self, key, value, expire): + self._redis_instance.sadd(key, value) + if expire > 0: + self._redis_instance.expire(key, expire) + + def _srem(self, key, value): + self._redis_instance.srem(key, value) + + def _smembers(self, key): + return self._redis_instance.smembers(key) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index bfd6f98ea..76ade412f 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -19,9 +19,11 @@ import socket from oslo_config import cfg from stevedore import driver +from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LI +from oslo_messaging import transport LOG = logging.getLogger(__name__) @@ -51,7 +53,10 @@ zmq_proxy_opts = [ cfg.BoolOpt('ack_pub_sub', default=False, help='Use acknowledgements for notifying senders about ' 'receiving their fanout messages. ' - 'The option is ignored if PUB/SUB is disabled.') + 'The option is ignored if PUB/SUB is disabled.'), + + cfg.StrOpt('url', default='zmq://127.0.0.1:6379/', + help='ZMQ-driver transport URL with additional configurations') ] @@ -79,6 +84,8 @@ def parse_command_line_args(conf): parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub', action='store_true', help='Acknowledge PUB/SUB messages') + parser.add_argument('-u', '--url', dest='url', type=str, + help='Transport URL with configurations') parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='Turn on DEBUG logging level instead of INFO') @@ -108,6 +115,8 @@ def parse_command_line_args(conf): if args.ack_pub_sub: conf.set_override('ack_pub_sub', args.ack_pub_sub, group='zmq_proxy_opts') + if args.url: + conf.set_override('url', args.url, group='zmq_proxy_opts') class ZmqProxy(object): @@ -152,10 +161,13 @@ class ZmqProxy(object): def __init__(self, conf): super(ZmqProxy, self).__init__() self.conf = conf + url = transport.TransportURL.parse( + self.conf, url=self.conf.zmq_proxy_opts.url + ) self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', - self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker, - ).driver(self.conf) + impl_zmq.ZmqDriver.get_matchmaker_backend(self.conf, url) + ).driver(self.conf, url=url) self.context = zmq.Context() self.proxy = self._choose_proxy_implementation() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 5bbe02ec6..eec2004db 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -20,7 +20,7 @@ from oslo_messaging._drivers import base from oslo_messaging import server -MATCHMAKER_BACKENDS = ('redis', 'dummy') +MATCHMAKER_BACKENDS = ('redis', 'sentinel', 'dummy') MATCHMAKER_DEFAULT = 'redis' diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py index 5eda0cc6b..7c117ff83 100644 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py @@ -12,12 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. -from fixtures._fixtures import timeout import inspect -import retrying from stevedore import driver import testscenarios -import testtools import oslo_messaging from oslo_messaging.tests import utils as test_utils @@ -31,8 +28,7 @@ def redis_available(): if not redis: return False try: - c = redis.StrictRedis(socket_timeout=1) - c.ping() + redis.StrictRedis(socket_timeout=1).ping() return True except redis.exceptions.ConnectionError: return False @@ -41,7 +37,6 @@ def redis_available(): load_tests = testscenarios.load_tests_apply_scenarios -@testtools.skipIf(not redis_available(), "redis unavailable") class TestImplMatchmaker(test_utils.BaseTestCase): scenarios = [ @@ -52,13 +47,18 @@ class TestImplMatchmaker(test_utils.BaseTestCase): def setUp(self): super(TestImplMatchmaker, self).setUp() + if self.rpc_zmq_matchmaker == "redis": + if not redis_available(): + self.skipTest("redis unavailable") + self.test_matcher = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', self.rpc_zmq_matchmaker, ).driver(self.conf) if self.rpc_zmq_matchmaker == "redis": - self.addCleanup(self.test_matcher._redis.flushdb) + for redis_instance in self.test_matcher._redis_instances: + self.addCleanup(redis_instance.flushdb) self.target = oslo_messaging.Target(topic="test_topic") self.host1 = b"test_host1" @@ -77,7 +77,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase): self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), [self.host1, self.host2]) - def test_register_unsibscribe(self): + def test_register_unregister(self): self.test_matcher.register(self.target, self.host1, "test") self.test_matcher.register(self.target, self.host2, "test") @@ -95,12 +95,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase): def test_get_hosts_wrong_topic(self): target = oslo_messaging.Target(topic="no_such_topic") - hosts = [] - try: - hosts = self.test_matcher.get_hosts(target, "test") - except (timeout.TimeoutException, retrying.RetryError): - pass - self.assertEqual([], hosts) + self.assertEqual([], self.test_matcher.get_hosts(target, "test")) def test_handle_redis_package_error(self): if self.rpc_zmq_matchmaker == "redis": @@ -108,10 +103,10 @@ class TestImplMatchmaker(test_utils.BaseTestCase): module = inspect.getmodule(self.test_matcher) redis_package = module.redis - # 'redis' variable is set None, when importing package is failed + # 'redis' variable is set to None, when package importing is failed module.redis = None self.assertRaises(ImportError, self.test_matcher.__init__, self.conf) - # retrieve 'redis' variable wihch is set originally + # retrieve 'redis' variable which is set originally module.redis = redis_package diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py index 2456ddf0c..765c5fa3f 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py @@ -22,7 +22,6 @@ from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging.tests import utils as test_utils - zmq = zmq_async.import_zmq() @@ -44,7 +43,7 @@ class TestZmqTransportUrl(test_utils.BaseTestCase): driver.matchmaker.__class__) self.assertEqual('zmq', driver.matchmaker.url.transport) - def test_error_name(self): + def test_error_url(self): self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///") def test_dummy_url(self): @@ -59,30 +58,44 @@ class TestZmqTransportUrl(test_utils.BaseTestCase): driver.matchmaker.__class__) self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - def test_redis_url_no_creds(self): - driver, url = self.setup_url("zmq+redis://host:65123/") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, + def test_sentinel_url(self): + driver, url = self.setup_url("zmq+sentinel:///") + self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel, driver.matchmaker.__class__) - self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - self.assertEqual("host", driver.matchmaker.standalone_redis["host"]) - self.assertEqual(65123, driver.matchmaker.standalone_redis["port"]) + self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport) - def test_redis_url_no_port(self): - driver, url = self.setup_url("zmq+redis://:p12@host:65123/") + def test_host_with_credentials_url(self): + driver, url = self.setup_url("zmq://:password@host:60000/") + self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, + driver.matchmaker.__class__) + self.assertEqual('zmq', driver.matchmaker.url.transport) + self.assertEqual( + [{"host": "host", "port": 60000, "password": "password"}], + driver.matchmaker._redis_hosts + ) + + def test_redis_multiple_hosts_url(self): + driver, url = self.setup_url( + "zmq+redis://host1:60001,host2:60002,host3:60003/" + ) self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, driver.matchmaker.__class__) self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - self.assertEqual("host", driver.matchmaker.standalone_redis["host"]) - self.assertEqual(65123, driver.matchmaker.standalone_redis["port"]) - self.assertEqual("p12", driver.matchmaker.standalone_redis["password"]) + self.assertEqual( + [{"host": "host1", "port": 60001, "password": None}, + {"host": "host2", "port": 60002, "password": None}, + {"host": "host3", "port": 60003, "password": None}], + driver.matchmaker._redis_hosts + ) def test_sentinel_multiple_hosts_url(self): driver, url = self.setup_url( - "zmq+redis://sentinel1:20001,sentinel2:20001,sentinel3:20001/") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, + "zmq+sentinel://host1:20001,host2:20002,host3:20003/" + ) + self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel, driver.matchmaker.__class__) - self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - self.assertEqual(3, len(driver.matchmaker.sentinel_hosts)) - expected = [("sentinel1", 20001), ("sentinel2", 20001), - ("sentinel3", 20001)] - self.assertEqual(expected, driver.matchmaker.sentinel_hosts) + self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport) + self.assertEqual( + [("host1", 20001), ("host2", 20002), ("host3", 20003)], + driver.matchmaker._sentinel_hosts + ) diff --git a/setup.cfg b/setup.cfg index 014f1588e..90abec984 100644 --- a/setup.cfg +++ b/setup.cfg @@ -76,6 +76,7 @@ oslo.messaging.zmq.matchmaker = # Matchmakers for ZeroMQ dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy redis = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerRedis + sentinel = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerSentinel oslo.config.opts = oslo.messaging = oslo_messaging.opts:list_opts