Merge "Control length of status_message on Action failures"

This commit is contained in:
Zuul
2026-02-26 17:10:20 +00:00
committed by Gerrit Code Review
3 changed files with 164 additions and 36 deletions

View File

@@ -145,6 +145,24 @@ class BaseTaskFlowActionContainer(flow_task.Task):
def do_abort(self, *args, **kwargs):
raise NotImplementedError()
def _fail_action(self, phase, reason=None):
# Fail action and send notification to the user.
# If a reason is given, it will be used to set the status_message.
LOG.error('The workflow engine has failed '
'to execute the action: %s', self._db_action.uuid)
kwargs = {}
if reason:
kwargs["status_message"] = (_(
"Action failed in %s: %s") % (phase, reason))
db_action = self.engine.notify(self._db_action,
objects.action.State.FAILED,
**kwargs)
notifications.action.send_execution_notification(
self.engine.context, db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
# NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute,
# post_execute) independently. We want to support notifications in base
# class, so child's methods should be named with `do_` prefix and wrapped.
@@ -181,18 +199,12 @@ class BaseTaskFlowActionContainer(flow_task.Task):
notifications.action.send_update(
self.engine.context, db_action,
old_state=objects.action.State.PENDING)
except exception.WatcherException as e:
LOG.exception(e)
self._fail_action("pre_condition", reason=str(e))
except Exception as e:
LOG.exception(e)
status_message = (_(
"Action failed in pre_condition: %s") % str(e))
db_action = self.engine.notify(self._db_action,
objects.action.State.FAILED,
status_message=status_message)
notifications.action.send_execution_notification(
self.engine.context, db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
self._fail_action("pre_condition", reason=type(e).__name__)
def execute(self, *args, **kwargs):
action_object = objects.Action.get_by_uuid(
@@ -213,20 +225,13 @@ class BaseTaskFlowActionContainer(flow_task.Task):
return True
else:
return False
except exception.WatcherException as e:
LOG.exception(e)
self._fail_action("execute", reason=str(e))
return False
except Exception as e:
LOG.exception(e)
LOG.error('The workflow engine has failed '
'to execute the action: %s', self.name)
status_message = (_(
"Action failed in execute: %s") % str(e))
db_action = self.engine.notify(self._db_action,
objects.action.State.FAILED,
status_message=status_message)
notifications.action.send_execution_notification(
self.engine.context, db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
self._fail_action("execute", reason=type(e).__name__)
return False
def post_execute(self):
@@ -236,20 +241,26 @@ class BaseTaskFlowActionContainer(flow_task.Task):
return
try:
self.do_post_execute()
except exception.WatcherException as e:
LOG.exception(e)
# We only add status_message in failed post_condition if the
# action has no status_message yet. Do not override if one
# has been added in the execute method.
if action_object.status_message is None:
reason = str(e)
else:
reason = None
self._fail_action("post_condition", reason=reason)
except Exception as e:
LOG.exception(e)
kwargs = {}
# We only add status_message in failed post_condition if the
# action has no status_message yet. Do not override if one
# has been added in the execute method.
if action_object.status_message is None:
kwargs["status_message"] = (_(
"Action failed in post_condition: %s") % str(e))
db_action = self.engine.notify(self._db_action,
objects.action.State.FAILED,
**kwargs)
notifications.action.send_execution_notification(
self.engine.context, db_action,
fields.NotificationAction.EXECUTION,
fields.NotificationPhase.ERROR,
priority=fields.NotificationPriority.ERROR)
reason = type(e).__name__
else:
reason = None
self._fail_action("post_condition", reason=reason)
def revert(self, *args, **kwargs):
action_plan = objects.ActionPlan.get_by_id(

View File

@@ -129,12 +129,15 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
except tf_exception.WrappedFailure as e:
if e.check("watcher.common.exception.ActionPlanCancelled"):
raise exception.ActionPlanCancelled
raise exception.ActionPlanCancelled(
uuid=actions[0].action_plan_id)
else:
raise exception.WorkflowExecutionException(error=e)
raise exception.WorkflowExecutionException(
error=type(e).__name__)
except Exception as e:
raise exception.WorkflowExecutionException(error=e)
raise exception.WorkflowExecutionException(
error=type(e).__name__)
class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):

View File

@@ -21,6 +21,7 @@ from unittest import mock
from openstack import exceptions as sdk_exc
from oslo_config import cfg
from watcher.applier.actions import nop
from watcher.applier.workflow_engine import default as tflow
from watcher.common import exception
from watcher import objects
@@ -144,6 +145,33 @@ class TestTaskFlowActionContainer(test_utils.NovaResourcesMixin,
obj_action.status_message,
"Action failed in pre_condition: Failed in pre_condition")
@mock.patch.object(nop.Nop, "pre_condition")
def test_pre_execute_with_failed_exception(self, m_pre_condition):
# When failed with a non-watcher exception, the status_message
# should only include the type of the exception.
m_pre_condition.side_effect = Exception("Third party exception")
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.PENDING,
action_type='nop',
input_parameters={'message': 'hello World',
'fail_pre_condition': True})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.pre_execute()
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
self.assertEqual(
obj_action.status_message,
"Action failed in pre_condition: Exception")
def test_pre_execute_with_skipped(self):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
@@ -313,3 +341,89 @@ class TestTaskFlowActionContainer(test_utils.NovaResourcesMixin,
action_container.revert()
mock_do_revert.assert_not_called()
@mock.patch('watcher.applier.workflow_engine.base.LOG')
@mock.patch('watcher.notifications.action.send_execution_notification')
def test_fail_action_with_reason(self, mock_notification, mock_log):
"""Test _fail_action method with phase and reason."""
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.ONGOING,
action_type='nop',
input_parameters={'message': 'hello World'})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
# Call _fail_action with a phase and reason
phase = "execute"
reason = "Test error occurred"
action_container._fail_action(phase, reason=reason)
# Verify the action state is set to FAILED
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
# Verify status_message is set correctly
expected_message = f"Action failed in {phase}: {reason}"
self.assertEqual(obj_action.status_message, expected_message)
# Verify LOG.error was called
mock_log.error.assert_called_once_with(
'The workflow engine has failed to execute the action: %s',
action.uuid)
# Verify notification was sent with correct parameters
mock_notification.assert_called_once_with(
self.engine.context,
mock.ANY, # action object - checked separately below
objects.fields.NotificationAction.EXECUTION,
objects.fields.NotificationPhase.ERROR,
priority=objects.fields.NotificationPriority.ERROR
)
call_args = mock_notification.call_args
self.assertEqual(call_args[0][1].uuid, action.uuid)
self.assertEqual(call_args[0][1].state, objects.action.State.FAILED)
@mock.patch('watcher.applier.workflow_engine.base.LOG')
@mock.patch('watcher.notifications.action.send_execution_notification')
def test_fail_action_without_reason(self, mock_notification, mock_log):
"""Test _fail_action method without a reason."""
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.ONGOING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.ONGOING,
action_type='nop',
input_parameters={'message': 'hello World'})
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
# Call _fail_action with only phase, no reason
phase = "pre_condition"
action_container._fail_action(phase)
# Verify the action state is set to FAILED
obj_action = objects.Action.get_by_uuid(
self.engine.context, action.uuid)
self.assertEqual(obj_action.state, objects.action.State.FAILED)
# Verify status_message is not set when no reason is provided
self.assertIsNone(obj_action.status_message)
# Verify LOG.error was called
mock_log.error.assert_called_once_with(
'The workflow engine has failed to execute the action: %s',
action.uuid)