From 37bdae0f0c7bf50a06a0fc7d52ceba2a5a00a5ee Mon Sep 17 00:00:00 2001 From: Stanislav Kudriashev Date: Tue, 11 Mar 2014 15:30:53 +0200 Subject: [PATCH] Use message.requeue instead of message.reject Message.requeue does exactly the same as message.reject with requeue=True, so why not to use it directly. Change-Id: I42462172c2eb5d5c534e3c55ce7dfcc1a7169e31 --- taskflow/engines/worker_based/executor.py | 2 +- taskflow/engines/worker_based/server.py | 14 +++++++------- taskflow/tests/unit/worker_based/test_server.py | 10 +++++----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index cf747f2d1..4c8364a26 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -66,7 +66,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): # acknowledge message before processing. message.ack() except kombu_exc.MessageStateError: - LOG.exception("Failed to acknowledge AMQP message") + LOG.exception("Failed to acknowledge AMQP message.") else: LOG.debug("AMQP message acknowledged.") # get task uuid from message correlation id parameter diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index a554dcdeb..55ee2fe33 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -40,7 +40,7 @@ class Server(object): """This method is called on incoming request.""" LOG.debug("Got request: %s", request) # NOTE(skudriashev): Process all incoming requests only if proxy is - # running, otherwise reject and requeue them. + # running, otherwise requeue them. if self._proxy.is_running: # NOTE(skudriashev): Process request only if message has been # acknowledged successfully. @@ -48,19 +48,19 @@ class Server(object): # acknowledge message message.ack() except kombu_exc.MessageStateError: - LOG.exception("Failed to acknowledge AMQP message") + LOG.exception("Failed to acknowledge AMQP message.") else: - LOG.debug("AMQP message acknowledged") + LOG.debug("AMQP message acknowledged.") # spawn new thread to process request self._executor.submit(self._process_request, request, message) else: try: - # reject and requeue message - message.reject(requeue=True) + # requeue message + message.requeue() except kombu_exc.MessageStateError: - LOG.exception("Failed to reject/requeue AMQP message") + LOG.exception("Failed to requeue AMQP message.") else: - LOG.debug("AMQP message rejected and requeued") + LOG.debug("AMQP message requeued.") @staticmethod def _parse_request(task, task_name, action, arguments, result=None, diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 34af585a6..30e274644 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -139,26 +139,26 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - def test_on_message_proxy_not_running_reject_success(self): + def test_on_message_proxy_not_running_requeue_success(self): self.proxy_inst_mock.is_running = False s = self.server(reset_master_mock=True) s._on_message({}, self.message_mock) # check calls master_mock_calls = [ - mock.call.message.reject(requeue=True) + mock.call.message.requeue() ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - def test_on_message_proxy_not_running_reject_failure(self): - self.message_mock.reject.side_effect = exc.MessageStateError('Woot!') + def test_on_message_proxy_not_running_requeue_failure(self): + self.message_mock.requeue.side_effect = exc.MessageStateError('Woot!') self.proxy_inst_mock.is_running = False s = self.server(reset_master_mock=True) s._on_message({}, self.message_mock) # check calls master_mock_calls = [ - mock.call.message.reject(requeue=True) + mock.call.message.requeue() ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls)