diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 50d448856..bebe568a2 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -12,14 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. -import time - from oslo_config import cfg from oslo_log import log as logging +from oslo_utils import timeutils import pika_pool import retrying from oslo_messaging._drivers import base +from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr @@ -158,7 +158,8 @@ class PikaDriver(base.BaseDriver): def require_features(self, requeue=False): pass - def _declare_rpc_exchange(self, exchange, timeout): + def _declare_rpc_exchange(self, exchange, stopwatch): + timeout = stopwatch.leftover(return_none=True) with (self._pika_engine.connection_without_confirmation_pool .acquire(timeout=timeout)) as conn: try: @@ -177,82 +178,75 @@ class PikaDriver(base.BaseDriver): def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): - expiration_time = None if timeout is None else time.time() + timeout + with timeutils.StopWatch(duration=timeout) as stopwatch: + if retry is None: + retry = self._pika_engine.default_rpc_retry_attempts - if retry is None: - retry = self._pika_engine.default_rpc_retry_attempts + exchange = self._pika_engine.get_rpc_exchange_name( + target.exchange + ) - exchange = self._pika_engine.get_rpc_exchange_name( - target.exchange - ) + def on_exception(ex): + if isinstance(ex, pika_drv_exc.ExchangeNotFoundException): + # it is desired to create exchange because if we sent to + # exchange which is not exists, we get ChannelClosed + # exception and need to reconnect + try: + self._declare_rpc_exchange(exchange, stopwatch) + except pika_drv_exc.ConnectionException as e: + LOG.warning("Problem during declaring exchange. %s", e) + return True + elif isinstance(ex, (pika_drv_exc.ConnectionException, + exceptions.MessageDeliveryFailure)): + LOG.warning("Problem during message sending. %s", ex) + return True + else: + return False - def on_exception(ex): - if isinstance(ex, pika_drv_exc.ExchangeNotFoundException): - # it is desired to create exchange because if we sent to - # exchange which is not exists, we get ChannelClosed exception - # and need to reconnect + retrier = ( + None if retry == 0 else + retrying.retry( + stop_max_attempt_number=(None if retry == -1 else retry), + retry_on_exception=on_exception, + wait_fixed=self._pika_engine.rpc_retry_delay * 1000, + ) + ) + + if target.fanout: + return self.cast_all_workers( + exchange, target.topic, ctxt, message, stopwatch, retrier + ) + + routing_key = self._pika_engine.get_rpc_queue_name( + target.topic, target.server, retrier is None + ) + + msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, + message, ctxt) + try: + reply = msg.send( + exchange=exchange, + routing_key=routing_key, + reply_listener=( + self._reply_listener if wait_for_reply else None + ), + stopwatch=stopwatch, + retrier=retrier + ) + except pika_drv_exc.ExchangeNotFoundException as ex: try: - self._declare_rpc_exchange( - exchange, - None if expiration_time is None else - expiration_time - time.time() - ) + self._declare_rpc_exchange(exchange, stopwatch) except pika_drv_exc.ConnectionException as e: LOG.warning("Problem during declaring exchange. %s", e) - return True - elif isinstance(ex, (pika_drv_exc.ConnectionException, - exceptions.MessageDeliveryFailure)): - LOG.warning("Problem during message sending. %s", ex) - return True - else: - return False + raise ex - retrier = ( - None if retry == 0 else - retrying.retry( - stop_max_attempt_number=(None if retry == -1 else retry), - retry_on_exception=on_exception, - wait_fixed=self._pika_engine.rpc_retry_delay * 1000, - ) - ) + if reply is not None: + if reply.failure is not None: + raise reply.failure - if target.fanout: - return self.cast_all_workers( - exchange, target.topic, ctxt, message, expiration_time, - retrier - ) + return reply.result - routing_key = self._pika_engine.get_rpc_queue_name( - target.topic, target.server, retrier is None - ) - - msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message, - ctxt) - try: - reply = msg.send( - exchange=exchange, - routing_key=routing_key, - reply_listener=( - self._reply_listener if wait_for_reply else None - ), - expiration_time=expiration_time, - retrier=retrier - ) - except pika_drv_exc.ExchangeNotFoundException as ex: - try: - self._declare_rpc_exchange(exchange, - expiration_time - time.time()) - except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %s", e) - raise ex - - if reply is not None: - if reply.failure is not None: - raise reply.failure - - return reply.result - - def cast_all_workers(self, exchange, topic, ctxt, message, expiration_time, + def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch, retrier=None): msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, ctxt) @@ -263,23 +257,23 @@ class PikaDriver(base.BaseDriver): topic, "all_workers", retrier is None ), mandatory=False, - expiration_time=expiration_time, + stopwatch=stopwatch, retrier=retrier ) except pika_drv_exc.ExchangeNotFoundException: try: - self._declare_rpc_exchange( - exchange, expiration_time - time.time() - ) + self._declare_rpc_exchange(exchange, stopwatch) except pika_drv_exc.ConnectionException as e: LOG.warning("Problem during declaring exchange. %s", e) - def _declare_notification_queue_binding(self, target, timeout=None): - if timeout is not None and timeout < 0: + def _declare_notification_queue_binding( + self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH): + if stopwatch.expired(): raise exceptions.MessagingTimeout( "Timeout for current operation was expired." ) try: + timeout = stopwatch.leftover(return_none=True) with (self._pika_engine.connection_without_confirmation_pool .acquire)(timeout=timeout) as conn: self._pika_engine.declare_queue_binding_by_channel( diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py index df004a8f9..f032a334d 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_commons.py +++ b/oslo_messaging/_drivers/pika_driver/pika_commons.py @@ -16,6 +16,7 @@ import select import socket import sys +from oslo_utils import timeutils from pika import exceptions as pika_exceptions import six @@ -30,6 +31,8 @@ PIKA_CONNECTIVITY_ERRORS = ( EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' +INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start() + def is_eventlet_monkey_patched(module): """Determines safely is eventlet patching for module enabled or not diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py index a2dd40d35..1739c7932 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -42,14 +42,12 @@ class RpcReplyPikaListener(object): self._poller_thread = None self._shutdown = False - def get_reply_qname(self, expiration_time=None): + def get_reply_qname(self): """As result return reply queue name, shared for whole process, but before this check is RPC listener initialized or not and perform initialization if needed - :param expiration_time: Float, expiration time in seconds - (like time.time()), - :return: String, queue name which should be used for reply sending + :return: String, queue name which hould be used for reply sending """ if self._reply_consumer_initialized: return self._reply_queue diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 1f687ff68..3ca4b0104 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -22,6 +22,7 @@ from concurrent import futures from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils import importutils +from oslo_utils import timeutils from pika import exceptions as pika_exceptions from pika import spec as pika_spec import pika_pool @@ -31,6 +32,7 @@ import six import oslo_messaging from oslo_messaging._drivers import base +from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging import _utils as utils from oslo_messaging import exceptions @@ -213,11 +215,14 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage): ) if self._pika_engine.rpc_reply_retry_attempts else None try: - reply_outgoing_message.send( - reply_q=self.reply_q, - expiration_time=self.expiration_time, - retrier=retrier - ) + timeout = (None if self.expiration_time is None else + max(self.expiration_time - time.time(), 0)) + with timeutils.StopWatch(duration=timeout) as stopwatch: + reply_outgoing_message.send( + reply_q=self.reply_q, + stopwatch=stopwatch, + retrier=retrier + ) LOG.debug( "Message [id:'%s'] replied to '%s'.", self.msg_id, self.reply_q ) @@ -346,7 +351,7 @@ class PikaOutgoingMessage(object): @staticmethod def _publish(pool, exchange, routing_key, body, properties, mandatory, - expiration_time): + stopwatch): """Execute pika publish method using connection from connection pool Also this message catches all pika related exceptions and raise oslo.messaging specific exceptions @@ -358,16 +363,15 @@ class PikaOutgoingMessage(object): :param properties: Properties, RabbitMQ message properties :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise exception if it is not possible to deliver message to any queue) - :param expiration_time: Float, expiration time in seconds - (like time.time()) + :param stopwatch: StopWatch, stopwatch object for calculating + allowed timeouts """ - timeout = (None if expiration_time is None else - expiration_time - time.time()) - if timeout is not None and timeout < 0: + if stopwatch.expired(): raise exceptions.MessagingTimeout( "Timeout for current operation was expired." ) try: + timeout = stopwatch.leftover(return_none=True) with pool.acquire(timeout=timeout) as conn: if timeout is not None: properties.expiration = str(int(timeout * 1000)) @@ -422,7 +426,7 @@ class PikaOutgoingMessage(object): def _do_send(self, exchange, routing_key, msg_dict, msg_props, confirm=True, mandatory=True, persistent=False, - expiration_time=None, retrier=None): + stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None): """Send prepared message with configured retrying :param exchange: String, RabbitMQ exchange name for message sending @@ -434,8 +438,8 @@ class PikaOutgoingMessage(object): exception if it is not possible to deliver message to any queue) :param persistent: Boolean, send persistent message if True, works only for routing into durable queues - :param expiration_time: Float, expiration time in seconds - (like time.time()) + :param stopwatch: StopWatch, stopwatch object for calculating + allowed timeouts :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ @@ -458,10 +462,11 @@ class PikaOutgoingMessage(object): retrier(self._publish)) return publish(pool, exchange, routing_key, body, msg_props, - mandatory, expiration_time) + mandatory, stopwatch) def send(self, exchange, routing_key='', confirm=True, mandatory=True, - persistent=False, expiration_time=None, retrier=None): + persistent=False, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, + retrier=None): """Send message with configured retrying :param exchange: String, RabbitMQ exchange name for message sending @@ -471,16 +476,16 @@ class PikaOutgoingMessage(object): exception if it is not possible to deliver message to any queue) :param persistent: Boolean, send persistent message if True, works only for routing into durable queues - :param expiration_time: Float, expiration time in seconds - (like time.time()) + :param stopwatch: StopWatch, stopwatch object for calculating + allowed timeouts :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ msg_dict, msg_props = self._prepare_message_to_send() return self._do_send(exchange, routing_key, msg_dict, msg_props, - confirm, mandatory, persistent, expiration_time, - retrier) + confirm, mandatory, persistent, + stopwatch, retrier) class RpcPikaOutgoingMessage(PikaOutgoingMessage): @@ -496,15 +501,15 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): self.reply_q = None def send(self, exchange, routing_key, reply_listener=None, - expiration_time=None, retrier=None): + stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None): """Send RPC message with configured retrying :param exchange: String, RabbitMQ exchange name for message sending :param routing_key: String, RabbitMQ routing key for message routing :param reply_listener: RpcReplyPikaListener, listener for waiting reply. If None - return immediately without reply waiting - :param expiration_time: Float, expiration time in seconds - (like time.time()) + :param stopwatch: StopWatch, stopwatch object for calculating + allowed timeouts :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ @@ -515,9 +520,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): msg_props.correlation_id = self.msg_id LOG.debug('MSG_ID is %s', self.msg_id) - self.reply_q = reply_listener.get_reply_qname( - expiration_time - time.time() - ) + self.reply_q = reply_listener.get_reply_qname() msg_props.reply_to = self.reply_q future = reply_listener.register_reply_waiter(msg_id=self.msg_id) @@ -525,12 +528,11 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): self._do_send( exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, msg_props=msg_props, confirm=True, mandatory=True, - persistent=False, expiration_time=expiration_time, - retrier=retrier + persistent=False, stopwatch=stopwatch, retrier=retrier ) try: - return future.result(expiration_time - time.time()) + return future.result(stopwatch.leftover(return_none=True)) except BaseException as e: reply_listener.unregister_reply_waiter(self.msg_id) if isinstance(e, futures.TimeoutError): @@ -540,8 +542,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): self._do_send( exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, msg_props=msg_props, confirm=True, mandatory=True, - persistent=False, expiration_time=expiration_time, - retrier=retrier + persistent=False, stopwatch=stopwatch, retrier=retrier ) @@ -592,12 +593,13 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): pika_engine, msg, None, content_type, content_encoding ) - def send(self, reply_q, expiration_time=None, retrier=None): + def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, + retrier=None): """Send RPC message with configured retrying :param reply_q: String, queue name for sending reply - :param expiration_time: Float, expiration time in seconds - (like time.time()) + :param stopwatch: StopWatch, stopwatch object for calculating + allowed timeouts :param retrier: retrying.Retrier, configured retrier object for sending message, if None no retrying is performed """ @@ -608,6 +610,6 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): self._do_send( exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q, msg_dict=msg_dict, msg_props=msg_props, confirm=True, - mandatory=True, persistent=False, expiration_time=expiration_time, + mandatory=True, persistent=False, stopwatch=stopwatch, retrier=retrier ) diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index 7cddb082c..aece3ecbf 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -12,13 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. import functools -import time import unittest from concurrent import futures from mock import mock from mock import patch from oslo_serialization import jsonutils +from oslo_utils import timeutils import pika import oslo_messaging @@ -200,7 +200,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): content_encoding='utf-8', content_type='application/json' ) outgoing_message_mock().send.assert_called_once_with( - expiration_time=None, reply_q='reply_queue', retrier=mock.ANY + reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY ) retry_mock.assert_called_once_with( retry_on_exception=mock.ANY, stop_max_attempt_number=3, @@ -240,7 +240,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): content_type='application/json' ) outgoing_message_mock().send.assert_called_once_with( - expiration_time=None, reply_q='reply_queue', retrier=mock.ANY + reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY ) retry_mock.assert_called_once_with( retry_on_exception=mock.ANY, stop_max_attempt_number=3, @@ -314,7 +314,9 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): self._exchange = "it is exchange" self._routing_key = "it is routing key" self._expiration = 1 - self._expiration_time = time.time() + self._expiration + self._stopwatch = ( + timeutils.StopWatch(duration=self._expiration).start() + ) self._mandatory = object() self._message = {"msg_type": 1, "msg_str": "hello"} @@ -333,7 +335,7 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): confirm=True, mandatory=self._mandatory, persistent=True, - expiration_time=self._expiration_time, + stopwatch=self._stopwatch, retrier=None ) @@ -378,7 +380,7 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): confirm=False, mandatory=self._mandatory, persistent=False, - expiration_time=self._expiration_time, + stopwatch=self._stopwatch, retrier=None ) @@ -431,13 +433,13 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): ) expiration = 1 - expiration_time = time.time() + expiration + stopwatch = timeutils.StopWatch(duration=expiration).start() message.send( exchange=self._exchange, routing_key=self._routing_key, reply_listener=None, - expiration_time=expiration_time, + stopwatch=stopwatch, retrier=None ) @@ -478,7 +480,7 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): ) expiration = 1 - expiration_time = time.time() + expiration + stopwatch = timeutils.StopWatch(duration=expiration).start() result = "it is a result" reply_queue_name = "reply_queue_name" @@ -493,7 +495,7 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): exchange=self._exchange, routing_key=self._routing_key, reply_listener=reply_listener, - expiration_time=expiration_time, + stopwatch=stopwatch, retrier=None ) @@ -534,7 +536,9 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): self._reply_q = "reply_queue_name" self._expiration = 1 - self._expiration_time = time.time() + self._expiration + self._stopwatch = ( + timeutils.StopWatch(duration=self._expiration).start() + ) self._pika_engine = mock.MagicMock() @@ -550,8 +554,7 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): self._pika_engine, self._msg_id, reply="all_fine" ) - message.send(self._reply_q, expiration_time=self._expiration_time, - retrier=None) + message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None) self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.assert_called_once_with( @@ -586,8 +589,7 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): self._pika_engine, self._msg_id, failure_info=failure_info ) - message.send(self._reply_q, expiration_time=self._expiration_time, - retrier=None) + message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None) self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.assert_called_once_with(