diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index dcf08287e..82b36412e 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -67,6 +67,11 @@ class BaseDriver(object): self._default_exchange = default_exchange self._allowed_remote_exmods = allowed_remote_exmods + def require_features(self, requeue=False): + if requeue: + raise NotImplementedError('Message requeueing not supported by ' + 'this transport driver') + @abc.abstractmethod def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, envelope=False): diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 2bfbe16b3..564dc21ce 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -112,6 +112,9 @@ class FakeDriver(base.BaseDriver): self._exchanges_lock = threading.Lock() self._exchanges = {} + def require_features(self, requeue=True): + pass + @staticmethod def _check_serialize(message): """Make sure a message intended for rpc can be serialized. diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index c53ad8946..f9b275f70 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -100,8 +100,7 @@ class QpidMessage(dict): self._session.acknowledge(self._raw_message) def requeue(self): - raise NotImplementedError('The QPID driver does not yet support ' - 'requeuing messages') + pass class ConsumerBase(object): diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index be41a4ec1..4df6964b5 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -744,3 +744,6 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): connection_pool, default_exchange, allowed_remote_exmods) + + def require_features(self, requeue=True): + pass diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 6b2b53957..af5b612b2 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -844,8 +844,7 @@ class ZmqIncomingMessage(base.IncomingMessage): self.condition.notify() def requeue(self): - raise NotImplementedError('The ZeroMQ driver does not yet support ' - 'requeuing messages') + pass class ZmqListener(base.Listener): diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py index 87deee854..ca8a2f01e 100644 --- a/oslo/messaging/notify/dispatcher.py +++ b/oslo/messaging/notify/dispatcher.py @@ -44,10 +44,11 @@ class NotificationDispatcher(object): message to the endpoints """ - def __init__(self, targets, endpoints, serializer): + def __init__(self, targets, endpoints, serializer, allow_requeue): self.targets = targets self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() + self.allow_requeue = allow_requeue self._callbacks_by_priority = {} for endpoint, prio in itertools.product(endpoints, PRIORITIES): @@ -114,7 +115,7 @@ class NotificationDispatcher(object): try: ret = callback(ctxt, publisher_id, event_type, payload) ret = NotificationResult.HANDLED if ret is None else ret - if ret != NotificationResult.HANDLED: + if self.allow_requeue and ret == NotificationResult.REQUEUE: return ret finally: localcontext.clear_local_context() diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py index 06a2e24ce..c6c17df8b 100644 --- a/oslo/messaging/notify/listener.py +++ b/oslo/messaging/notify/listener.py @@ -73,26 +73,20 @@ priority Parameters to endpoint methods are the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload. -An endpoint method can return explicitly messaging.NotificationResult.HANDLED +By supplying a serializer object, a listener can deserialize a request context +and arguments from - and serialize return values to - primitive types. + +An endpoint method can explicitly return messaging.NotificationResult.HANDLED to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the message. -The message is acknowledge only if all endpoints return -messaging.NotificationResult.HANDLED +The message is acknowledged only if all endpoints either return +messaging.NotificationResult.HANDLED or None. -If nothing is returned by an endpoint, this is considered like -messaging.NotificationResult.HANDLED - -messaging.NotificationResult values needs to be handled by drivers: - -* HANDLED: supported by all drivers -* REQUEUE: supported by drivers: fake://, rabbit:// - -In case of an unsupported driver nothing is done to the message and a -NotImplementedError is raised and logged. - -By supplying a serializer object, a listener can deserialize a request context -and arguments from - and serialize return values to - primitive types. +Note that not all transport drivers implement support for requeueing. In order +to use this feature, applications should assert that the feature is available +by passing allow_requeue=True to get_notification_listener(). If the driver +does not support requeueing, it will raise NotImplementedError at this point. """ from oslo.messaging.notify import dispatcher as notify_dispatcher @@ -100,7 +94,8 @@ from oslo.messaging import server as msg_server def get_notification_listener(transport, targets, endpoints, - executor='blocking', serializer=None): + executor='blocking', serializer=None, + allow_requeue=False): """Construct a notification listener The executor parameter controls how incoming messages will be received and @@ -117,7 +112,12 @@ def get_notification_listener(transport, targets, endpoints, :type executor: str :param serializer: an optional entity serializer :type serializer: Serializer + :param allow_requeue: whether NotificationResult.REQUEUE support is needed + :type allow_requeue: bool + :raises: NotImplementedError """ + transport._require_driver_features(requeue=allow_requeue) dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints, - serializer) + serializer, + allow_requeue) return msg_server.MessageHandlingServer(transport, dispatcher, executor) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index 7c8a3be97..16e6f49bd 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -78,6 +78,9 @@ class Transport(object): self.conf = driver.conf self._driver = driver + def _require_driver_features(self, requeue=False): + self._driver.require_features(requeue=requeue) + def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py index 3e2c966a1..7b3531374 100644 --- a/tests/test_notify_dispatcher.py +++ b/tests/test_notify_dispatcher.py @@ -97,9 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase): msg['priority'] = self.priority targets = [messaging.Target(topic='notifications')] - dispatcher = notify_dispatcher.NotificationDispatcher(targets, - endpoints, - None) + dispatcher = notify_dispatcher.NotificationDispatcher( + targets, endpoints, None, allow_requeue=True) # check it listen on wanted topics self.assertEqual(sorted(dispatcher._targets_priorities), @@ -138,9 +137,8 @@ class TestDispatcher(test_utils.BaseTestCase): def test_dispatcher_unknown_prio(self, mylog): msg = notification_msg.copy() msg['priority'] = 'what???' - dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()], - [mock.Mock()], - None) + dispatcher = notify_dispatcher.NotificationDispatcher( + [mock.Mock()], [mock.Mock()], None, allow_requeue=True) with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback: callback() mylog.warning.assert_called_once_with('Unknown priority "what???"') diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py index 3b35a3bb1..0cf5f5405 100644 --- a/tests/test_notify_listener.py +++ b/tests/test_notify_listener.py @@ -35,7 +35,7 @@ class ListenerSetupMixin(object): self._expect_messages = expect_messages self._received_msgs = 0 self._listener = messaging.get_notification_listener( - transport, targets, endpoints + [self]) + transport, targets, endpoints + [self], allow_requeue=True) def info(self, ctxt, publisher_id, event_type, payload): self._received_msgs += 1