Use message.requeue instead of message.reject
Message.requeue does exactly the same as message.reject with requeue=True, so why not to use it directly. Change-Id: I42462172c2eb5d5c534e3c55ce7dfcc1a7169e31
This commit is contained in:
@@ -66,7 +66,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
|||||||
# acknowledge message before processing.
|
# acknowledge message before processing.
|
||||||
message.ack()
|
message.ack()
|
||||||
except kombu_exc.MessageStateError:
|
except kombu_exc.MessageStateError:
|
||||||
LOG.exception("Failed to acknowledge AMQP message")
|
LOG.exception("Failed to acknowledge AMQP message.")
|
||||||
else:
|
else:
|
||||||
LOG.debug("AMQP message acknowledged.")
|
LOG.debug("AMQP message acknowledged.")
|
||||||
# get task uuid from message correlation id parameter
|
# get task uuid from message correlation id parameter
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ class Server(object):
|
|||||||
"""This method is called on incoming request."""
|
"""This method is called on incoming request."""
|
||||||
LOG.debug("Got request: %s", request)
|
LOG.debug("Got request: %s", request)
|
||||||
# NOTE(skudriashev): Process all incoming requests only if proxy is
|
# NOTE(skudriashev): Process all incoming requests only if proxy is
|
||||||
# running, otherwise reject and requeue them.
|
# running, otherwise requeue them.
|
||||||
if self._proxy.is_running:
|
if self._proxy.is_running:
|
||||||
# NOTE(skudriashev): Process request only if message has been
|
# NOTE(skudriashev): Process request only if message has been
|
||||||
# acknowledged successfully.
|
# acknowledged successfully.
|
||||||
@@ -48,19 +48,19 @@ class Server(object):
|
|||||||
# acknowledge message
|
# acknowledge message
|
||||||
message.ack()
|
message.ack()
|
||||||
except kombu_exc.MessageStateError:
|
except kombu_exc.MessageStateError:
|
||||||
LOG.exception("Failed to acknowledge AMQP message")
|
LOG.exception("Failed to acknowledge AMQP message.")
|
||||||
else:
|
else:
|
||||||
LOG.debug("AMQP message acknowledged")
|
LOG.debug("AMQP message acknowledged.")
|
||||||
# spawn new thread to process request
|
# spawn new thread to process request
|
||||||
self._executor.submit(self._process_request, request, message)
|
self._executor.submit(self._process_request, request, message)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
# reject and requeue message
|
# requeue message
|
||||||
message.reject(requeue=True)
|
message.requeue()
|
||||||
except kombu_exc.MessageStateError:
|
except kombu_exc.MessageStateError:
|
||||||
LOG.exception("Failed to reject/requeue AMQP message")
|
LOG.exception("Failed to requeue AMQP message.")
|
||||||
else:
|
else:
|
||||||
LOG.debug("AMQP message rejected and requeued")
|
LOG.debug("AMQP message requeued.")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _parse_request(task, task_name, action, arguments, result=None,
|
def _parse_request(task, task_name, action, arguments, result=None,
|
||||||
|
|||||||
@@ -139,26 +139,26 @@ class TestServer(test.MockTestCase):
|
|||||||
]
|
]
|
||||||
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
||||||
|
|
||||||
def test_on_message_proxy_not_running_reject_success(self):
|
def test_on_message_proxy_not_running_requeue_success(self):
|
||||||
self.proxy_inst_mock.is_running = False
|
self.proxy_inst_mock.is_running = False
|
||||||
s = self.server(reset_master_mock=True)
|
s = self.server(reset_master_mock=True)
|
||||||
s._on_message({}, self.message_mock)
|
s._on_message({}, self.message_mock)
|
||||||
|
|
||||||
# check calls
|
# check calls
|
||||||
master_mock_calls = [
|
master_mock_calls = [
|
||||||
mock.call.message.reject(requeue=True)
|
mock.call.message.requeue()
|
||||||
]
|
]
|
||||||
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
||||||
|
|
||||||
def test_on_message_proxy_not_running_reject_failure(self):
|
def test_on_message_proxy_not_running_requeue_failure(self):
|
||||||
self.message_mock.reject.side_effect = exc.MessageStateError('Woot!')
|
self.message_mock.requeue.side_effect = exc.MessageStateError('Woot!')
|
||||||
self.proxy_inst_mock.is_running = False
|
self.proxy_inst_mock.is_running = False
|
||||||
s = self.server(reset_master_mock=True)
|
s = self.server(reset_master_mock=True)
|
||||||
s._on_message({}, self.message_mock)
|
s._on_message({}, self.message_mock)
|
||||||
|
|
||||||
# check calls
|
# check calls
|
||||||
master_mock_calls = [
|
master_mock_calls = [
|
||||||
mock.call.message.reject(requeue=True)
|
mock.call.message.requeue()
|
||||||
]
|
]
|
||||||
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user