Merge "notification listener: add allow_requeue param"
This commit is contained in:
@@ -67,6 +67,11 @@ class BaseDriver(object):
|
|||||||
self._default_exchange = default_exchange
|
self._default_exchange = default_exchange
|
||||||
self._allowed_remote_exmods = allowed_remote_exmods
|
self._allowed_remote_exmods = allowed_remote_exmods
|
||||||
|
|
||||||
|
def require_features(self, requeue=False):
|
||||||
|
if requeue:
|
||||||
|
raise NotImplementedError('Message requeueing not supported by '
|
||||||
|
'this transport driver')
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def send(self, target, ctxt, message,
|
def send(self, target, ctxt, message,
|
||||||
wait_for_reply=None, timeout=None, envelope=False):
|
wait_for_reply=None, timeout=None, envelope=False):
|
||||||
|
|||||||
@@ -122,6 +122,9 @@ class FakeDriver(base.BaseDriver):
|
|||||||
|
|
||||||
self._exchange_manager = FakeExchangeManager(default_exchange)
|
self._exchange_manager = FakeExchangeManager(default_exchange)
|
||||||
|
|
||||||
|
def require_features(self, requeue=True):
|
||||||
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _check_serialize(message):
|
def _check_serialize(message):
|
||||||
"""Make sure a message intended for rpc can be serialized.
|
"""Make sure a message intended for rpc can be serialized.
|
||||||
|
|||||||
@@ -100,8 +100,7 @@ class QpidMessage(dict):
|
|||||||
self._session.acknowledge(self._raw_message)
|
self._session.acknowledge(self._raw_message)
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
raise NotImplementedError('The QPID driver does not yet support '
|
pass
|
||||||
'requeuing messages')
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerBase(object):
|
class ConsumerBase(object):
|
||||||
|
|||||||
@@ -744,3 +744,6 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
|||||||
connection_pool,
|
connection_pool,
|
||||||
default_exchange,
|
default_exchange,
|
||||||
allowed_remote_exmods)
|
allowed_remote_exmods)
|
||||||
|
|
||||||
|
def require_features(self, requeue=True):
|
||||||
|
pass
|
||||||
|
|||||||
@@ -844,8 +844,7 @@ class ZmqIncomingMessage(base.IncomingMessage):
|
|||||||
self.condition.notify()
|
self.condition.notify()
|
||||||
|
|
||||||
def requeue(self):
|
def requeue(self):
|
||||||
raise NotImplementedError('The ZeroMQ driver does not yet support '
|
pass
|
||||||
'requeuing messages')
|
|
||||||
|
|
||||||
|
|
||||||
class ZmqListener(base.Listener):
|
class ZmqListener(base.Listener):
|
||||||
|
|||||||
@@ -44,10 +44,11 @@ class NotificationDispatcher(object):
|
|||||||
message to the endpoints
|
message to the endpoints
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, targets, endpoints, serializer):
|
def __init__(self, targets, endpoints, serializer, allow_requeue):
|
||||||
self.targets = targets
|
self.targets = targets
|
||||||
self.endpoints = endpoints
|
self.endpoints = endpoints
|
||||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||||
|
self.allow_requeue = allow_requeue
|
||||||
|
|
||||||
self._callbacks_by_priority = {}
|
self._callbacks_by_priority = {}
|
||||||
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
|
||||||
@@ -119,7 +120,7 @@ class NotificationDispatcher(object):
|
|||||||
ret = callback(ctxt, publisher_id, event_type, payload,
|
ret = callback(ctxt, publisher_id, event_type, payload,
|
||||||
metadata)
|
metadata)
|
||||||
ret = NotificationResult.HANDLED if ret is None else ret
|
ret = NotificationResult.HANDLED if ret is None else ret
|
||||||
if ret != NotificationResult.HANDLED:
|
if self.allow_requeue and ret == NotificationResult.REQUEUE:
|
||||||
return ret
|
return ret
|
||||||
finally:
|
finally:
|
||||||
localcontext.clear_local_context()
|
localcontext.clear_local_context()
|
||||||
|
|||||||
@@ -73,26 +73,20 @@ priority
|
|||||||
Parameters to endpoint methods are the request context supplied by the client,
|
Parameters to endpoint methods are the request context supplied by the client,
|
||||||
the publisher_id of the notification message, the event_type, the payload.
|
the publisher_id of the notification message, the event_type, the payload.
|
||||||
|
|
||||||
An endpoint method can return explicitly messaging.NotificationResult.HANDLED
|
By supplying a serializer object, a listener can deserialize a request context
|
||||||
|
and arguments from - and serialize return values to - primitive types.
|
||||||
|
|
||||||
|
An endpoint method can explicitly return messaging.NotificationResult.HANDLED
|
||||||
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
|
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
|
||||||
message.
|
message.
|
||||||
|
|
||||||
The message is acknowledge only if all endpoints return
|
The message is acknowledged only if all endpoints either return
|
||||||
messaging.NotificationResult.HANDLED
|
messaging.NotificationResult.HANDLED or None.
|
||||||
|
|
||||||
If nothing is returned by an endpoint, this is considered like
|
Note that not all transport drivers implement support for requeueing. In order
|
||||||
messaging.NotificationResult.HANDLED
|
to use this feature, applications should assert that the feature is available
|
||||||
|
by passing allow_requeue=True to get_notification_listener(). If the driver
|
||||||
messaging.NotificationResult values needs to be handled by drivers:
|
does not support requeueing, it will raise NotImplementedError at this point.
|
||||||
|
|
||||||
* HANDLED: supported by all drivers
|
|
||||||
* REQUEUE: supported by drivers: fake://, rabbit://
|
|
||||||
|
|
||||||
In case of an unsupported driver nothing is done to the message and a
|
|
||||||
NotImplementedError is raised and logged.
|
|
||||||
|
|
||||||
By supplying a serializer object, a listener can deserialize a request context
|
|
||||||
and arguments from - and serialize return values to - primitive types.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from oslo.messaging.notify import dispatcher as notify_dispatcher
|
from oslo.messaging.notify import dispatcher as notify_dispatcher
|
||||||
@@ -100,7 +94,8 @@ from oslo.messaging import server as msg_server
|
|||||||
|
|
||||||
|
|
||||||
def get_notification_listener(transport, targets, endpoints,
|
def get_notification_listener(transport, targets, endpoints,
|
||||||
executor='blocking', serializer=None):
|
executor='blocking', serializer=None,
|
||||||
|
allow_requeue=False):
|
||||||
"""Construct a notification listener
|
"""Construct a notification listener
|
||||||
|
|
||||||
The executor parameter controls how incoming messages will be received and
|
The executor parameter controls how incoming messages will be received and
|
||||||
@@ -117,7 +112,12 @@ def get_notification_listener(transport, targets, endpoints,
|
|||||||
:type executor: str
|
:type executor: str
|
||||||
:param serializer: an optional entity serializer
|
:param serializer: an optional entity serializer
|
||||||
:type serializer: Serializer
|
:type serializer: Serializer
|
||||||
|
:param allow_requeue: whether NotificationResult.REQUEUE support is needed
|
||||||
|
:type allow_requeue: bool
|
||||||
|
:raises: NotImplementedError
|
||||||
"""
|
"""
|
||||||
|
transport._require_driver_features(requeue=allow_requeue)
|
||||||
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
|
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
|
||||||
serializer)
|
serializer,
|
||||||
|
allow_requeue)
|
||||||
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
||||||
|
|||||||
@@ -78,6 +78,9 @@ class Transport(object):
|
|||||||
self.conf = driver.conf
|
self.conf = driver.conf
|
||||||
self._driver = driver
|
self._driver = driver
|
||||||
|
|
||||||
|
def _require_driver_features(self, requeue=False):
|
||||||
|
self._driver.require_features(requeue=requeue)
|
||||||
|
|
||||||
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||||
if not target.topic:
|
if not target.topic:
|
||||||
raise exceptions.InvalidTarget('A topic is required to send',
|
raise exceptions.InvalidTarget('A topic is required to send',
|
||||||
|
|||||||
@@ -97,9 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
|||||||
msg['priority'] = self.priority
|
msg['priority'] = self.priority
|
||||||
|
|
||||||
targets = [messaging.Target(topic='notifications')]
|
targets = [messaging.Target(topic='notifications')]
|
||||||
dispatcher = notify_dispatcher.NotificationDispatcher(targets,
|
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||||
endpoints,
|
targets, endpoints, None, allow_requeue=True)
|
||||||
None)
|
|
||||||
|
|
||||||
# check it listen on wanted topics
|
# check it listen on wanted topics
|
||||||
self.assertEqual(sorted(dispatcher._targets_priorities),
|
self.assertEqual(sorted(dispatcher._targets_priorities),
|
||||||
@@ -141,9 +140,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
|||||||
def test_dispatcher_unknown_prio(self, mylog):
|
def test_dispatcher_unknown_prio(self, mylog):
|
||||||
msg = notification_msg.copy()
|
msg = notification_msg.copy()
|
||||||
msg['priority'] = 'what???'
|
msg['priority'] = 'what???'
|
||||||
dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()],
|
dispatcher = notify_dispatcher.NotificationDispatcher(
|
||||||
[mock.Mock()],
|
[mock.Mock()], [mock.Mock()], None, allow_requeue=True)
|
||||||
None)
|
|
||||||
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
|
||||||
callback()
|
callback()
|
||||||
mylog.warning.assert_called_once_with('Unknown priority "what???"')
|
mylog.warning.assert_called_once_with('Unknown priority "what???"')
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ class ListenerSetupMixin(object):
|
|||||||
self._expect_messages = expect_messages
|
self._expect_messages = expect_messages
|
||||||
self._received_msgs = 0
|
self._received_msgs = 0
|
||||||
self._listener = messaging.get_notification_listener(
|
self._listener = messaging.get_notification_listener(
|
||||||
transport, targets, endpoints + [self])
|
transport, targets, endpoints + [self], allow_requeue=True)
|
||||||
|
|
||||||
def info(self, ctxt, publisher_id, event_type, payload):
|
def info(self, ctxt, publisher_id, event_type, payload):
|
||||||
self._received_msgs += 1
|
self._received_msgs += 1
|
||||||
|
|||||||
Reference in New Issue
Block a user