diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 3b5a0355..9ff7078b 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -110,7 +110,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): # publish waiting requests for request in self._requests_cache.get_waiting_requests(tasks): - if request.transition_log_error(pr.PENDING, logger=LOG): + if request.transition_and_log_error(pr.PENDING, logger=LOG): self._publish_request(request, topic) def _process_response(self, response, message): @@ -125,12 +125,12 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): if request is not None: response = pr.Response.from_dict(response) if response.state == pr.RUNNING: - request.transition_log_error(pr.RUNNING, logger=LOG) + request.transition_and_log_error(pr.RUNNING, logger=LOG) elif response.state == pr.PROGRESS: request.on_progress(**response.data) elif response.state in (pr.FAILURE, pr.SUCCESS): - moved = request.transition_log_error(response.state, - logger=LOG) + moved = request.transition_and_log_error(response.state, + logger=LOG) if moved: # NOTE(imelnikov): request should not be in the # cache when another thread can see its result and @@ -151,7 +151,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): When request has expired it is removed from the requests cache and the `RequestTimeout` exception is set as a request result. """ - if request.transition_log_error(pr.FAILURE, logger=LOG): + if request.transition_and_log_error(pr.FAILURE, logger=LOG): # Raise an exception (and then catch it) so we get a nice # traceback that the request will get instead of it getting # just an exception with no traceback... @@ -184,7 +184,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): # before putting it into the requests cache to prevent the notify # processing thread get list of waiting requests and publish it # before it is published here, so it wouldn't be published twice. - if request.transition_log_error(pr.PENDING, logger=LOG): + if request.transition_and_log_error(pr.PENDING, logger=LOG): self._requests_cache[request.uuid] = request self._publish_request(request, topic) else: @@ -202,7 +202,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): except Exception: with misc.capture_failure() as failure: LOG.exception("Failed to submit the '%s' request.", request) - if request.transition_log_error(pr.FAILURE, logger=LOG): + if request.transition_and_log_error(pr.FAILURE, logger=LOG): del self._requests_cache[request.uuid] request.set_result(failure) diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 334c1d93..6e54f9fb 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -295,7 +295,16 @@ class Request(Message): def on_progress(self, event_data, progress): self._progress_callback(self._task, event_data, progress) - def transition_log_error(self, new_state, logger=None): + def transition_and_log_error(self, new_state, logger=None): + """Transitions *and* logs an error if that transitioning raises. + + This overlays the transition function and performs nearly the same + functionality but instead of raising if the transition was not valid + it logs a warning to the provided logger and returns False to + indicate that the transition was not performed (note that this + is *different* from the transition function where False means + ignored). + """ if logger is None: logger = LOG moved = False @@ -311,7 +320,7 @@ class Request(Message): """Transitions the request to a new state. If transition was performed, it returns True. If transition - should was ignored, it returns False. If transition is not + should was ignored, it returns False. If transition was not valid (and will not be performed), it raises an InvalidState exception. """ diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index aa236fcf..e6c97e17 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -97,7 +97,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex._process_response(response.to_dict(), self.message_mock) expected_calls = [ - mock.call.transition_log_error(pr.RUNNING, logger=mock.ANY), + mock.call.transition_and_log_error(pr.RUNNING, logger=mock.ANY), ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) @@ -120,7 +120,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache), 0) expected_calls = [ - mock.call.transition_log_error(pr.FAILURE, logger=mock.ANY), + mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY), mock.call.set_result(result=utils.FailureMatcher(failure)) ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) @@ -133,7 +133,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex._process_response(response.to_dict(), self.message_mock) expected_calls = [ - mock.call.transition_log_error(pr.SUCCESS, logger=mock.ANY), + mock.call.transition_and_log_error(pr.SUCCESS, logger=mock.ANY), mock.call.set_result(result=self.task_result, event='executed') ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) @@ -212,8 +212,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): expected_calls = [ mock.call.Request(self.task, self.task_uuid, 'execute', self.task_args, None, self.timeout), - mock.call.request.transition_log_error(pr.PENDING, - logger=mock.ANY), + mock.call.request.transition_and_log_error(pr.PENDING, + logger=mock.ANY), mock.call.proxy.publish(msg=self.request_inst_mock, routing_key=self.executor_topic, reply_to=self.executor_uuid, @@ -234,8 +234,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.task_args, None, self.timeout, failures=self.task_failures, result=self.task_result), - mock.call.request.transition_log_error(pr.PENDING, - logger=mock.ANY), + mock.call.request.transition_and_log_error(pr.PENDING, + logger=mock.ANY), mock.call.proxy.publish(msg=self.request_inst_mock, routing_key=self.executor_topic, reply_to=self.executor_uuid, @@ -265,14 +265,14 @@ class TestWorkerTaskExecutor(test.MockTestCase): expected_calls = [ mock.call.Request(self.task, self.task_uuid, 'execute', self.task_args, None, self.timeout), - mock.call.request.transition_log_error(pr.PENDING, - logger=mock.ANY), + mock.call.request.transition_and_log_error(pr.PENDING, + logger=mock.ANY), mock.call.proxy.publish(msg=self.request_inst_mock, routing_key=self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid), - mock.call.request.transition_log_error(pr.FAILURE, - logger=mock.ANY), + mock.call.request.transition_and_log_error(pr.FAILURE, + logger=mock.ANY), mock.call.request.set_result(mock.ANY) ] self.assertEqual(expected_calls, self.master_mock.mock_calls)