diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index e6862b8ba..891520f98 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -14,8 +14,6 @@ import logging -import retrying - from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_base from oslo_messaging._drivers.zmq_driver.client import zmq_receivers @@ -25,6 +23,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_updater +from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) @@ -52,27 +51,29 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): return self.socket def send_call(self, request): - try: - request.routing_key = \ - self.routing_table.get_routable_host(request.target) - except retrying.RetryError: + request.routing_key = \ + self.routing_table.get_routable_host(request.target) + if request.routing_key is None: self._raise_timeout(request) return super(DealerPublisherProxy, self).send_call(request) def _get_routing_keys(self, request): - try: - if request.msg_type in zmq_names.DIRECT_TYPES: - return [self.routing_table.get_routable_host(request.target)] - else: - return \ - [zmq_address.target_to_subscribe_filter(request.target)] \ - if self.conf.oslo_messaging_zmq.use_pub_sub else \ - self.routing_table.get_all_hosts(request.target) - except retrying.RetryError: - return [] + if request.msg_type in zmq_names.DIRECT_TYPES: + return [self.routing_table.get_routable_host(request.target)] + else: + return \ + [zmq_address.target_to_subscribe_filter(request.target)] \ + if self.conf.oslo_messaging_zmq.use_pub_sub else \ + self.routing_table.get_all_hosts(request.target) def _send_non_blocking(self, request): for routing_key in self._get_routing_keys(request): + if routing_key is None: + LOG.warning(_LW("Matchmaker contains no record for specified " + "target %(target)s. Dropping message %(id)s.") + % {"target": request.target, + "id": request.message_id}) + continue request.routing_key = routing_key self.sender.send(self.socket, request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index d0b6227f2..d6f9b94ee 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -12,13 +12,19 @@ # License for the specific language governing permissions and limitations # under the License. +import logging +import retrying import time +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LW zmq = zmq_async.import_zmq() +LOG = logging.getLogger(__name__) + class RoutingTable(object): """This class implements local routing-table cache @@ -34,13 +40,22 @@ class RoutingTable(object): self.routable_hosts = {} def get_all_hosts(self, target): - self._update_routing_table(target) - return list(self.routable_hosts.get(str(target), [])) + self._update_routing_table( + target, + get_hosts=self.matchmaker.get_hosts_fanout, + get_hosts_retry=self.matchmaker.get_hosts_fanout_retry) + return self.routable_hosts.get(str(target), []) def get_routable_host(self, target): - self._update_routing_table(target) - hosts_for_target = self.routable_hosts[str(target)] - host = hosts_for_target.pop() + self._update_routing_table( + target, + get_hosts=self.matchmaker.get_hosts, + get_hosts_retry=self.matchmaker.get_hosts_retry) + hosts_for_target = self.routable_hosts.get(str(target)) + if not hosts_for_target: + # Matchmaker doesn't contain any target + return None + host = hosts_for_target.pop(0) if not hosts_for_target: self._renew_routable_hosts(target) return host @@ -49,18 +64,37 @@ class RoutingTable(object): return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ <= time.time() - tm - def _update_routing_table(self, target): + def _update_routing_table(self, target, get_hosts, get_hosts_retry): routing_record = self.routing_table.get(str(target)) if routing_record is None: - self._fetch_hosts(target) + self._fetch_hosts(target, get_hosts, get_hosts_retry) self._renew_routable_hosts(target) elif self._is_tm_expired(routing_record[1]): - self._fetch_hosts(target) + self._fetch_hosts(target, get_hosts, get_hosts_retry) - def _fetch_hosts(self, target): - self.routing_table[str(target)] = (self.matchmaker.get_hosts( - target, zmq_names.socket_type_str(zmq.DEALER)), time.time()) + def _fetch_hosts(self, target, get_hosts, get_hosts_retry): + key = str(target) + if key not in self.routing_table: + try: + self.routing_table[key] = (get_hosts_retry( + target, zmq_names.socket_type_str(zmq.DEALER)), + time.time()) + except retrying.RetryError: + LOG.warning(_LW("Matchmaker contains no hosts for target %s") + % key) + else: + try: + hosts = get_hosts( + target, zmq_names.socket_type_str(zmq.DEALER)) + self.routing_table[key] = (hosts, time.time()) + except zmq_matchmaker_base.MatchmakerUnavailable: + LOG.warning(_LW("Matchmaker contains no hosts for target %s") + % key) def _renew_routable_hosts(self, target): - hosts, _ = self.routing_table[str(target)] - self.routable_hosts[str(target)] = list(hosts) + key = str(target) + try: + hosts, _ = self.routing_table[key] + self.routable_hosts[key] = list(hosts) + except KeyError: + self.routable_hosts[key] = [] diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index aa82b8453..6472f5da4 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -34,7 +34,7 @@ class SocketsManager(object): self.socket_to_routers = None def get_hosts(self, target): - return self.matchmaker.get_hosts( + return self.matchmaker.get_hosts_retry( target, zmq_names.socket_type_str(self.listener_type)) @staticmethod diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py similarity index 82% rename from oslo_messaging/_drivers/zmq_driver/matchmaker/base.py rename to oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py index 6ad07e5f4..8376db8d9 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_base.py @@ -16,14 +16,24 @@ import collections import six +from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._i18n import _LE + + +class MatchmakerUnavailable(rpc_common.RPCException): + """Exception is raised on connection error to matchmaker service""" + + def __init__(self): + super(MatchmakerUnavailable, self).__init__( + message=_LE("Matchmaker is not currently available.")) @six.add_metaclass(abc.ABCMeta) -class MatchMakerBase(object): +class MatchmakerBase(object): def __init__(self, conf, *args, **kwargs): - super(MatchMakerBase, self).__init__() + super(MatchmakerBase, self).__init__() self.conf = conf self.url = kwargs.get('url') @@ -126,11 +136,22 @@ class MatchMakerBase(object): :returns: a list of "hostname:port" hosts """ + @abc.abstractmethod + def get_hosts_retry(self, target, listener_type): + """Retry if not hosts - used on client first time connection. -class DummyMatchMaker(MatchMakerBase): + :param target: the default target for invocations + :type target: Target + :param listener_type: listener socket type ROUTER, SUB etc. + :type listener_type: str + :returns: a list of "hostname:port" hosts + """ + + +class MatchmakerDummy(MatchmakerBase): def __init__(self, conf, *args, **kwargs): - super(DummyMatchMaker, self).__init__(conf, *args, **kwargs) + super(MatchmakerDummy, self).__init__(conf, *args, **kwargs) self._cache = collections.defaultdict(list) self._publishers = set() @@ -171,3 +192,8 @@ class DummyMatchMaker(MatchMakerBase): def get_hosts(self, target, listener_type): key = zmq_address.target_to_key(target, listener_type) return self._cache[key] + + def get_hosts_retry(self, target, listener_type): + # Do not complicate dummy matchmaker + # This method will act smarter in real world matchmakers + return self.get_hosts(target, listener_type) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py similarity index 68% rename from oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py rename to oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py index 440c00b32..ff2036800 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py @@ -11,15 +11,15 @@ # License for the specific language governing permissions and limitations # under the License. -import inspect import logging +from retrying import retry from oslo_config import cfg from oslo_utils import importutils -from oslo_messaging._drivers.zmq_driver.matchmaker import base +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_address -from retrying import retry +from oslo_messaging._i18n import _LW redis = importutils.try_import('redis') redis_sentinel = importutils.try_import('redis.sentinel') @@ -53,10 +53,10 @@ matchmaker_redis_opts = [ default='oslo-messaging-zeromq', help='Redis replica set name.'), cfg.IntOpt('wait_timeout', - default=5000, + default=2000, help='Time in ms to wait between connection attempts.'), cfg.IntOpt('check_timeout', - default=60000, + default=20000, help='Time in ms to wait before the transaction is killed.'), cfg.IntOpt('socket_timeout', default=10000, @@ -65,37 +65,52 @@ matchmaker_redis_opts = [ _PUBLISHERS_KEY = "PUBLISHERS" _ROUTERS_KEY = "ROUTERS" -_RETRY_METHODS = ("get_hosts", "get_publishers", "get_routers") + + +def redis_connection_warn(func): + def func_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except redis.ConnectionError: + LOG.warning(_LW("Redis is currently not available. " + "Messages are being sent to known targets using " + "existing connections. But new nodes " + "can not be discovered until Redis is up " + "and running.")) + raise zmq_matchmaker_base.MatchmakerUnavailable() + return func_wrapper + + +def no_reraise(func): + def func_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except zmq_matchmaker_base.MatchmakerUnavailable: + pass + return func_wrapper + + +def empty_list_on_error(func): + def func_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except zmq_matchmaker_base.MatchmakerUnavailable: + return [] + return func_wrapper def retry_if_connection_error(ex): - return isinstance(ex, redis.ConnectionError) + return isinstance(ex, zmq_matchmaker_base.MatchmakerUnavailable) def retry_if_empty(hosts): return not hosts -def apply_retrying(obj, cfg): - for attr_name, attr in inspect.getmembers(obj): - if not (inspect.ismethod(attr) or inspect.isfunction(attr)): - continue - if attr_name in _RETRY_METHODS: - setattr( - obj, - attr_name, - retry( - wait_fixed=cfg.matchmaker_redis.wait_timeout, - stop_max_delay=cfg.matchmaker_redis.check_timeout, - retry_on_exception=retry_if_connection_error, - retry_on_result=retry_if_empty - )(attr)) - - -class RedisMatchMaker(base.MatchMakerBase): +class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): def __init__(self, conf, *args, **kwargs): - super(RedisMatchMaker, self).__init__(conf, *args, **kwargs) + super(MatchmakerRedis, self).__init__(conf, *args, **kwargs) self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis") self.sentinel_hosts = self._extract_sentinel_options() @@ -117,7 +132,6 @@ class RedisMatchMaker(base.MatchMakerBase): self.conf.matchmaker_redis.sentinel_group_name, socket_timeout=socket_timeout ) - apply_retrying(self, self.conf) def _extract_sentinel_options(self): if self.url and self.url.hosts: @@ -143,14 +157,20 @@ class RedisMatchMaker(base.MatchMakerBase): if expire > 0: self._redis.expire(key, expire) + @no_reraise + @redis_connection_warn def register_publisher(self, hostname, expire=-1): host_str = ",".join(hostname) self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire) + @no_reraise + @redis_connection_warn def unregister_publisher(self, hostname): host_str = ",".join(hostname) self._redis.srem(_PUBLISHERS_KEY, host_str) + @empty_list_on_error + @redis_connection_warn def get_publishers(self): hosts = [] hosts.extend([tuple(host_str.split(",")) @@ -158,18 +178,25 @@ class RedisMatchMaker(base.MatchMakerBase): self._get_hosts_by_key(_PUBLISHERS_KEY)]) return hosts + @no_reraise + @redis_connection_warn def register_router(self, hostname, expire=-1): self._add_key_with_expire(_ROUTERS_KEY, hostname, expire) + @no_reraise + @redis_connection_warn def unregister_router(self, hostname): self._redis.srem(_ROUTERS_KEY, hostname) + @empty_list_on_error + @redis_connection_warn def get_routers(self): return self._get_hosts_by_key(_ROUTERS_KEY) def _get_hosts_by_key(self, key): return self._redis.smembers(key) + @redis_connection_warn def register(self, target, hostname, listener_type, expire=-1): if target.topic and target.server: key = zmq_address.target_to_key(target, listener_type) @@ -179,6 +206,8 @@ class RedisMatchMaker(base.MatchMakerBase): key = zmq_address.prefix_str(target.topic, listener_type) self._add_key_with_expire(key, hostname, expire) + @no_reraise + @redis_connection_warn def unregister(self, target, hostname, listener_type): if target.topic and target.server: key = zmq_address.target_to_key(target, listener_type) @@ -188,9 +217,8 @@ class RedisMatchMaker(base.MatchMakerBase): key = zmq_address.prefix_str(target.topic, listener_type) self._redis.srem(key, hostname) + @redis_connection_warn def get_hosts(self, target, listener_type): - LOG.debug("[Redis] get_hosts for target %s", target) - hosts = [] if target.topic and target.server: @@ -201,4 +229,39 @@ class RedisMatchMaker(base.MatchMakerBase): key = zmq_address.prefix_str(target.topic, listener_type) hosts.extend(self._get_hosts_by_key(key)) + LOG.debug("[Redis] get_hosts for target %(target)s: %(hosts)s", + {"target": target, "hosts": hosts}) + return hosts + + def get_hosts_retry(self, target, listener_type): + return self._retry_method(target, listener_type, self.get_hosts) + + @redis_connection_warn + def get_hosts_fanout(self, target, listener_type): + LOG.debug("[Redis] get_hosts for target %s", target) + + hosts = [] + + if target.topic and target.server: + key = zmq_address.target_to_key(target, listener_type) + hosts.extend(self._get_hosts_by_key(key)) + + key = zmq_address.prefix_str(target.topic, listener_type) + hosts.extend(self._get_hosts_by_key(key)) + + return hosts + + def get_hosts_fanout_retry(self, target, listener_type): + return self._retry_method(target, listener_type, self.get_hosts_fanout) + + def _retry_method(self, target, listener_type, method): + conf = self.conf + + @retry(retry_on_result=retry_if_empty, + wrap_exception=True, + wait_fixed=conf.matchmaker_redis.wait_timeout, + stop_max_delay=conf.matchmaker_redis.check_timeout) + def _get_hosts_retry(target, listener_type): + return method(target, listener_type) + return _get_hosts_retry(target, listener_type) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 69a707789..578f008a6 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -18,12 +18,13 @@ import logging import six from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater -from oslo_messaging._i18n import _LE +from oslo_messaging._i18n import _LE, _LI, _LW LOG = logging.getLogger(__name__) @@ -116,10 +117,25 @@ class TargetUpdater(zmq_updater.UpdaterBase): self._update_target) def _update_target(self): - self.matchmaker.register( - self.target, self.host, - zmq_names.socket_type_str(self.socket_type), - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) + try: + self.matchmaker.register( + self.target, self.host, + zmq_names.socket_type_str(self.socket_type), + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) + + if self._sleep_for != \ + self.conf.oslo_messaging_zmq.zmq_target_update: + self._sleep_for = \ + self.conf.oslo_messaging_zmq.zmq_target_update + LOG.info(_LI("Falling back to the normal update %d sec") + % self._sleep_for) + + except zmq_matchmaker_base.MatchmakerUnavailable: + # Update target frequently until first successful update + # After matchmaker is back update normally as of config + self._sleep_for = 10 + LOG.warning(_LW("Failed connecting to the Matchmaker, " + "update each %d sec") % self._sleep_for) def stop(self): super(TargetUpdater, self).stop() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py index 2d4f9e0a1..8ce53c73a 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -31,7 +31,7 @@ class UpdaterBase(object): self.conf = conf self.matchmaker = matchmaker self.update_method = update_method - # make first update immediately + self._sleep_for = self.conf.oslo_messaging_zmq.zmq_target_update self.update_method() self.executor = zmq_async.get_executor(method=self._update_loop) self.executor.execute() @@ -41,7 +41,7 @@ class UpdaterBase(object): def _update_loop(self): self.update_method() - time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update) + time.sleep(self._sleep_for) def cleanup(self): self.executor.stop() diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index 5eb4e5ef7..b294a9aa0 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -62,7 +62,7 @@ class ConfFixture(fixtures.Fixture): 'zmq_opts', 'oslo_messaging_zmq') _import_opts(self.conf, 'oslo_messaging._drivers.zmq_driver.' - 'matchmaker.matchmaker_redis', + 'matchmaker.zmq_matchmaker_redis', 'matchmaker_redis_opts', 'matchmaker_redis') _import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts') diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index c252496ae..9fa87f118 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -27,7 +27,7 @@ from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers.impl_zmq import zmq_options from oslo_messaging._drivers.pika_driver import pika_connection_factory -from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis from oslo_messaging.notify import notifier from oslo_messaging.rpc import client from oslo_messaging import server @@ -44,7 +44,7 @@ _global_opt_lists = [ _opts = [ (None, list(itertools.chain(*_global_opt_lists))), - ('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts), + ('matchmaker_redis', zmq_matchmaker_redis.matchmaker_redis_opts), ('oslo_messaging_zmq', zmq_options.zmq_opts), ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_notifications', notifier._notifier_opts), diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py index f80ee912f..2456ddf0c 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py @@ -16,8 +16,9 @@ import testtools import oslo_messaging from oslo_messaging._drivers import common -from oslo_messaging._drivers.zmq_driver.matchmaker.base import DummyMatchMaker -from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis +from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \ + import MatchmakerDummy +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging.tests import utils as test_utils @@ -39,7 +40,7 @@ class TestZmqTransportUrl(test_utils.BaseTestCase): def test_empty_url(self): driver, url = self.setup_url("zmq:///") - self.assertIs(matchmaker_redis.RedisMatchMaker, + self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, driver.matchmaker.__class__) self.assertEqual('zmq', driver.matchmaker.url.transport) @@ -48,19 +49,19 @@ class TestZmqTransportUrl(test_utils.BaseTestCase): def test_dummy_url(self): driver, url = self.setup_url("zmq+dummy:///") - self.assertIs(DummyMatchMaker, + self.assertIs(MatchmakerDummy, driver.matchmaker.__class__) self.assertEqual('zmq+dummy', driver.matchmaker.url.transport) def test_redis_url(self): driver, url = self.setup_url("zmq+redis:///") - self.assertIs(matchmaker_redis.RedisMatchMaker, + self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, driver.matchmaker.__class__) self.assertEqual('zmq+redis', driver.matchmaker.url.transport) def test_redis_url_no_creds(self): driver, url = self.setup_url("zmq+redis://host:65123/") - self.assertIs(matchmaker_redis.RedisMatchMaker, + self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, driver.matchmaker.__class__) self.assertEqual('zmq+redis', driver.matchmaker.url.transport) self.assertEqual("host", driver.matchmaker.standalone_redis["host"]) @@ -68,7 +69,7 @@ class TestZmqTransportUrl(test_utils.BaseTestCase): def test_redis_url_no_port(self): driver, url = self.setup_url("zmq+redis://:p12@host:65123/") - self.assertIs(matchmaker_redis.RedisMatchMaker, + self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, driver.matchmaker.__class__) self.assertEqual('zmq+redis', driver.matchmaker.url.transport) self.assertEqual("host", driver.matchmaker.standalone_redis["host"]) @@ -78,7 +79,7 @@ class TestZmqTransportUrl(test_utils.BaseTestCase): def test_sentinel_multiple_hosts_url(self): driver, url = self.setup_url( "zmq+redis://sentinel1:20001,sentinel2:20001,sentinel3:20001/") - self.assertIs(matchmaker_redis.RedisMatchMaker, + self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, driver.matchmaker.__class__) self.assertEqual('zmq+redis', driver.matchmaker.url.transport) self.assertEqual(3, len(driver.matchmaker.sentinel_hosts)) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 1d215a593..ab4387d26 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -20,6 +20,7 @@ from oslo_config import cfg from six import moves import oslo_messaging +from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging.notify import notifier from oslo_messaging.tests import utils as test_utils @@ -291,6 +292,8 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): if not self.url: self.skipTest("No transport url configured") + zmq_options.register_opts(conf) + zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER') if zmq_matchmaker: self.config(rpc_zmq_matchmaker=zmq_matchmaker, @@ -305,13 +308,14 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): self.config(check_timeout=10000, group="matchmaker_redis") self.config(wait_timeout=1000, group="matchmaker_redis") zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB') - if zmq_use_pub_sub: - self.config(use_pub_sub=zmq_use_pub_sub, - group='oslo_messaging_zmq') + self.config(use_pub_sub=zmq_use_pub_sub, + group='oslo_messaging_zmq') zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY') - if zmq_use_router_proxy: - self.config(use_router_proxy=zmq_use_router_proxy, - group='oslo_messaging_zmq') + self.config(use_router_proxy=zmq_use_router_proxy, + group='oslo_messaging_zmq') + zmq_use_acks = os.environ.get('ZMQ_USE_ACKS') + self.config(rpc_use_acks=zmq_use_acks, + group='oslo_messaging_zmq') class NotificationFixture(fixtures.Fixture): diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh index 12649c88b..d50cad1fb 100755 --- a/setup-test-env-zmq-proxy.sh +++ b/setup-test-env-zmq-proxy.sh @@ -12,6 +12,7 @@ export ZMQ_REDIS_PORT=65123 export ZMQ_IPC_DIR=${DATADIR} export ZMQ_USE_PUB_SUB=false export ZMQ_USE_ROUTER_PROXY=true +export ZMQ_USE_ACKS=false export ZMQ_PROXY_HOST=127.0.0.1 @@ -23,6 +24,7 @@ rpc_zmq_matchmaker=${ZMQ_MATCHMAKER} rpc_zmq_ipc_dir=${ZMQ_IPC_DIR} use_pub_sub=${ZMQ_USE_PUB_SUB} use_router_proxy=${ZMQ_USE_ROUTER_PROXY} +rpc_use_acks=${ZMQ_USE_ACKS} [matchmaker_redis] port=${ZMQ_REDIS_PORT} diff --git a/setup.cfg b/setup.cfg index 9a3665e83..3bc893317 100644 --- a/setup.cfg +++ b/setup.cfg @@ -74,8 +74,8 @@ oslo.messaging.pika.connection_factory = oslo.messaging.zmq.matchmaker = # Matchmakers for ZeroMQ - dummy = oslo_messaging._drivers.zmq_driver.matchmaker.base:DummyMatchMaker - redis = oslo_messaging._drivers.zmq_driver.matchmaker.matchmaker_redis:RedisMatchMaker + dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy + redis = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerRedis oslo.config.opts = oslo.messaging = oslo_messaging.opts:list_opts