diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index e059be5fd..c4df9afb1 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -145,6 +145,24 @@ class BaseTaskFlowActionContainer(flow_task.Task): def do_abort(self, *args, **kwargs): raise NotImplementedError() + def _fail_action(self, phase, reason=None): + # Fail action and send notification to the user. + # If a reason is given, it will be used to set the status_message. + LOG.error('The workflow engine has failed ' + 'to execute the action: %s', self._db_action.uuid) + kwargs = {} + if reason: + kwargs["status_message"] = (_( + "Action failed in %s: %s") % (phase, reason)) + db_action = self.engine.notify(self._db_action, + objects.action.State.FAILED, + **kwargs) + notifications.action.send_execution_notification( + self.engine.context, db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.ERROR, + priority=fields.NotificationPriority.ERROR) + # NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute, # post_execute) independently. We want to support notifications in base # class, so child's methods should be named with `do_` prefix and wrapped. @@ -181,18 +199,12 @@ class BaseTaskFlowActionContainer(flow_task.Task): notifications.action.send_update( self.engine.context, db_action, old_state=objects.action.State.PENDING) + except exception.WatcherException as e: + LOG.exception(e) + self._fail_action("pre_condition", reason=str(e)) except Exception as e: LOG.exception(e) - status_message = (_( - "Action failed in pre_condition: %s") % str(e)) - db_action = self.engine.notify(self._db_action, - objects.action.State.FAILED, - status_message=status_message) - notifications.action.send_execution_notification( - self.engine.context, db_action, - fields.NotificationAction.EXECUTION, - fields.NotificationPhase.ERROR, - priority=fields.NotificationPriority.ERROR) + self._fail_action("pre_condition", reason=type(e).__name__) def execute(self, *args, **kwargs): action_object = objects.Action.get_by_uuid( @@ -213,20 +225,13 @@ class BaseTaskFlowActionContainer(flow_task.Task): return True else: return False + except exception.WatcherException as e: + LOG.exception(e) + self._fail_action("execute", reason=str(e)) + return False except Exception as e: LOG.exception(e) - LOG.error('The workflow engine has failed ' - 'to execute the action: %s', self.name) - status_message = (_( - "Action failed in execute: %s") % str(e)) - db_action = self.engine.notify(self._db_action, - objects.action.State.FAILED, - status_message=status_message) - notifications.action.send_execution_notification( - self.engine.context, db_action, - fields.NotificationAction.EXECUTION, - fields.NotificationPhase.ERROR, - priority=fields.NotificationPriority.ERROR) + self._fail_action("execute", reason=type(e).__name__) return False def post_execute(self): @@ -236,20 +241,26 @@ class BaseTaskFlowActionContainer(flow_task.Task): return try: self.do_post_execute() + except exception.WatcherException as e: + LOG.exception(e) + # We only add status_message in failed post_condition if the + # action has no status_message yet. Do not override if one + # has been added in the execute method. + if action_object.status_message is None: + reason = str(e) + else: + reason = None + self._fail_action("post_condition", reason=reason) except Exception as e: LOG.exception(e) - kwargs = {} + # We only add status_message in failed post_condition if the + # action has no status_message yet. Do not override if one + # has been added in the execute method. if action_object.status_message is None: - kwargs["status_message"] = (_( - "Action failed in post_condition: %s") % str(e)) - db_action = self.engine.notify(self._db_action, - objects.action.State.FAILED, - **kwargs) - notifications.action.send_execution_notification( - self.engine.context, db_action, - fields.NotificationAction.EXECUTION, - fields.NotificationPhase.ERROR, - priority=fields.NotificationPriority.ERROR) + reason = type(e).__name__ + else: + reason = None + self._fail_action("post_condition", reason=reason) def revert(self, *args, **kwargs): action_plan = objects.ActionPlan.get_by_id( diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index e0f7a6a85..df726d981 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -129,12 +129,15 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): except tf_exception.WrappedFailure as e: if e.check("watcher.common.exception.ActionPlanCancelled"): - raise exception.ActionPlanCancelled + raise exception.ActionPlanCancelled( + uuid=actions[0].action_plan_id) else: - raise exception.WorkflowExecutionException(error=e) + raise exception.WorkflowExecutionException( + error=type(e).__name__) except Exception as e: - raise exception.WorkflowExecutionException(error=e) + raise exception.WorkflowExecutionException( + error=type(e).__name__) class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): diff --git a/watcher/tests/unit/applier/workflow_engine/test_taskflow_action_container.py b/watcher/tests/unit/applier/workflow_engine/test_taskflow_action_container.py index 1518495e3..d4f7f4ca3 100644 --- a/watcher/tests/unit/applier/workflow_engine/test_taskflow_action_container.py +++ b/watcher/tests/unit/applier/workflow_engine/test_taskflow_action_container.py @@ -21,6 +21,7 @@ from unittest import mock from openstack import exceptions as sdk_exc from oslo_config import cfg +from watcher.applier.actions import nop from watcher.applier.workflow_engine import default as tflow from watcher.common import exception from watcher import objects @@ -144,6 +145,33 @@ class TestTaskFlowActionContainer(test_utils.NovaResourcesMixin, obj_action.status_message, "Action failed in pre_condition: Failed in pre_condition") + @mock.patch.object(nop.Nop, "pre_condition") + def test_pre_execute_with_failed_exception(self, m_pre_condition): + # When failed with a non-watcher exception, the status_message + # should only include the type of the exception. + m_pre_condition.side_effect = Exception("Third party exception") + action_plan = obj_utils.create_test_action_plan( + self.context, audit_id=self.audit.id, + strategy_id=self.strategy.id, + state=objects.action_plan.State.ONGOING) + action = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + state=objects.action.State.PENDING, + action_type='nop', + input_parameters={'message': 'hello World', + 'fail_pre_condition': True}) + action_container = tflow.TaskFlowActionContainer( + db_action=action, + engine=self.engine) + + action_container.pre_execute() + obj_action = objects.Action.get_by_uuid( + self.engine.context, action.uuid) + self.assertEqual(obj_action.state, objects.action.State.FAILED) + self.assertEqual( + obj_action.status_message, + "Action failed in pre_condition: Exception") + def test_pre_execute_with_skipped(self): action_plan = obj_utils.create_test_action_plan( self.context, audit_id=self.audit.id, @@ -313,3 +341,89 @@ class TestTaskFlowActionContainer(test_utils.NovaResourcesMixin, action_container.revert() mock_do_revert.assert_not_called() + + @mock.patch('watcher.applier.workflow_engine.base.LOG') + @mock.patch('watcher.notifications.action.send_execution_notification') + def test_fail_action_with_reason(self, mock_notification, mock_log): + """Test _fail_action method with phase and reason.""" + action_plan = obj_utils.create_test_action_plan( + self.context, audit_id=self.audit.id, + strategy_id=self.strategy.id, + state=objects.action_plan.State.ONGOING) + + action = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + state=objects.action.State.ONGOING, + action_type='nop', + input_parameters={'message': 'hello World'}) + + action_container = tflow.TaskFlowActionContainer( + db_action=action, + engine=self.engine) + + # Call _fail_action with a phase and reason + phase = "execute" + reason = "Test error occurred" + action_container._fail_action(phase, reason=reason) + + # Verify the action state is set to FAILED + obj_action = objects.Action.get_by_uuid( + self.engine.context, action.uuid) + self.assertEqual(obj_action.state, objects.action.State.FAILED) + + # Verify status_message is set correctly + expected_message = f"Action failed in {phase}: {reason}" + self.assertEqual(obj_action.status_message, expected_message) + + # Verify LOG.error was called + mock_log.error.assert_called_once_with( + 'The workflow engine has failed to execute the action: %s', + action.uuid) + + # Verify notification was sent with correct parameters + mock_notification.assert_called_once_with( + self.engine.context, + mock.ANY, # action object - checked separately below + objects.fields.NotificationAction.EXECUTION, + objects.fields.NotificationPhase.ERROR, + priority=objects.fields.NotificationPriority.ERROR + ) + call_args = mock_notification.call_args + self.assertEqual(call_args[0][1].uuid, action.uuid) + self.assertEqual(call_args[0][1].state, objects.action.State.FAILED) + + @mock.patch('watcher.applier.workflow_engine.base.LOG') + @mock.patch('watcher.notifications.action.send_execution_notification') + def test_fail_action_without_reason(self, mock_notification, mock_log): + """Test _fail_action method without a reason.""" + action_plan = obj_utils.create_test_action_plan( + self.context, audit_id=self.audit.id, + strategy_id=self.strategy.id, + state=objects.action_plan.State.ONGOING) + + action = obj_utils.create_test_action( + self.context, action_plan_id=action_plan.id, + state=objects.action.State.ONGOING, + action_type='nop', + input_parameters={'message': 'hello World'}) + + action_container = tflow.TaskFlowActionContainer( + db_action=action, + engine=self.engine) + + # Call _fail_action with only phase, no reason + phase = "pre_condition" + action_container._fail_action(phase) + + # Verify the action state is set to FAILED + obj_action = objects.Action.get_by_uuid( + self.engine.context, action.uuid) + self.assertEqual(obj_action.state, objects.action.State.FAILED) + + # Verify status_message is not set when no reason is provided + self.assertIsNone(obj_action.status_message) + + # Verify LOG.error was called + mock_log.error.assert_called_once_with( + 'The workflow engine has failed to execute the action: %s', + action.uuid)