Add missing ack to impl_qpid.
Fix bug 1012374. Johannes Erdfelt pointed out that impl_qpid wasn't acking messages that it received. This turned out to be a nasty oversight, resulting in unbounded message queue growth inside of the python-qpid library. This fixes it. Change-Id: I0370293807f0282e1dbdd59246f70be031e888a9
This commit is contained in:
@@ -137,7 +137,12 @@ class ConsumerBase(object):
|
|||||||
def consume(self):
|
def consume(self):
|
||||||
"""Fetch the message and pass it to the callback object"""
|
"""Fetch the message and pass it to the callback object"""
|
||||||
message = self.receiver.fetch()
|
message = self.receiver.fetch()
|
||||||
self.callback(message.content)
|
try:
|
||||||
|
self.callback(message.content)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception(_("Failed to process message... skipping it."))
|
||||||
|
finally:
|
||||||
|
self.session.acknowledge(message)
|
||||||
|
|
||||||
def get_receiver(self):
|
def get_receiver(self):
|
||||||
return self.receiver
|
return self.receiver
|
||||||
|
|||||||
@@ -296,6 +296,7 @@ class RpcQpidTestCase(test.TestCase):
|
|||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||||
{"result": "foo", "failure": False, "ending": False}))
|
{"result": "foo", "failure": False, "ending": False}))
|
||||||
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
if multi:
|
if multi:
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
@@ -303,16 +304,19 @@ class RpcQpidTestCase(test.TestCase):
|
|||||||
qpid.messaging.Message(
|
qpid.messaging.Message(
|
||||||
{"result": "bar", "failure": False,
|
{"result": "bar", "failure": False,
|
||||||
"ending": False}))
|
"ending": False}))
|
||||||
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(
|
self.mock_receiver.fetch().AndReturn(
|
||||||
qpid.messaging.Message(
|
qpid.messaging.Message(
|
||||||
{"result": "baz", "failure": False,
|
{"result": "baz", "failure": False,
|
||||||
"ending": False}))
|
"ending": False}))
|
||||||
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
|
||||||
self.mock_receiver)
|
self.mock_receiver)
|
||||||
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
|
||||||
{"failure": False, "ending": True}))
|
{"failure": False, "ending": True}))
|
||||||
|
self.mock_session.acknowledge(mox.IgnoreArg())
|
||||||
self.mock_session.close()
|
self.mock_session.close()
|
||||||
self.mock_connection.session().AndReturn(self.mock_session)
|
self.mock_connection.session().AndReturn(self.mock_session)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user