Merge "Add missing ack to impl_qpid."
This commit is contained in:
@@ -137,7 +137,12 @@ class ConsumerBase(object):
|
||||
def consume(self):
|
||||
"""Fetch the message and pass it to the callback object"""
|
||||
message = self.receiver.fetch()
|
||||
self.callback(message.content)
|
||||
try:
|
||||
self.callback(message.content)
|
||||
except Exception:
|
||||
LOG.exception(_("Failed to process message... skipping it."))
|
||||
finally:
|
||||
self.session.acknowledge(message)
|
||||
|
||||
def get_receiver(self):
|
||||
return self.receiver
|
||||
|
@@ -296,6 +296,7 @@ class RpcQpidTestCase(test.TestCase):
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||
{"result": "foo", "failure": False, "ending": False}))
|
||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||
if multi:
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
@@ -303,16 +304,19 @@ class RpcQpidTestCase(test.TestCase):
|
||||
qpid.messaging.Message(
|
||||
{"result": "bar", "failure": False,
|
||||
"ending": False}))
|
||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(
|
||||
qpid.messaging.Message(
|
||||
{"result": "baz", "failure": False,
|
||||
"ending": False}))
|
||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||
self.mock_receiver)
|
||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||
{"failure": False, "ending": True}))
|
||||
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||
self.mock_session.close()
|
||||
self.mock_connection.session().AndReturn(self.mock_session)
|
||||
|
||||
|
Reference in New Issue
Block a user