From 34e326f5aa0ee8486b4f5522eb3b2620ba3b347e Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Wed, 22 Mar 2017 14:51:46 -0400 Subject: [PATCH] [zmq] Prevent access to rpc_response_timeout rpc_response_timeout is not present when sending notifications. All RPC call requests have a timeout attribute - use that. In the case where there is no timeout, use retry value if present. Change-Id: I27a3dd0962a06f15e85e9c9c8c24aa7786c71056 Closes-Bug: 1675510 --- oslo_messaging/_cmd/zmq_proxy.py | 3 ++- .../publishers/dealer/zmq_dealer_publisher_direct.py | 12 ++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index 29bf34e03..c0b07c38f 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -19,6 +19,7 @@ from oslo_config import cfg from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging._i18n import _LI +from oslo_messaging.transport import TransportURL LOG = logging.getLogger(__name__) @@ -29,7 +30,7 @@ def main(): opt_group = cfg.OptGroup(name='zmq_proxy_opts', title='ZeroMQ proxy options') conf.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) - zmq_options.register_opts(conf) + zmq_options.register_opts(conf, TransportURL.parse(conf)) zmq_proxy.parse_command_line_args(conf) reactor = zmq_proxy.ZmqProxy(conf) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 42575fcd9..6759b34d8 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -92,9 +92,17 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): self.receiver.unregister_socket(socket) def send_request(self, socket, request): + if hasattr(request, 'timeout'): + _stop = tenacity.stop_after_delay(request.timeout) + elif request.retry is not None and request.retry > 0: + # no rpc_response_timeout option if notification + _stop = tenacity.stop_after_attempt(request.retry) + else: + # well, now what? + _stop = tenacity.stop_after_delay(60) + @tenacity.retry(retry=tenacity.retry_if_exception_type(zmq.Again), - stop=tenacity.stop_after_delay( - self.conf.rpc_response_timeout)) + stop=_stop) def send_retrying(): if request.msg_type in zmq_names.MULTISEND_TYPES: for _ in range(socket.connections_count()):