Allow to requeue the notification message

This patch allow to requeue the notification received by the
notification listener.

Partial implements blueprint notification-subscriber-server

Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843
This commit is contained in:
Mehdi Abaakouk 2013-12-11 16:50:09 +01:00 committed by Mehdi Abaakouk
parent d581b8a16d
commit a7eb574128
14 changed files with 135 additions and 23 deletions

View File

@ -66,6 +66,9 @@ class AMQPIncomingMessage(base.IncomingMessage):
def acknowledge(self):
self.message.acknowledge()
def requeue(self):
self.message.requeue()
class AMQPListener(base.Listener):

View File

@ -41,6 +41,10 @@ class IncomingMessage(object):
def acknowledge(self):
"Acknowledge the message."
@abc.abstractmethod
def requeue(self):
"Requeue the message."
@six.add_metaclass(abc.ABCMeta)
class Listener(object):

View File

@ -26,8 +26,12 @@ from oslo.messaging._drivers import base
class FakeIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, reply_q):
def __init__(self, listener, ctxt, message, topic, server, fanout,
reply_q):
super(FakeIncomingMessage, self).__init__(listener, ctxt, message)
self._topic = topic
self._server = server
self._fanout = fanout
self._reply_q = reply_q
def reply(self, reply=None, failure=None, log_failure=True):
@ -39,6 +43,12 @@ class FakeIncomingMessage(base.IncomingMessage):
def acknowledge():
pass
def requeue(self):
self.listener._exchange.deliver_message(
self._topic, self.ctxt, self.message,
server=self._server, fanout=self._fanout,
reply_q=self._reply_q)
class FakeListener(base.Listener):
@ -50,9 +60,12 @@ class FakeListener(base.Listener):
def poll(self):
while True:
for target in self._targets:
(ctxt, message, reply_q) = self._exchange.poll(target)
(ctxt, message, server, fanout, reply_q) = \
self._exchange.poll(target)
if message is not None:
message = FakeIncomingMessage(self, ctxt, message, reply_q)
message = FakeIncomingMessage(self, ctxt, message,
target.topic,
server, fanout, reply_q)
message.acknowledge()
return message
time.sleep(.05)
@ -83,7 +96,7 @@ class FakeExchange(object):
else:
queues = [self._get_topic_queue(topic)]
for queue in queues:
queue.append((ctxt, message, reply_q))
queue.append((ctxt, message, server, fanout, reply_q))
def poll(self, target):
with self._queues_lock:
@ -91,7 +104,7 @@ class FakeExchange(object):
queue = self._get_server_queue(target.topic, target.server)
else:
queue = self._get_topic_queue(target.topic)
return queue.pop(0) if queue else (None, None, None)
return queue.pop(0) if queue else (None, None, None, None, None)
class FakeDriver(base.BaseDriver):

View File

@ -101,6 +101,11 @@ class QpidMessage(dict):
def acknowledge(self):
self._session.acknowledge(self._raw_message)
@staticmethod
def requeue():
raise NotImplementedError('The QPID driver does not yet support '
'requeue messages')
class ConsumerBase(object):
"""Consumer base class."""

View File

@ -127,6 +127,9 @@ class RabbitMessage(dict):
def acknowledge(self):
self._raw_message.ack()
def requeue(self):
self._raw_message.requeue()
class ConsumerBase(object):
"""Consumer base class."""

View File

@ -847,6 +847,10 @@ class ZmqIncomingMessage(base.IncomingMessage):
def acknowledge():
pass
def requeue(self):
raise NotImplementedError('The ZeroMQ driver does not yet support '
'requeuing message')
class ZmqListener(base.Listener):
@ -964,6 +968,8 @@ class ZmqDriver(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities):
# NOTE(sileht): this listener implementation is limited
# because zeromq doesn't support requeing message
conn = create_connection(self.conf)
listener = ZmqListener(self, None)

View File

@ -34,6 +34,9 @@ class ExecutorBase(object):
def _dispatch(self, incoming):
try:
self.callback(incoming.ctxt, dict(incoming.message))
except messaging.RequeueMessageException:
_LOG.debug('Requeue exception during message handling')
raise
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo import messaging
from oslo.messaging._executors import base
@ -41,8 +42,12 @@ class BlockingExecutor(base.ExecutorBase):
incoming.acknowledge()
self._dispatch_and_reply(incoming)
else:
self._dispatch(incoming)
incoming.acknowledge()
try:
self._dispatch(incoming)
except messaging.RequeueMessageException:
incoming.requeue()
else:
incoming.acknowledge()
def stop(self):
self._running = False

View File

@ -19,6 +19,7 @@ import greenlet
from oslo.config import cfg
from oslo import messaging
from oslo.messaging._executors import base
from oslo.messaging.openstack.common import excutils
@ -67,9 +68,13 @@ class EventletExecutor(base.ExecutorBase):
except greenlet.GreenletExit:
return
def _acknowledgement_callback(self, greenthread, incoming):
greenthread.wait()
incoming.acknowledge()
def _acknowledgement_callback(self, thread, incoming):
try:
thread.wait()
except messaging.RequeueMessageException:
incoming.requeue()
else:
incoming.acknowledge()
def stop(self):
if self._thread is None:

View File

@ -15,7 +15,8 @@
__all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_listener']
'get_notification_listener',
'RequeueMessageException']
from .notifier import *
from .listener import *

View File

@ -81,6 +81,14 @@ from oslo.messaging.notify import dispatcher as notify_dispatcher
from oslo.messaging import server as msg_server
class RequeueMessageException(Exception):
"""Encapsulates an Requeue exception
Merely instantiating this exception will ask to the executor to
requeue the message
"""
def get_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None):
"""Construct a notification listener

View File

@ -19,6 +19,7 @@ import threading
import mock
import testscenarios
from oslo import messaging
from oslo.messaging._executors import impl_blocking
from oslo.messaging._executors import impl_eventlet
from tests import utils as test_utils
@ -29,8 +30,12 @@ load_tests = testscenarios.load_tests_apply_scenarios
class TestExecutor(test_utils.BaseTestCase):
_scenarios = [
('rpc', dict(sender_expects_reply=True)),
('notify', dict(sender_expects_reply=False))
('rpc', dict(sender_expects_reply=True,
msg_action='acknowledge')),
('notify_ack', dict(sender_expects_reply=False,
msg_action='acknowledge')),
('notify_requeue', dict(sender_expects_reply=False,
msg_action='requeue')),
]
_impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor,
@ -55,8 +60,12 @@ class TestExecutor(test_utils.BaseTestCase):
def test_executor_dispatch(self):
callback = mock.MagicMock(sender_expects_reply=
self.sender_expects_reply,
return_value='result')
self.sender_expects_reply)
if self.msg_action == 'acknowledge':
callback.return_value = 'result'
elif self.msg_action == 'requeue':
callback.side_effect = messaging.RequeueMessageException()
listener = mock.Mock(spec=['poll'])
executor = self.executor(self.conf, listener, callback)
@ -76,7 +85,8 @@ class TestExecutor(test_utils.BaseTestCase):
self._run_in_thread(executor)
incoming_message.acknowledge.assert_called_once_with()
msg_action_method = getattr(incoming_message, self.msg_action)
msg_action_method.assert_called_once_with()
callback.assert_called_once_with({}, {'payload': 'data'})
if self.sender_expects_reply:
incoming_message.reply.assert_called_once_with('result')

View File

@ -41,24 +41,39 @@ class TestDispatcher(test_utils.BaseTestCase):
('no_endpoints',
dict(endpoints=[],
endpoints_expect_calls=[],
priority='info')),
priority='info',
success=True, ex=None)),
('one_endpoints',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn')),
priority='warn',
success=True, ex=None)),
('two_endpoints_only_one_match',
dict(endpoints=[['warn'], ['info']],
endpoints_expect_calls=[None, 'info'],
priority='info')),
priority='info',
success=True, ex=None)),
('two_endpoints_both_match',
dict(endpoints=[['debug', 'info'], ['info', 'debug']],
endpoints_expect_calls=['debug', 'debug'],
priority='debug')),
priority='debug',
success=True, ex=None)),
('requeue_exception',
dict(endpoints=[['debug', 'warn']],
endpoints_expect_calls=['debug'],
priority='debug', msg=notification_msg,
success=False, ex=messaging.RequeueMessageException)),
]
def test_dispatcher(self):
endpoints = [mock.Mock(spec=endpoint_methods)
for endpoint_methods in self.endpoints]
endpoints = []
for endpoint_methods in self.endpoints:
e = mock.Mock(spec=endpoint_methods)
endpoints.append(e)
if self.ex:
for m in endpoint_methods:
getattr(e, m).side_effect = self.ex()
msg = notification_msg.copy()
msg['priority'] = self.priority
@ -73,7 +88,16 @@ class TestDispatcher(test_utils.BaseTestCase):
for prio in itertools.chain.from_iterable(
self.endpoints))))
dispatcher({}, msg)
try:
dispatcher({}, msg)
except Exception as ex:
self.assertFalse(self.success, ex)
self.assertIsNotNone(self.ex, ex)
self.assertIsInstance(ex, self.ex, ex)
if isinstance(ex, messaging.NoSuchMethod):
self.assertEqual(ex.method, self.method)
else:
self.assertTrue(self.success)
# check endpoint callbacks are called or not
for i, endpoint_methods in enumerate(self.endpoints):

View File

@ -173,3 +173,25 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
endpoint2.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test')
self._stop_listener(listener_thread)
def test_requeue(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
def side_effect_requeue(*args, **kwargs):
if endpoint.info.call_count == 1:
raise messaging.RequeueMessageException()
endpoint.info.side_effect = side_effect_requeue
listener_thread = self._setup_listener(transport,
[endpoint])
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test')
time.sleep(0.5)
expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'),
mock.call({}, 'testpublisher', 'an_event.start', 'test')]
self.assertEqual(endpoint.info.call_args_list, expected)
self._stop_listener(listener_thread)