Only ack message if successfully processed
This commit is contained in:
@@ -96,7 +96,6 @@ class NovaConsumerTestCase(unittest.TestCase):
|
|||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_queue_with_queue_args(self):
|
def test_create_queue_with_queue_args(self):
|
||||||
self.mox.StubOutClassWithMocks(kombu, 'Queue')
|
self.mox.StubOutClassWithMocks(kombu, 'Queue')
|
||||||
exchange = self.mox.CreateMockAnything()
|
exchange = self.mox.CreateMockAnything()
|
||||||
@@ -127,14 +126,38 @@ class NovaConsumerTestCase(unittest.TestCase):
|
|||||||
args = (routing_key, body_dict)
|
args = (routing_key, body_dict)
|
||||||
views.process_raw_data(deployment, args, json.dumps(args))\
|
views.process_raw_data(deployment, args, json.dumps(args))\
|
||||||
.AndReturn(raw)
|
.AndReturn(raw)
|
||||||
|
message.ack()
|
||||||
self.mox.StubOutWithMock(consumer, '_check_memory',
|
self.mox.StubOutWithMock(consumer, '_check_memory',
|
||||||
use_mock_anything=True)
|
use_mock_anything=True)
|
||||||
consumer._check_memory()
|
consumer._check_memory()
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
consumer._process(message)
|
consumer._process(message)
|
||||||
self.assertEqual(consumer.processed, 1)
|
self.assertEqual(consumer.processed, 1)
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
|
def test_process_no_raw_dont_ack(self):
|
||||||
|
deployment = self.mox.CreateMockAnything()
|
||||||
|
raw = self.mox.CreateMockAnything()
|
||||||
|
message = self.mox.CreateMockAnything()
|
||||||
|
|
||||||
|
consumer = worker.NovaConsumer('test', None, deployment, True, {})
|
||||||
|
routing_key = 'monitor.info'
|
||||||
|
message.delivery_info = {'routing_key': routing_key}
|
||||||
|
body_dict = {u'key': u'value'}
|
||||||
|
message.body = json.dumps(body_dict)
|
||||||
|
self.mox.StubOutWithMock(views, 'process_raw_data',
|
||||||
|
use_mock_anything=True)
|
||||||
|
args = (routing_key, body_dict)
|
||||||
|
views.process_raw_data(deployment, args, json.dumps(args))\
|
||||||
|
.AndReturn(None)
|
||||||
|
self.mox.StubOutWithMock(consumer, '_check_memory',
|
||||||
|
use_mock_anything=True)
|
||||||
|
consumer._check_memory()
|
||||||
|
self.mox.ReplayAll()
|
||||||
|
consumer._process(message)
|
||||||
|
self.assertEqual(consumer.processed, 0)
|
||||||
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
def test_run(self):
|
def test_run(self):
|
||||||
config = {
|
config = {
|
||||||
'name': 'east_coast.prod.global',
|
'name': 'east_coast.prod.global',
|
||||||
|
@@ -89,6 +89,7 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
|||||||
raw = views.process_raw_data(self.deployment, args, asJson)
|
raw = views.process_raw_data(self.deployment, args, asJson)
|
||||||
if raw:
|
if raw:
|
||||||
self.processed += 1
|
self.processed += 1
|
||||||
|
message.ack()
|
||||||
|
|
||||||
self._check_memory()
|
self._check_memory()
|
||||||
|
|
||||||
@@ -125,7 +126,6 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
|||||||
self._process(message)
|
self._process(message)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
LOG.exception("Problem %s" % e)
|
LOG.exception("Problem %s" % e)
|
||||||
message.ack()
|
|
||||||
|
|
||||||
|
|
||||||
def continue_running():
|
def continue_running():
|
||||||
|
Reference in New Issue
Block a user