diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 3da6ce8b6..9639a473f 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -66,6 +66,9 @@ class AMQPIncomingMessage(base.IncomingMessage): def acknowledge(self): self.message.acknowledge() + def requeue(self): + self.message.requeue() + class AMQPListener(base.Listener): diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 65b3a758c..752be6d2b 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -41,6 +41,10 @@ class IncomingMessage(object): def acknowledge(self): "Acknowledge the message." + @abc.abstractmethod + def requeue(self): + "Requeue the message." + @six.add_metaclass(abc.ABCMeta) class Listener(object): diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 0c5449199..ab39b8249 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -26,8 +26,12 @@ from oslo.messaging._drivers import base class FakeIncomingMessage(base.IncomingMessage): - def __init__(self, listener, ctxt, message, reply_q): + def __init__(self, listener, ctxt, message, topic, server, fanout, + reply_q): super(FakeIncomingMessage, self).__init__(listener, ctxt, message) + self._topic = topic + self._server = server + self._fanout = fanout self._reply_q = reply_q def reply(self, reply=None, failure=None, log_failure=True): @@ -39,6 +43,12 @@ class FakeIncomingMessage(base.IncomingMessage): def acknowledge(): pass + def requeue(self): + self.listener._exchange.deliver_message( + self._topic, self.ctxt, self.message, + server=self._server, fanout=self._fanout, + reply_q=self._reply_q) + class FakeListener(base.Listener): @@ -50,9 +60,12 @@ class FakeListener(base.Listener): def poll(self): while True: for target in self._targets: - (ctxt, message, reply_q) = self._exchange.poll(target) + (ctxt, message, server, fanout, reply_q) = \ + self._exchange.poll(target) if message is not None: - message = FakeIncomingMessage(self, ctxt, message, reply_q) + message = FakeIncomingMessage(self, ctxt, message, + target.topic, + server, fanout, reply_q) message.acknowledge() return message time.sleep(.05) @@ -83,7 +96,7 @@ class FakeExchange(object): else: queues = [self._get_topic_queue(topic)] for queue in queues: - queue.append((ctxt, message, reply_q)) + queue.append((ctxt, message, server, fanout, reply_q)) def poll(self, target): with self._queues_lock: @@ -91,7 +104,7 @@ class FakeExchange(object): queue = self._get_server_queue(target.topic, target.server) else: queue = self._get_topic_queue(target.topic) - return queue.pop(0) if queue else (None, None, None) + return queue.pop(0) if queue else (None, None, None, None, None) class FakeDriver(base.BaseDriver): diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 0fc869df9..fc4d98904 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -101,6 +101,11 @@ class QpidMessage(dict): def acknowledge(self): self._session.acknowledge(self._raw_message) + @staticmethod + def requeue(): + raise NotImplementedError('The QPID driver does not yet support ' + 'requeue messages') + class ConsumerBase(object): """Consumer base class.""" diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 804d7dd13..7ddee3b53 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -127,6 +127,9 @@ class RabbitMessage(dict): def acknowledge(self): self._raw_message.ack() + def requeue(self): + self._raw_message.requeue() + class ConsumerBase(object): """Consumer base class.""" diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index a12b55619..25e56b5da 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -847,6 +847,10 @@ class ZmqIncomingMessage(base.IncomingMessage): def acknowledge(): pass + def requeue(self): + raise NotImplementedError('The ZeroMQ driver does not yet support ' + 'requeuing message') + class ZmqListener(base.Listener): @@ -964,6 +968,8 @@ class ZmqDriver(base.BaseDriver): return listener def listen_for_notifications(self, targets_and_priorities): + # NOTE(sileht): this listener implementation is limited + # because zeromq doesn't support requeing message conn = create_connection(self.conf) listener = ZmqListener(self, None) diff --git a/oslo/messaging/_executors/base.py b/oslo/messaging/_executors/base.py index bc827b321..9cb6bc681 100644 --- a/oslo/messaging/_executors/base.py +++ b/oslo/messaging/_executors/base.py @@ -34,6 +34,9 @@ class ExecutorBase(object): def _dispatch(self, incoming): try: self.callback(incoming.ctxt, dict(incoming.message)) + except messaging.RequeueMessageException: + _LOG.debug('Requeue exception during message handling') + raise except Exception: # sys.exc_info() is deleted by LOG.exception(). exc_info = sys.exc_info() diff --git a/oslo/messaging/_executors/impl_blocking.py b/oslo/messaging/_executors/impl_blocking.py index f1dfd53ce..91b38ad22 100644 --- a/oslo/messaging/_executors/impl_blocking.py +++ b/oslo/messaging/_executors/impl_blocking.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging from oslo.messaging._executors import base @@ -41,8 +42,12 @@ class BlockingExecutor(base.ExecutorBase): incoming.acknowledge() self._dispatch_and_reply(incoming) else: - self._dispatch(incoming) - incoming.acknowledge() + try: + self._dispatch(incoming) + except messaging.RequeueMessageException: + incoming.requeue() + else: + incoming.acknowledge() def stop(self): self._running = False diff --git a/oslo/messaging/_executors/impl_eventlet.py b/oslo/messaging/_executors/impl_eventlet.py index 6fda62c6e..ca963bc6a 100644 --- a/oslo/messaging/_executors/impl_eventlet.py +++ b/oslo/messaging/_executors/impl_eventlet.py @@ -19,6 +19,7 @@ import greenlet from oslo.config import cfg +from oslo import messaging from oslo.messaging._executors import base from oslo.messaging.openstack.common import excutils @@ -67,9 +68,13 @@ class EventletExecutor(base.ExecutorBase): except greenlet.GreenletExit: return - def _acknowledgement_callback(self, greenthread, incoming): - greenthread.wait() - incoming.acknowledge() + def _acknowledgement_callback(self, thread, incoming): + try: + thread.wait() + except messaging.RequeueMessageException: + incoming.requeue() + else: + incoming.acknowledge() def stop(self): if self._thread is None: diff --git a/oslo/messaging/notify/__init__.py b/oslo/messaging/notify/__init__.py index 4b87d72c3..b2d7216a8 100644 --- a/oslo/messaging/notify/__init__.py +++ b/oslo/messaging/notify/__init__.py @@ -15,7 +15,8 @@ __all__ = ['Notifier', 'LoggingNotificationHandler', - 'get_notification_listener'] + 'get_notification_listener', + 'RequeueMessageException'] from .notifier import * from .listener import * diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py index f7384c148..85bcb498e 100644 --- a/oslo/messaging/notify/listener.py +++ b/oslo/messaging/notify/listener.py @@ -81,6 +81,14 @@ from oslo.messaging.notify import dispatcher as notify_dispatcher from oslo.messaging import server as msg_server +class RequeueMessageException(Exception): + """Encapsulates an Requeue exception + + Merely instantiating this exception will ask to the executor to + requeue the message + """ + + def get_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None): """Construct a notification listener diff --git a/tests/test_executor.py b/tests/test_executor.py index 5970ad334..1e945c96e 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -19,6 +19,7 @@ import threading import mock import testscenarios +from oslo import messaging from oslo.messaging._executors import impl_blocking from oslo.messaging._executors import impl_eventlet from tests import utils as test_utils @@ -29,8 +30,12 @@ load_tests = testscenarios.load_tests_apply_scenarios class TestExecutor(test_utils.BaseTestCase): _scenarios = [ - ('rpc', dict(sender_expects_reply=True)), - ('notify', dict(sender_expects_reply=False)) + ('rpc', dict(sender_expects_reply=True, + msg_action='acknowledge')), + ('notify_ack', dict(sender_expects_reply=False, + msg_action='acknowledge')), + ('notify_requeue', dict(sender_expects_reply=False, + msg_action='requeue')), ] _impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor, @@ -55,8 +60,12 @@ class TestExecutor(test_utils.BaseTestCase): def test_executor_dispatch(self): callback = mock.MagicMock(sender_expects_reply= - self.sender_expects_reply, - return_value='result') + self.sender_expects_reply) + if self.msg_action == 'acknowledge': + callback.return_value = 'result' + elif self.msg_action == 'requeue': + callback.side_effect = messaging.RequeueMessageException() + listener = mock.Mock(spec=['poll']) executor = self.executor(self.conf, listener, callback) @@ -76,7 +85,8 @@ class TestExecutor(test_utils.BaseTestCase): self._run_in_thread(executor) - incoming_message.acknowledge.assert_called_once_with() + msg_action_method = getattr(incoming_message, self.msg_action) + msg_action_method.assert_called_once_with() callback.assert_called_once_with({}, {'payload': 'data'}) if self.sender_expects_reply: incoming_message.reply.assert_called_once_with('result') diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py index e1c1f9abb..3a569156a 100644 --- a/tests/test_notify_dispatcher.py +++ b/tests/test_notify_dispatcher.py @@ -41,24 +41,39 @@ class TestDispatcher(test_utils.BaseTestCase): ('no_endpoints', dict(endpoints=[], endpoints_expect_calls=[], - priority='info')), + priority='info', + success=True, ex=None)), ('one_endpoints', dict(endpoints=[['warn']], endpoints_expect_calls=['warn'], - priority='warn')), + priority='warn', + success=True, ex=None)), ('two_endpoints_only_one_match', dict(endpoints=[['warn'], ['info']], endpoints_expect_calls=[None, 'info'], - priority='info')), + priority='info', + success=True, ex=None)), ('two_endpoints_both_match', dict(endpoints=[['debug', 'info'], ['info', 'debug']], endpoints_expect_calls=['debug', 'debug'], - priority='debug')), + priority='debug', + success=True, ex=None)), + ('requeue_exception', + dict(endpoints=[['debug', 'warn']], + endpoints_expect_calls=['debug'], + priority='debug', msg=notification_msg, + success=False, ex=messaging.RequeueMessageException)), ] def test_dispatcher(self): - endpoints = [mock.Mock(spec=endpoint_methods) - for endpoint_methods in self.endpoints] + endpoints = [] + for endpoint_methods in self.endpoints: + e = mock.Mock(spec=endpoint_methods) + endpoints.append(e) + if self.ex: + for m in endpoint_methods: + getattr(e, m).side_effect = self.ex() + msg = notification_msg.copy() msg['priority'] = self.priority @@ -73,7 +88,16 @@ class TestDispatcher(test_utils.BaseTestCase): for prio in itertools.chain.from_iterable( self.endpoints)))) - dispatcher({}, msg) + try: + dispatcher({}, msg) + except Exception as ex: + self.assertFalse(self.success, ex) + self.assertIsNotNone(self.ex, ex) + self.assertIsInstance(ex, self.ex, ex) + if isinstance(ex, messaging.NoSuchMethod): + self.assertEqual(ex.method, self.method) + else: + self.assertTrue(self.success) # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py index fb3132fd4..f2deec7fc 100644 --- a/tests/test_notify_listener.py +++ b/tests/test_notify_listener.py @@ -173,3 +173,25 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test') self._stop_listener(listener_thread) + + def test_requeue(self): + transport = messaging.get_transport(self.conf, url='fake:') + endpoint = mock.Mock() + endpoint.info = mock.Mock() + + def side_effect_requeue(*args, **kwargs): + if endpoint.info.call_count == 1: + raise messaging.RequeueMessageException() + + endpoint.info.side_effect = side_effect_requeue + listener_thread = self._setup_listener(transport, + [endpoint]) + notifier = self._setup_notifier(transport) + notifier.info({}, 'an_event.start', 'test') + + time.sleep(0.5) + + expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'), + mock.call({}, 'testpublisher', 'an_event.start', 'test')] + self.assertEqual(endpoint.info.call_args_list, expected) + self._stop_listener(listener_thread)