Replace expriration_time by timer
This patch refactors some classes and methods of the pika driver by replacing manually calculated expiration times with specialized timer/stopwatch objects (oslo_utils.timeutils.StopWatch). Change-Id: I28f83797eafbfa612bac748796e3e39742eb27ba
This commit is contained in:
parent
d4e8ac42b5
commit
fbc5df6645
@ -12,14 +12,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
import pika_pool
|
||||
import retrying
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
|
||||
@ -158,7 +158,8 @@ class PikaDriver(base.BaseDriver):
|
||||
def require_features(self, requeue=False):
|
||||
pass
|
||||
|
||||
def _declare_rpc_exchange(self, exchange, timeout):
|
||||
def _declare_rpc_exchange(self, exchange, stopwatch):
|
||||
timeout = stopwatch.leftover(return_none=True)
|
||||
with (self._pika_engine.connection_without_confirmation_pool
|
||||
.acquire(timeout=timeout)) as conn:
|
||||
try:
|
||||
@ -177,8 +178,7 @@ class PikaDriver(base.BaseDriver):
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
expiration_time = None if timeout is None else time.time() + timeout
|
||||
|
||||
with timeutils.StopWatch(duration=timeout) as stopwatch:
|
||||
if retry is None:
|
||||
retry = self._pika_engine.default_rpc_retry_attempts
|
||||
|
||||
@ -189,14 +189,10 @@ class PikaDriver(base.BaseDriver):
|
||||
def on_exception(ex):
|
||||
if isinstance(ex, pika_drv_exc.ExchangeNotFoundException):
|
||||
# it is desired to create exchange because if we sent to
|
||||
# exchange which is not exists, we get ChannelClosed exception
|
||||
# and need to reconnect
|
||||
# exchange which is not exists, we get ChannelClosed
|
||||
# exception and need to reconnect
|
||||
try:
|
||||
self._declare_rpc_exchange(
|
||||
exchange,
|
||||
None if expiration_time is None else
|
||||
expiration_time - time.time()
|
||||
)
|
||||
self._declare_rpc_exchange(exchange, stopwatch)
|
||||
except pika_drv_exc.ConnectionException as e:
|
||||
LOG.warning("Problem during declaring exchange. %s", e)
|
||||
return True
|
||||
@ -218,16 +214,15 @@ class PikaDriver(base.BaseDriver):
|
||||
|
||||
if target.fanout:
|
||||
return self.cast_all_workers(
|
||||
exchange, target.topic, ctxt, message, expiration_time,
|
||||
retrier
|
||||
exchange, target.topic, ctxt, message, stopwatch, retrier
|
||||
)
|
||||
|
||||
routing_key = self._pika_engine.get_rpc_queue_name(
|
||||
target.topic, target.server, retrier is None
|
||||
)
|
||||
|
||||
msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message,
|
||||
ctxt)
|
||||
msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine,
|
||||
message, ctxt)
|
||||
try:
|
||||
reply = msg.send(
|
||||
exchange=exchange,
|
||||
@ -235,13 +230,12 @@ class PikaDriver(base.BaseDriver):
|
||||
reply_listener=(
|
||||
self._reply_listener if wait_for_reply else None
|
||||
),
|
||||
expiration_time=expiration_time,
|
||||
stopwatch=stopwatch,
|
||||
retrier=retrier
|
||||
)
|
||||
except pika_drv_exc.ExchangeNotFoundException as ex:
|
||||
try:
|
||||
self._declare_rpc_exchange(exchange,
|
||||
expiration_time - time.time())
|
||||
self._declare_rpc_exchange(exchange, stopwatch)
|
||||
except pika_drv_exc.ConnectionException as e:
|
||||
LOG.warning("Problem during declaring exchange. %s", e)
|
||||
raise ex
|
||||
@ -252,7 +246,7 @@ class PikaDriver(base.BaseDriver):
|
||||
|
||||
return reply.result
|
||||
|
||||
def cast_all_workers(self, exchange, topic, ctxt, message, expiration_time,
|
||||
def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch,
|
||||
retrier=None):
|
||||
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
||||
ctxt)
|
||||
@ -263,23 +257,23 @@ class PikaDriver(base.BaseDriver):
|
||||
topic, "all_workers", retrier is None
|
||||
),
|
||||
mandatory=False,
|
||||
expiration_time=expiration_time,
|
||||
stopwatch=stopwatch,
|
||||
retrier=retrier
|
||||
)
|
||||
except pika_drv_exc.ExchangeNotFoundException:
|
||||
try:
|
||||
self._declare_rpc_exchange(
|
||||
exchange, expiration_time - time.time()
|
||||
)
|
||||
self._declare_rpc_exchange(exchange, stopwatch)
|
||||
except pika_drv_exc.ConnectionException as e:
|
||||
LOG.warning("Problem during declaring exchange. %s", e)
|
||||
|
||||
def _declare_notification_queue_binding(self, target, timeout=None):
|
||||
if timeout is not None and timeout < 0:
|
||||
def _declare_notification_queue_binding(
|
||||
self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH):
|
||||
if stopwatch.expired():
|
||||
raise exceptions.MessagingTimeout(
|
||||
"Timeout for current operation was expired."
|
||||
)
|
||||
try:
|
||||
timeout = stopwatch.leftover(return_none=True)
|
||||
with (self._pika_engine.connection_without_confirmation_pool
|
||||
.acquire)(timeout=timeout) as conn:
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
|
@ -16,6 +16,7 @@ import select
|
||||
import socket
|
||||
import sys
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from pika import exceptions as pika_exceptions
|
||||
import six
|
||||
|
||||
@ -30,6 +31,8 @@ PIKA_CONNECTIVITY_ERRORS = (
|
||||
|
||||
EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
|
||||
|
||||
INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start()
|
||||
|
||||
|
||||
def is_eventlet_monkey_patched(module):
|
||||
"""Determines safely is eventlet patching for module enabled or not
|
||||
|
@ -42,14 +42,12 @@ class RpcReplyPikaListener(object):
|
||||
self._poller_thread = None
|
||||
self._shutdown = False
|
||||
|
||||
def get_reply_qname(self, expiration_time=None):
|
||||
def get_reply_qname(self):
|
||||
"""As result return reply queue name, shared for whole process,
|
||||
but before this check is RPC listener initialized or not and perform
|
||||
initialization if needed
|
||||
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time()),
|
||||
:return: String, queue name which should be used for reply sending
|
||||
:return: String, queue name which hould be used for reply sending
|
||||
"""
|
||||
if self._reply_consumer_initialized:
|
||||
return self._reply_queue
|
||||
|
@ -22,6 +22,7 @@ from concurrent import futures
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
from pika import exceptions as pika_exceptions
|
||||
from pika import spec as pika_spec
|
||||
import pika_pool
|
||||
@ -31,6 +32,7 @@ import six
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import exceptions
|
||||
@ -213,9 +215,12 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
|
||||
) if self._pika_engine.rpc_reply_retry_attempts else None
|
||||
|
||||
try:
|
||||
timeout = (None if self.expiration_time is None else
|
||||
max(self.expiration_time - time.time(), 0))
|
||||
with timeutils.StopWatch(duration=timeout) as stopwatch:
|
||||
reply_outgoing_message.send(
|
||||
reply_q=self.reply_q,
|
||||
expiration_time=self.expiration_time,
|
||||
stopwatch=stopwatch,
|
||||
retrier=retrier
|
||||
)
|
||||
LOG.debug(
|
||||
@ -346,7 +351,7 @@ class PikaOutgoingMessage(object):
|
||||
|
||||
@staticmethod
|
||||
def _publish(pool, exchange, routing_key, body, properties, mandatory,
|
||||
expiration_time):
|
||||
stopwatch):
|
||||
"""Execute pika publish method using connection from connection pool
|
||||
Also this message catches all pika related exceptions and raise
|
||||
oslo.messaging specific exceptions
|
||||
@ -358,16 +363,15 @@ class PikaOutgoingMessage(object):
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
"""
|
||||
timeout = (None if expiration_time is None else
|
||||
expiration_time - time.time())
|
||||
if timeout is not None and timeout < 0:
|
||||
if stopwatch.expired():
|
||||
raise exceptions.MessagingTimeout(
|
||||
"Timeout for current operation was expired."
|
||||
)
|
||||
try:
|
||||
timeout = stopwatch.leftover(return_none=True)
|
||||
with pool.acquire(timeout=timeout) as conn:
|
||||
if timeout is not None:
|
||||
properties.expiration = str(int(timeout * 1000))
|
||||
@ -422,7 +426,7 @@ class PikaOutgoingMessage(object):
|
||||
|
||||
def _do_send(self, exchange, routing_key, msg_dict, msg_props,
|
||||
confirm=True, mandatory=True, persistent=False,
|
||||
expiration_time=None, retrier=None):
|
||||
stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
|
||||
"""Send prepared message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
@ -434,8 +438,8 @@ class PikaOutgoingMessage(object):
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param persistent: Boolean, send persistent message if True, works only
|
||||
for routing into durable queues
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
@ -458,10 +462,11 @@ class PikaOutgoingMessage(object):
|
||||
retrier(self._publish))
|
||||
|
||||
return publish(pool, exchange, routing_key, body, msg_props,
|
||||
mandatory, expiration_time)
|
||||
mandatory, stopwatch)
|
||||
|
||||
def send(self, exchange, routing_key='', confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=None, retrier=None):
|
||||
persistent=False, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
|
||||
retrier=None):
|
||||
"""Send message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
@ -471,16 +476,16 @@ class PikaOutgoingMessage(object):
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param persistent: Boolean, send persistent message if True, works only
|
||||
for routing into durable queues
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
return self._do_send(exchange, routing_key, msg_dict, msg_props,
|
||||
confirm, mandatory, persistent, expiration_time,
|
||||
retrier)
|
||||
confirm, mandatory, persistent,
|
||||
stopwatch, retrier)
|
||||
|
||||
|
||||
class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
@ -496,15 +501,15 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
self.reply_q = None
|
||||
|
||||
def send(self, exchange, routing_key, reply_listener=None,
|
||||
expiration_time=None, retrier=None):
|
||||
stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
|
||||
"""Send RPC message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param reply_listener: RpcReplyPikaListener, listener for waiting
|
||||
reply. If None - return immediately without reply waiting
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
@ -515,9 +520,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
msg_props.correlation_id = self.msg_id
|
||||
LOG.debug('MSG_ID is %s', self.msg_id)
|
||||
|
||||
self.reply_q = reply_listener.get_reply_qname(
|
||||
expiration_time - time.time()
|
||||
)
|
||||
self.reply_q = reply_listener.get_reply_qname()
|
||||
msg_props.reply_to = self.reply_q
|
||||
|
||||
future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
|
||||
@ -525,12 +528,11 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
|
||||
msg_props=msg_props, confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
persistent=False, stopwatch=stopwatch, retrier=retrier
|
||||
)
|
||||
|
||||
try:
|
||||
return future.result(expiration_time - time.time())
|
||||
return future.result(stopwatch.leftover(return_none=True))
|
||||
except BaseException as e:
|
||||
reply_listener.unregister_reply_waiter(self.msg_id)
|
||||
if isinstance(e, futures.TimeoutError):
|
||||
@ -540,8 +542,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
|
||||
msg_props=msg_props, confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
persistent=False, stopwatch=stopwatch, retrier=retrier
|
||||
)
|
||||
|
||||
|
||||
@ -592,12 +593,13 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
pika_engine, msg, None, content_type, content_encoding
|
||||
)
|
||||
|
||||
def send(self, reply_q, expiration_time=None, retrier=None):
|
||||
def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
|
||||
retrier=None):
|
||||
"""Send RPC message with configured retrying
|
||||
|
||||
:param reply_q: String, queue name for sending reply
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param stopwatch: StopWatch, stopwatch object for calculating
|
||||
allowed timeouts
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
@ -608,6 +610,6 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
self._do_send(
|
||||
exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
|
||||
msg_dict=msg_dict, msg_props=msg_props, confirm=True,
|
||||
mandatory=True, persistent=False, expiration_time=expiration_time,
|
||||
mandatory=True, persistent=False, stopwatch=stopwatch,
|
||||
retrier=retrier
|
||||
)
|
||||
|
@ -12,13 +12,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import functools
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from concurrent import futures
|
||||
from mock import mock
|
||||
from mock import patch
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
import pika
|
||||
|
||||
import oslo_messaging
|
||||
@ -200,7 +200,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
content_encoding='utf-8', content_type='application/json'
|
||||
)
|
||||
outgoing_message_mock().send.assert_called_once_with(
|
||||
expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
|
||||
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
@ -240,7 +240,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
content_type='application/json'
|
||||
)
|
||||
outgoing_message_mock().send.assert_called_once_with(
|
||||
expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
|
||||
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
@ -314,7 +314,9 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
self._exchange = "it is exchange"
|
||||
self._routing_key = "it is routing key"
|
||||
self._expiration = 1
|
||||
self._expiration_time = time.time() + self._expiration
|
||||
self._stopwatch = (
|
||||
timeutils.StopWatch(duration=self._expiration).start()
|
||||
)
|
||||
self._mandatory = object()
|
||||
|
||||
self._message = {"msg_type": 1, "msg_str": "hello"}
|
||||
@ -333,7 +335,7 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
confirm=True,
|
||||
mandatory=self._mandatory,
|
||||
persistent=True,
|
||||
expiration_time=self._expiration_time,
|
||||
stopwatch=self._stopwatch,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
@ -378,7 +380,7 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
confirm=False,
|
||||
mandatory=self._mandatory,
|
||||
persistent=False,
|
||||
expiration_time=self._expiration_time,
|
||||
stopwatch=self._stopwatch,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
@ -431,13 +433,13 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
expiration = 1
|
||||
expiration_time = time.time() + expiration
|
||||
stopwatch = timeutils.StopWatch(duration=expiration).start()
|
||||
|
||||
message.send(
|
||||
exchange=self._exchange,
|
||||
routing_key=self._routing_key,
|
||||
reply_listener=None,
|
||||
expiration_time=expiration_time,
|
||||
stopwatch=stopwatch,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
@ -478,7 +480,7 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
expiration = 1
|
||||
expiration_time = time.time() + expiration
|
||||
stopwatch = timeutils.StopWatch(duration=expiration).start()
|
||||
|
||||
result = "it is a result"
|
||||
reply_queue_name = "reply_queue_name"
|
||||
@ -493,7 +495,7 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
exchange=self._exchange,
|
||||
routing_key=self._routing_key,
|
||||
reply_listener=reply_listener,
|
||||
expiration_time=expiration_time,
|
||||
stopwatch=stopwatch,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
@ -534,7 +536,9 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
self._reply_q = "reply_queue_name"
|
||||
|
||||
self._expiration = 1
|
||||
self._expiration_time = time.time() + self._expiration
|
||||
self._stopwatch = (
|
||||
timeutils.StopWatch(duration=self._expiration).start()
|
||||
)
|
||||
|
||||
self._pika_engine = mock.MagicMock()
|
||||
|
||||
@ -550,8 +554,7 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
self._pika_engine, self._msg_id, reply="all_fine"
|
||||
)
|
||||
|
||||
message.send(self._reply_q, expiration_time=self._expiration_time,
|
||||
retrier=None)
|
||||
message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
@ -586,8 +589,7 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
self._pika_engine, self._msg_id, failure_info=failure_info
|
||||
)
|
||||
|
||||
message.send(self._reply_q, expiration_time=self._expiration_time,
|
||||
retrier=None)
|
||||
message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
|
Loading…
Reference in New Issue
Block a user