diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index cf747f2d1..4c8364a26 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -66,7 +66,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): # acknowledge message before processing. message.ack() except kombu_exc.MessageStateError: - LOG.exception("Failed to acknowledge AMQP message") + LOG.exception("Failed to acknowledge AMQP message.") else: LOG.debug("AMQP message acknowledged.") # get task uuid from message correlation id parameter diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index a554dcdeb..55ee2fe33 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -40,7 +40,7 @@ class Server(object): """This method is called on incoming request.""" LOG.debug("Got request: %s", request) # NOTE(skudriashev): Process all incoming requests only if proxy is - # running, otherwise reject and requeue them. + # running, otherwise requeue them. if self._proxy.is_running: # NOTE(skudriashev): Process request only if message has been # acknowledged successfully. @@ -48,19 +48,19 @@ class Server(object): # acknowledge message message.ack() except kombu_exc.MessageStateError: - LOG.exception("Failed to acknowledge AMQP message") + LOG.exception("Failed to acknowledge AMQP message.") else: - LOG.debug("AMQP message acknowledged") + LOG.debug("AMQP message acknowledged.") # spawn new thread to process request self._executor.submit(self._process_request, request, message) else: try: - # reject and requeue message - message.reject(requeue=True) + # requeue message + message.requeue() except kombu_exc.MessageStateError: - LOG.exception("Failed to reject/requeue AMQP message") + LOG.exception("Failed to requeue AMQP message.") else: - LOG.debug("AMQP message rejected and requeued") + LOG.debug("AMQP message requeued.") @staticmethod def _parse_request(task, task_name, action, arguments, result=None, diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 34af585a6..30e274644 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -139,26 +139,26 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - def test_on_message_proxy_not_running_reject_success(self): + def test_on_message_proxy_not_running_requeue_success(self): self.proxy_inst_mock.is_running = False s = self.server(reset_master_mock=True) s._on_message({}, self.message_mock) # check calls master_mock_calls = [ - mock.call.message.reject(requeue=True) + mock.call.message.requeue() ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - def test_on_message_proxy_not_running_reject_failure(self): - self.message_mock.reject.side_effect = exc.MessageStateError('Woot!') + def test_on_message_proxy_not_running_requeue_failure(self): + self.message_mock.requeue.side_effect = exc.MessageStateError('Woot!') self.proxy_inst_mock.is_running = False s = self.server(reset_master_mock=True) s._on_message({}, self.message_mock) # check calls master_mock_calls = [ - mock.call.message.reject(requeue=True) + mock.call.message.requeue() ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls)