Skip actions automatically based on pre_condition results
This patch is implementing skipping automatically actions based on the result of action pre_condition method. This will allow to manage properly situations as migration actions for vms which does not longer exist. This patch includes: - Adding a new state SKIPPED to the Action objects. - Add a new Exception ActionSkipped. An action which raises it from the pre_condition execution is moved to SKIPPED state. - pre_condition will not be executed for any action in SKIPPED state. - execute will not be executed for any action in SKIPPED or FAILED state. - post_condition will not be executed for any action in SKIPPED state. - moving transition to ONGOING from pre_condition to execute. That means that actions raising ActionSkipped will move from PENDING to SKIPPED while actions raising any other Exception will move from PENDING to FAILED. - Adding information on action failed or skipped state to the `status_message` field. - Adding a new option to the testing action nop to simulate skipping on pre_condition, so that we can easily test it. Implements: blueprint add-skip-actions Assisted-By: Cursor (claude-4-sonnet) Change-Id: I59cb4c7006c7c3bcc5ff2071886d3e2929800f9e Signed-off-by: Alfredo Moralejo <amoralej@redhat.com>
This commit is contained in:
@@ -23,6 +23,9 @@ following:
|
|||||||
|
|
||||||
- **PENDING** : the ``Action`` has not been executed yet by the
|
- **PENDING** : the ``Action`` has not been executed yet by the
|
||||||
``Watcher Applier``.
|
``Watcher Applier``.
|
||||||
|
- **SKIPPED** : the ``Action`` will not be executed because a predefined
|
||||||
|
skipping condition is found by ``Watcher Applier`` or is explicitly
|
||||||
|
skipped by the ``Administrator``.
|
||||||
- **ONGOING** : the ``Action`` is currently being processed by the
|
- **ONGOING** : the ``Action`` is currently being processed by the
|
||||||
``Watcher Applier``.
|
``Watcher Applier``.
|
||||||
- **SUCCEEDED** : the ``Action`` has been executed successfully
|
- **SUCCEEDED** : the ``Action`` has been executed successfully
|
||||||
|
|||||||
@@ -384,7 +384,9 @@ following methods of the :ref:`Action <action_definition>` handler:
|
|||||||
|
|
||||||
- **preconditions()**: this method will make sure that all conditions are met
|
- **preconditions()**: this method will make sure that all conditions are met
|
||||||
before executing the action (for example, it makes sure that an instance
|
before executing the action (for example, it makes sure that an instance
|
||||||
still exists before trying to migrate it).
|
still exists before trying to migrate it). If certain predefined conditions
|
||||||
|
are found in this phase, the Action is set to **SKIPPED** state and will
|
||||||
|
not be executed.
|
||||||
- **execute()**: this method is what triggers real commands on other
|
- **execute()**: this method is what triggers real commands on other
|
||||||
OpenStack services (such as Nova, ...) in order to change target resource
|
OpenStack services (such as Nova, ...) in order to change target resource
|
||||||
state. If the action is successfully executed, a notification message is
|
state. If the action is successfully executed, a notification message is
|
||||||
|
|||||||
@@ -37,6 +37,10 @@ be one of the following:
|
|||||||
|
|
||||||
- **PENDING** : the :ref:`Action <action_definition>` has not been executed
|
- **PENDING** : the :ref:`Action <action_definition>` has not been executed
|
||||||
yet by the :ref:`Watcher Applier <watcher_applier_definition>`
|
yet by the :ref:`Watcher Applier <watcher_applier_definition>`
|
||||||
|
- **SKIPPED** : the :ref:`Action<action_definition>` will not be executed
|
||||||
|
because a predefined skipping condition is found by
|
||||||
|
:ref:`Watcher Applier <watcher_applier_definition>` or is explicitly
|
||||||
|
skipped by the :ref:`Administrator <administrator_definition>`.
|
||||||
- **ONGOING** : the :ref:`Action <action_definition>` is currently being
|
- **ONGOING** : the :ref:`Action <action_definition>` is currently being
|
||||||
processed by the :ref:`Watcher Applier <watcher_applier_definition>`
|
processed by the :ref:`Watcher Applier <watcher_applier_definition>`
|
||||||
- **SUCCEEDED** : the :ref:`Action <action_definition>` has been executed
|
- **SUCCEEDED** : the :ref:`Action <action_definition>` has been executed
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
|
from watcher._i18n import _
|
||||||
from watcher.applier.action_plan import base
|
from watcher.applier.action_plan import base
|
||||||
from watcher.applier import default
|
from watcher.applier import default
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
@@ -74,6 +75,14 @@ class DefaultActionPlanHandler(base.BaseActionPlanHandler):
|
|||||||
'priority': fields.NotificationPriority.ERROR
|
'priority': fields.NotificationPriority.ERROR
|
||||||
}
|
}
|
||||||
|
|
||||||
|
skipped_filter = {'action_plan_uuid': self.action_plan_uuid,
|
||||||
|
'state': objects.action.State.SKIPPED}
|
||||||
|
skipped_actions = objects.Action.list(
|
||||||
|
self.ctx, filters=skipped_filter, eager=True)
|
||||||
|
if skipped_actions:
|
||||||
|
status_message = _("One or more actions were skipped.")
|
||||||
|
action_plan.status_message = status_message
|
||||||
|
|
||||||
action_plan.state = ap_state
|
action_plan.state = ap_state
|
||||||
action_plan.save()
|
action_plan.save()
|
||||||
notifications.action_plan.send_action_notification(
|
notifications.action_plan.send_action_notification(
|
||||||
|
|||||||
@@ -47,6 +47,10 @@ class Nop(base.BaseAction):
|
|||||||
'message': {
|
'message': {
|
||||||
'type': ['string', 'null']
|
'type': ['string', 'null']
|
||||||
},
|
},
|
||||||
|
'skip_pre_condition': {
|
||||||
|
'type': 'boolean',
|
||||||
|
'default': False
|
||||||
|
},
|
||||||
'fail_pre_condition': {
|
'fail_pre_condition': {
|
||||||
'type': 'boolean',
|
'type': 'boolean',
|
||||||
'default': False
|
'default': False
|
||||||
@@ -82,6 +86,8 @@ class Nop(base.BaseAction):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def pre_condition(self):
|
def pre_condition(self):
|
||||||
|
if self.input_parameters.get('skip_pre_condition'):
|
||||||
|
raise exception.ActionSkipped("Skipped in pre_condition")
|
||||||
if self.input_parameters.get('fail_pre_condition'):
|
if self.input_parameters.get('fail_pre_condition'):
|
||||||
raise exception.WatcherException("Failed in pre_condition")
|
raise exception.WatcherException("Failed in pre_condition")
|
||||||
|
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import eventlet
|
|||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from taskflow import task as flow_task
|
from taskflow import task as flow_task
|
||||||
|
|
||||||
|
from watcher._i18n import _
|
||||||
from watcher.applier.actions import factory
|
from watcher.applier.actions import factory
|
||||||
from watcher.common import clients
|
from watcher.common import clients
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
@@ -85,10 +86,12 @@ class BaseWorkFlowEngine(loadable.Loadable, metaclass=abc.ABCMeta):
|
|||||||
def action_factory(self):
|
def action_factory(self):
|
||||||
return self._action_factory
|
return self._action_factory
|
||||||
|
|
||||||
def notify(self, action, state):
|
def notify(self, action, state, status_message=None):
|
||||||
db_action = objects.Action.get_by_uuid(self.context, action.uuid,
|
db_action = objects.Action.get_by_uuid(self.context, action.uuid,
|
||||||
eager=True)
|
eager=True)
|
||||||
db_action.state = state
|
db_action.state = state
|
||||||
|
if status_message:
|
||||||
|
db_action.status_message = status_message
|
||||||
db_action.save()
|
db_action.save()
|
||||||
return db_action
|
return db_action
|
||||||
|
|
||||||
@@ -161,6 +164,10 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
|||||||
self.engine.context, self._db_action.action_plan_id)
|
self.engine.context, self._db_action.action_plan_id)
|
||||||
if action_plan.state in CANCEL_STATE:
|
if action_plan.state in CANCEL_STATE:
|
||||||
raise exception.ActionPlanCancelled(uuid=action_plan.uuid)
|
raise exception.ActionPlanCancelled(uuid=action_plan.uuid)
|
||||||
|
if self._db_action.state == objects.action.State.SKIPPED:
|
||||||
|
LOG.debug("Action %s is skipped manually",
|
||||||
|
self._db_action.uuid)
|
||||||
|
return
|
||||||
db_action = self.do_pre_execute()
|
db_action = self.do_pre_execute()
|
||||||
notifications.action.send_execution_notification(
|
notifications.action.send_execution_notification(
|
||||||
self.engine.context, db_action,
|
self.engine.context, db_action,
|
||||||
@@ -170,10 +177,24 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
|||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
self.engine.notify_cancel_start(action_plan.uuid)
|
self.engine.notify_cancel_start(action_plan.uuid)
|
||||||
raise
|
raise
|
||||||
|
except exception.ActionSkipped as e:
|
||||||
|
LOG.info("Action %s was skipped automatically: %s",
|
||||||
|
self._db_action.uuid, str(e))
|
||||||
|
status_message = (_(
|
||||||
|
"Action was skipped automatically: %s") % str(e))
|
||||||
|
db_action = self.engine.notify(self._db_action,
|
||||||
|
objects.action.State.SKIPPED,
|
||||||
|
status_message=status_message)
|
||||||
|
notifications.action.send_update(
|
||||||
|
self.engine.context, db_action,
|
||||||
|
old_state=objects.action.State.PENDING)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
|
status_message = (_(
|
||||||
|
"Action failed in pre_condition: %s") % str(e))
|
||||||
db_action = self.engine.notify(self._db_action,
|
db_action = self.engine.notify(self._db_action,
|
||||||
objects.action.State.FAILED)
|
objects.action.State.FAILED,
|
||||||
|
status_message=status_message)
|
||||||
notifications.action.send_execution_notification(
|
notifications.action.send_execution_notification(
|
||||||
self.engine.context, db_action,
|
self.engine.context, db_action,
|
||||||
fields.NotificationAction.EXECUTION,
|
fields.NotificationAction.EXECUTION,
|
||||||
@@ -181,6 +202,12 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
|||||||
priority=fields.NotificationPriority.ERROR)
|
priority=fields.NotificationPriority.ERROR)
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
def execute(self, *args, **kwargs):
|
||||||
|
action_object = objects.Action.get_by_uuid(
|
||||||
|
self.engine.context, self._db_action.uuid, eager=True)
|
||||||
|
if action_object.state in [objects.action.State.SKIPPED,
|
||||||
|
objects.action.State.FAILED]:
|
||||||
|
return True
|
||||||
|
|
||||||
def _do_execute_action(*args, **kwargs):
|
def _do_execute_action(*args, **kwargs):
|
||||||
try:
|
try:
|
||||||
db_action = self.do_execute(*args, **kwargs)
|
db_action = self.do_execute(*args, **kwargs)
|
||||||
@@ -192,8 +219,11 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
|||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
LOG.error('The workflow engine has failed '
|
LOG.error('The workflow engine has failed '
|
||||||
'to execute the action: %s', self.name)
|
'to execute the action: %s', self.name)
|
||||||
|
status_message = (_(
|
||||||
|
"Action failed in execute: %s") % str(e))
|
||||||
db_action = self.engine.notify(self._db_action,
|
db_action = self.engine.notify(self._db_action,
|
||||||
objects.action.State.FAILED)
|
objects.action.State.FAILED,
|
||||||
|
status_message=status_message)
|
||||||
notifications.action.send_execution_notification(
|
notifications.action.send_execution_notification(
|
||||||
self.engine.context, db_action,
|
self.engine.context, db_action,
|
||||||
fields.NotificationAction.EXECUTION,
|
fields.NotificationAction.EXECUTION,
|
||||||
@@ -243,12 +273,21 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def post_execute(self):
|
def post_execute(self):
|
||||||
|
action_object = objects.Action.get_by_uuid(
|
||||||
|
self.engine.context, self._db_action.uuid, eager=True)
|
||||||
|
if action_object.state == objects.action.State.SKIPPED:
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
self.do_post_execute()
|
self.do_post_execute()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
|
kwargs = {}
|
||||||
|
if action_object.status_message is None:
|
||||||
|
kwargs["status_message"] = (_(
|
||||||
|
"Action failed in post_condition: %s") % str(e))
|
||||||
db_action = self.engine.notify(self._db_action,
|
db_action = self.engine.notify(self._db_action,
|
||||||
objects.action.State.FAILED)
|
objects.action.State.FAILED,
|
||||||
|
**kwargs)
|
||||||
notifications.action.send_execution_notification(
|
notifications.action.send_execution_notification(
|
||||||
self.engine.context, db_action,
|
self.engine.context, db_action,
|
||||||
fields.NotificationAction.EXECUTION,
|
fields.NotificationAction.EXECUTION,
|
||||||
|
|||||||
@@ -137,14 +137,13 @@ class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
|
|||||||
engine)
|
engine)
|
||||||
|
|
||||||
def do_pre_execute(self):
|
def do_pre_execute(self):
|
||||||
db_action = self.engine.notify(self._db_action,
|
|
||||||
objects.action.State.ONGOING)
|
|
||||||
LOG.debug("Pre-condition action: %s", self.name)
|
LOG.debug("Pre-condition action: %s", self.name)
|
||||||
self.action.pre_condition()
|
self.action.pre_condition()
|
||||||
return db_action
|
return self._db_action
|
||||||
|
|
||||||
def do_execute(self, *args, **kwargs):
|
def do_execute(self, *args, **kwargs):
|
||||||
LOG.debug("Running action: %s", self.name)
|
LOG.debug("Running action: %s", self.name)
|
||||||
|
self.engine.notify(self._db_action, objects.action.State.ONGOING)
|
||||||
|
|
||||||
# NOTE:Some actions(such as migrate) will return None when exception
|
# NOTE:Some actions(such as migrate) will return None when exception
|
||||||
# Only when True is returned, the action state is set to SUCCEEDED
|
# Only when True is returned, the action state is set to SUCCEEDED
|
||||||
|
|||||||
@@ -278,6 +278,10 @@ class AuditReferenced(Invalid):
|
|||||||
"plans")
|
"plans")
|
||||||
|
|
||||||
|
|
||||||
|
class ActionSkipped(WatcherException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ActionPlanNotFound(ResourceNotFound):
|
class ActionPlanNotFound(ResourceNotFound):
|
||||||
msg_fmt = _("ActionPlan %(action_plan)s could not be found")
|
msg_fmt = _("ActionPlan %(action_plan)s could not be found")
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ class State(object):
|
|||||||
DELETED = 'DELETED'
|
DELETED = 'DELETED'
|
||||||
CANCELLED = 'CANCELLED'
|
CANCELLED = 'CANCELLED'
|
||||||
CANCELLING = 'CANCELLING'
|
CANCELLING = 'CANCELLING'
|
||||||
|
SKIPPED = 'SKIPPED'
|
||||||
|
|
||||||
|
|
||||||
@base.WatcherObjectRegistry.register
|
@base.WatcherObjectRegistry.register
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ from unittest import mock
|
|||||||
from watcher.applier.action_plan import default
|
from watcher.applier.action_plan import default
|
||||||
from watcher.applier import default as ap_applier
|
from watcher.applier import default as ap_applier
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
|
from watcher.common import utils
|
||||||
from watcher import notifications
|
from watcher import notifications
|
||||||
from watcher import objects
|
from watcher import objects
|
||||||
from watcher.objects import action_plan as ap_objects
|
from watcher.objects import action_plan as ap_objects
|
||||||
@@ -151,3 +152,70 @@ class TestDefaultActionPlanHandler(base.DbTestCase):
|
|||||||
self.m_action_plan_notifications
|
self.m_action_plan_notifications
|
||||||
.send_action_notification
|
.send_action_notification
|
||||||
.call_args_list)
|
.call_args_list)
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||||
|
def test_launch_action_plan_skipped_actions(self,
|
||||||
|
m_get_action_plan):
|
||||||
|
m_get_action_plan.return_value = self.action_plan
|
||||||
|
skipped_action = obj_utils.create_test_action(
|
||||||
|
self.context, action_plan_id=self.action_plan.id,
|
||||||
|
action_type='nop',
|
||||||
|
uuid=utils.generate_uuid(),
|
||||||
|
input_parameters={'message': 'hello World',
|
||||||
|
'skip_pre_condition': True})
|
||||||
|
command = default.DefaultActionPlanHandler(
|
||||||
|
self.context, mock.MagicMock(), self.action_plan.uuid)
|
||||||
|
command.execute()
|
||||||
|
expected_calls = [
|
||||||
|
mock.call(self.context, self.action_plan,
|
||||||
|
action=objects.fields.NotificationAction.EXECUTION,
|
||||||
|
phase=objects.fields.NotificationPhase.START),
|
||||||
|
mock.call(self.context, self.action_plan,
|
||||||
|
action=objects.fields.NotificationAction.EXECUTION,
|
||||||
|
phase=objects.fields.NotificationPhase.END)
|
||||||
|
]
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
self.action.get_by_uuid(self.context, skipped_action.uuid).state,
|
||||||
|
objects.action.State.SKIPPED)
|
||||||
|
self.assertEqual(ap_objects.State.SUCCEEDED, self.action_plan.state)
|
||||||
|
self.assertEqual(self.action_plan.status_message,
|
||||||
|
"One or more actions were skipped.")
|
||||||
|
self.assertEqual(
|
||||||
|
expected_calls,
|
||||||
|
self.m_action_plan_notifications
|
||||||
|
.send_action_notification
|
||||||
|
.call_args_list)
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||||
|
def test_launch_action_plan_manual_skipped_actions(self,
|
||||||
|
m_get_action_plan):
|
||||||
|
m_get_action_plan.return_value = self.action_plan
|
||||||
|
skipped_action = obj_utils.create_test_action(
|
||||||
|
self.context, action_plan_id=self.action_plan.id,
|
||||||
|
action_type='nop',
|
||||||
|
uuid=utils.generate_uuid(),
|
||||||
|
state=objects.action.State.SKIPPED,
|
||||||
|
input_parameters={'message': 'hello World'})
|
||||||
|
command = default.DefaultActionPlanHandler(
|
||||||
|
self.context, mock.MagicMock(), self.action_plan.uuid)
|
||||||
|
command.execute()
|
||||||
|
expected_calls = [
|
||||||
|
mock.call(self.context, self.action_plan,
|
||||||
|
action=objects.fields.NotificationAction.EXECUTION,
|
||||||
|
phase=objects.fields.NotificationPhase.START),
|
||||||
|
mock.call(self.context, self.action_plan,
|
||||||
|
action=objects.fields.NotificationAction.EXECUTION,
|
||||||
|
phase=objects.fields.NotificationPhase.END)
|
||||||
|
]
|
||||||
|
self.assertEqual(
|
||||||
|
self.action.get_by_uuid(self.context, skipped_action.uuid).state,
|
||||||
|
objects.action.State.SKIPPED)
|
||||||
|
self.assertEqual(ap_objects.State.SUCCEEDED, self.action_plan.state)
|
||||||
|
self.assertEqual(self.action_plan.status_message,
|
||||||
|
"One or more actions were skipped.")
|
||||||
|
self.assertEqual(
|
||||||
|
expected_calls,
|
||||||
|
self.m_action_plan_notifications
|
||||||
|
.send_action_notification
|
||||||
|
.call_args_list)
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ from unittest import mock
|
|||||||
|
|
||||||
from watcher.applier.actions import base as abase
|
from watcher.applier.actions import base as abase
|
||||||
from watcher.applier.actions import factory
|
from watcher.applier.actions import factory
|
||||||
|
from watcher.applier.actions import nop
|
||||||
from watcher.applier.workflow_engine import default as tflow
|
from watcher.applier.workflow_engine import default as tflow
|
||||||
from watcher.common import exception
|
from watcher.common import exception
|
||||||
from watcher.common import utils
|
from watcher.common import utils
|
||||||
@@ -99,6 +100,27 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
|||||||
for a in actions:
|
for a in actions:
|
||||||
self.check_action_state(a, expected_state)
|
self.check_action_state(a, expected_state)
|
||||||
|
|
||||||
|
def check_notifications_contains(self, notification_calls, action_state,
|
||||||
|
old_state=None):
|
||||||
|
"""Check that an action notification contains the expected info.
|
||||||
|
|
||||||
|
notification_calls: list of notification calls arguments
|
||||||
|
action_state: expected action state (dict)
|
||||||
|
old_state: expected old action state (optional)
|
||||||
|
"""
|
||||||
|
if old_state:
|
||||||
|
action_state['old_state'] = old_state
|
||||||
|
for call in notification_calls:
|
||||||
|
data_dict = call.args[1].as_dict()
|
||||||
|
if call.kwargs and 'old_state' in call.kwargs:
|
||||||
|
data_dict['old_state'] = call.kwargs['old_state']
|
||||||
|
try:
|
||||||
|
self.assertLessEqual(action_state.items(), data_dict.items())
|
||||||
|
return True
|
||||||
|
except AssertionError:
|
||||||
|
continue
|
||||||
|
return False
|
||||||
|
|
||||||
@mock.patch('taskflow.engines.load')
|
@mock.patch('taskflow.engines.load')
|
||||||
@mock.patch('taskflow.patterns.graph_flow.Flow.link')
|
@mock.patch('taskflow.patterns.graph_flow.Flow.link')
|
||||||
def test_execute_with_no_actions(self, graph_flow, engines):
|
def test_execute_with_no_actions(self, graph_flow, engines):
|
||||||
@@ -330,6 +352,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
|||||||
|
|
||||||
self.engine.execute(actions)
|
self.engine.execute(actions)
|
||||||
self.check_action_state(actions[0], objects.action.State.FAILED)
|
self.check_action_state(actions[0], objects.action.State.FAILED)
|
||||||
|
self.assertTrue(self.check_notifications_contains(
|
||||||
|
m_send_update.call_args_list,
|
||||||
|
{
|
||||||
|
'state': objects.action.State.FAILED,
|
||||||
|
'uuid': actions[0].uuid,
|
||||||
|
'action_type': 'fake_action',
|
||||||
|
'status_message': "Action failed in execute: The action %s "
|
||||||
|
"execution failed." % actions[0].uuid,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||||
def test_execute_with_action_plan_cancel(self, m_get_actionplan):
|
def test_execute_with_action_plan_cancel(self, m_get_actionplan):
|
||||||
@@ -370,6 +402,187 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
self.fail(exc)
|
self.fail(exc)
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||||
|
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||||
|
@mock.patch.object(notifications.action, 'send_update')
|
||||||
|
@mock.patch.object(nop.Nop, 'debug_message')
|
||||||
|
def test_execute_with_automatic_skipped(self, m_nop_message,
|
||||||
|
m_send_update, m_execution,
|
||||||
|
m_get_actionplan):
|
||||||
|
|
||||||
|
obj_utils.create_test_goal(self.context)
|
||||||
|
strategy = obj_utils.create_test_strategy(self.context)
|
||||||
|
audit = obj_utils.create_test_audit(
|
||||||
|
self.context, strategy_id=strategy.id)
|
||||||
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
|
self.context, audit_id=audit.id,
|
||||||
|
strategy_id=strategy.id,
|
||||||
|
state=objects.action_plan.State.ONGOING,
|
||||||
|
id=0)
|
||||||
|
m_get_actionplan.return_value = action_plan
|
||||||
|
actions = []
|
||||||
|
|
||||||
|
action = self.create_action("nop", {'message': 'action2',
|
||||||
|
'skip_pre_condition': True})
|
||||||
|
|
||||||
|
self.check_action_state(action, objects.action.State.PENDING)
|
||||||
|
|
||||||
|
actions.append(action)
|
||||||
|
|
||||||
|
self.engine.execute(actions)
|
||||||
|
|
||||||
|
# action skipped automatically in the pre_condition phase
|
||||||
|
self.check_action_state(action, objects.action.State.SKIPPED)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.Action.get_by_uuid(
|
||||||
|
self.context, action.uuid).status_message,
|
||||||
|
"Action was skipped automatically: Skipped in pre_condition")
|
||||||
|
action_state_dict = {
|
||||||
|
'state': objects.action.State.SKIPPED,
|
||||||
|
'status_message': "Action was skipped automatically: "
|
||||||
|
"Skipped in pre_condition",
|
||||||
|
'uuid': action.uuid,
|
||||||
|
'action_type': 'nop',
|
||||||
|
}
|
||||||
|
self.assertTrue(self.check_notifications_contains(
|
||||||
|
m_send_update.call_args_list, action_state_dict))
|
||||||
|
self.assertTrue(self.check_notifications_contains(
|
||||||
|
m_send_update.call_args_list, action_state_dict,
|
||||||
|
old_state=objects.action.State.PENDING))
|
||||||
|
|
||||||
|
m_nop_message.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||||
|
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||||
|
@mock.patch.object(notifications.action, 'send_update')
|
||||||
|
@mock.patch.object(nop.Nop, 'debug_message')
|
||||||
|
@mock.patch.object(nop.Nop, 'pre_condition')
|
||||||
|
def test_execute_with_manually_skipped(self, m_nop_pre_condition,
|
||||||
|
m_nop_message,
|
||||||
|
m_send_update, m_execution,
|
||||||
|
m_get_actionplan):
|
||||||
|
obj_utils.create_test_goal(self.context)
|
||||||
|
strategy = obj_utils.create_test_strategy(self.context)
|
||||||
|
audit = obj_utils.create_test_audit(
|
||||||
|
self.context, strategy_id=strategy.id)
|
||||||
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
|
self.context, audit_id=audit.id,
|
||||||
|
strategy_id=strategy.id,
|
||||||
|
state=objects.action_plan.State.ONGOING,
|
||||||
|
id=0)
|
||||||
|
m_get_actionplan.return_value = action_plan
|
||||||
|
actions = []
|
||||||
|
action1 = obj_utils.create_test_action(
|
||||||
|
self.context,
|
||||||
|
action_type='nop',
|
||||||
|
state=objects.action.State.PENDING,
|
||||||
|
input_parameters={'message': 'action1'})
|
||||||
|
action2 = obj_utils.create_test_action(
|
||||||
|
self.context,
|
||||||
|
action_type='nop',
|
||||||
|
state=objects.action.State.SKIPPED,
|
||||||
|
uuid='bc7eee5c-4fbe-4def-9744-b539be55aa19',
|
||||||
|
input_parameters={'message': 'action2'})
|
||||||
|
self.check_action_state(action1, objects.action.State.PENDING)
|
||||||
|
self.check_action_state(action2, objects.action.State.SKIPPED)
|
||||||
|
actions.append(action1)
|
||||||
|
actions.append(action2)
|
||||||
|
self.engine.execute(actions)
|
||||||
|
# action skipped automatically in the pre_condition phase
|
||||||
|
self.check_action_state(action1, objects.action.State.SUCCEEDED)
|
||||||
|
self.check_action_state(action2, objects.action.State.SKIPPED)
|
||||||
|
# pre_condition and execute are only called for action1
|
||||||
|
m_nop_pre_condition.assert_called_once_with()
|
||||||
|
m_nop_message.assert_called_once_with('action1')
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||||
|
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||||
|
@mock.patch.object(notifications.action, 'send_update')
|
||||||
|
@mock.patch.object(nop.Nop, 'debug_message')
|
||||||
|
def test_execute_different_action_results(self, m_nop_message,
|
||||||
|
m_send_update, m_execution,
|
||||||
|
m_get_actionplan):
|
||||||
|
|
||||||
|
obj_utils.create_test_goal(self.context)
|
||||||
|
strategy = obj_utils.create_test_strategy(self.context)
|
||||||
|
audit = obj_utils.create_test_audit(
|
||||||
|
self.context, strategy_id=strategy.id)
|
||||||
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
|
self.context, audit_id=audit.id,
|
||||||
|
strategy_id=strategy.id,
|
||||||
|
state=objects.action_plan.State.ONGOING,
|
||||||
|
id=0)
|
||||||
|
m_get_actionplan.return_value = action_plan
|
||||||
|
actions = []
|
||||||
|
|
||||||
|
action1 = self.create_action("nop", {'message': 'action1'})
|
||||||
|
action2 = self.create_action("nop", {'message': 'action2',
|
||||||
|
'skip_pre_condition': True})
|
||||||
|
action3 = self.create_action("nop", {'message': 'action3',
|
||||||
|
'fail_pre_condition': True})
|
||||||
|
action4 = self.create_action("nop", {'message': 'action4',
|
||||||
|
'fail_execute': True})
|
||||||
|
action5 = self.create_action("nop", {'message': 'action5',
|
||||||
|
'fail_post_condition': True})
|
||||||
|
action6 = self.create_action("sleep", {'duration': 1.0})
|
||||||
|
|
||||||
|
self.check_action_state(action1, objects.action.State.PENDING)
|
||||||
|
self.check_action_state(action2, objects.action.State.PENDING)
|
||||||
|
self.check_action_state(action3, objects.action.State.PENDING)
|
||||||
|
self.check_action_state(action4, objects.action.State.PENDING)
|
||||||
|
self.check_action_state(action5, objects.action.State.PENDING)
|
||||||
|
self.check_action_state(action6, objects.action.State.PENDING)
|
||||||
|
|
||||||
|
actions.append(action1)
|
||||||
|
actions.append(action2)
|
||||||
|
actions.append(action3)
|
||||||
|
actions.append(action4)
|
||||||
|
actions.append(action5)
|
||||||
|
actions.append(action6)
|
||||||
|
|
||||||
|
self.engine.execute(actions)
|
||||||
|
|
||||||
|
# successful nop action
|
||||||
|
self.check_action_state(action1, objects.action.State.SUCCEEDED)
|
||||||
|
self.assertIsNone(
|
||||||
|
objects.Action.get_by_uuid(self.context, action1.uuid)
|
||||||
|
.status_message)
|
||||||
|
# action skipped automatically in the pre_condition phase
|
||||||
|
self.check_action_state(action2, objects.action.State.SKIPPED)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.Action.get_by_uuid(
|
||||||
|
self.context, action2.uuid).status_message,
|
||||||
|
"Action was skipped automatically: Skipped in pre_condition")
|
||||||
|
# action failed in the pre_condition phase
|
||||||
|
self.check_action_state(action3, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.Action.get_by_uuid(
|
||||||
|
self.context, action3.uuid).status_message,
|
||||||
|
"Action failed in pre_condition: Failed in pre_condition")
|
||||||
|
# action failed in the execute phase
|
||||||
|
self.check_action_state(action4, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.Action.get_by_uuid(
|
||||||
|
self.context, action4.uuid).status_message,
|
||||||
|
"Action failed in execute: The action %s execution failed."
|
||||||
|
% action4.uuid)
|
||||||
|
# action failed in the post_condition phase
|
||||||
|
self.check_action_state(action5, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(
|
||||||
|
objects.Action.get_by_uuid(
|
||||||
|
self.context, action5.uuid).status_message,
|
||||||
|
"Action failed in post_condition: Failed in post_condition")
|
||||||
|
# successful sleep action
|
||||||
|
self.check_action_state(action6, objects.action.State.SUCCEEDED)
|
||||||
|
|
||||||
|
# execute method should not be called for actions that are skipped of
|
||||||
|
# failed in the pre_condition phase
|
||||||
|
expected_execute_calls = [mock.call('action1'),
|
||||||
|
mock.call('action4'),
|
||||||
|
mock.call('action5')]
|
||||||
|
m_nop_message.assert_has_calls(expected_execute_calls, any_order=True)
|
||||||
|
self.assertEqual(m_nop_message.call_count, 3)
|
||||||
|
|
||||||
def test_decider(self):
|
def test_decider(self):
|
||||||
# execution_rule is ALWAYS
|
# execution_rule is ALWAYS
|
||||||
self.engine.execution_rule = 'ALWAYS'
|
self.engine.execution_rule = 'ALWAYS'
|
||||||
@@ -386,3 +599,72 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
|||||||
|
|
||||||
history = {'action1': False}
|
history = {'action1': False}
|
||||||
self.assertTrue(self.engine.decider(history))
|
self.assertTrue(self.engine.decider(history))
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||||
|
def test_notify_with_status_message(self, m_get_actionplan):
|
||||||
|
"""Test that notify method properly handles status_message."""
|
||||||
|
obj_utils.create_test_goal(self.context)
|
||||||
|
strategy = obj_utils.create_test_strategy(self.context)
|
||||||
|
audit = obj_utils.create_test_audit(
|
||||||
|
self.context, strategy_id=strategy.id)
|
||||||
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
|
self.context, audit_id=audit.id,
|
||||||
|
strategy_id=strategy.id,
|
||||||
|
state=objects.action_plan.State.ONGOING)
|
||||||
|
action1 = obj_utils.create_test_action(
|
||||||
|
self.context, action_plan_id=action_plan.id,
|
||||||
|
action_type='nop', state=objects.action.State.ONGOING,
|
||||||
|
input_parameters={'message': 'hello World'})
|
||||||
|
m_get_actionplan.return_value = action_plan
|
||||||
|
actions = []
|
||||||
|
actions.append(action1)
|
||||||
|
|
||||||
|
# Test notify with status_message provided
|
||||||
|
test_status_message = "Action completed successfully"
|
||||||
|
result = self.engine.notify(action1, objects.action.State.FAILED,
|
||||||
|
status_message=test_status_message)
|
||||||
|
|
||||||
|
# Verify the action state was updated
|
||||||
|
self.assertEqual(result.state, objects.action.State.FAILED)
|
||||||
|
|
||||||
|
# Verify the status_message was set
|
||||||
|
self.assertEqual(result.status_message, test_status_message)
|
||||||
|
|
||||||
|
# Verify the changes were persisted to the database
|
||||||
|
persisted_action = objects.Action.get_by_uuid(
|
||||||
|
self.context, action1.uuid)
|
||||||
|
self.assertEqual(persisted_action.state, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(persisted_action.status_message, test_status_message)
|
||||||
|
|
||||||
|
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||||
|
def test_notify_without_status_message(self, m_get_actionplan):
|
||||||
|
"""Test that notify method works without status_message parameter."""
|
||||||
|
obj_utils.create_test_goal(self.context)
|
||||||
|
strategy = obj_utils.create_test_strategy(self.context)
|
||||||
|
audit = obj_utils.create_test_audit(
|
||||||
|
self.context, strategy_id=strategy.id)
|
||||||
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
|
self.context, audit_id=audit.id,
|
||||||
|
strategy_id=strategy.id,
|
||||||
|
state=objects.action_plan.State.ONGOING)
|
||||||
|
action1 = obj_utils.create_test_action(
|
||||||
|
self.context, action_plan_id=action_plan.id,
|
||||||
|
action_type='nop', state=objects.action.State.ONGOING,
|
||||||
|
input_parameters={'message': 'hello World'})
|
||||||
|
m_get_actionplan.return_value = action_plan
|
||||||
|
actions = []
|
||||||
|
actions.append(action1)
|
||||||
|
|
||||||
|
# Test notify without status_message
|
||||||
|
result = self.engine.notify(action1, objects.action.State.SUCCEEDED)
|
||||||
|
# Verify the action state was updated
|
||||||
|
self.assertEqual(result.state, objects.action.State.SUCCEEDED)
|
||||||
|
|
||||||
|
# Verify the status_message
|
||||||
|
self.assertIsNone(result.status_message)
|
||||||
|
# Verify the changes were persisted to the database
|
||||||
|
persisted_action = objects.Action.get_by_uuid(
|
||||||
|
self.context, action1.uuid)
|
||||||
|
self.assertEqual(persisted_action.state,
|
||||||
|
objects.action.State.SUCCEEDED)
|
||||||
|
self.assertIsNone(persisted_action.status_message)
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
|||||||
self.assertEqual(obj_action.state, objects.action.State.SUCCEEDED)
|
self.assertEqual(obj_action.state, objects.action.State.SUCCEEDED)
|
||||||
|
|
||||||
@mock.patch.object(clients.OpenStackClients, 'nova', mock.Mock())
|
@mock.patch.object(clients.OpenStackClients, 'nova', mock.Mock())
|
||||||
def test_execute_with_failed(self):
|
def test_execute_with_failed_execute(self):
|
||||||
nova_util = nova_helper.NovaHelper()
|
nova_util = nova_helper.NovaHelper()
|
||||||
instance = "31b9dd5c-b1fd-4f61-9b68-a47096326dac"
|
instance = "31b9dd5c-b1fd-4f61-9b68-a47096326dac"
|
||||||
nova_util.nova.servers.get.return_value = instance
|
nova_util.nova.servers.get.return_value = instance
|
||||||
@@ -90,8 +90,11 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
|||||||
obj_action = objects.Action.get_by_uuid(
|
obj_action = objects.Action.get_by_uuid(
|
||||||
self.engine.context, action.uuid)
|
self.engine.context, action.uuid)
|
||||||
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(obj_action.status_message, "Action failed in execute:"
|
||||||
|
" The action 10a47dd1-4874-4298-91cf-eff046dbdb8d "
|
||||||
|
"execution failed.")
|
||||||
|
|
||||||
def test_execute_with_failed_execute(self):
|
def test_pre_execute(self):
|
||||||
action_plan = obj_utils.create_test_action_plan(
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
self.context, audit_id=self.audit.id,
|
self.context, audit_id=self.audit.id,
|
||||||
strategy_id=self.strategy.id,
|
strategy_id=self.strategy.id,
|
||||||
@@ -100,15 +103,16 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
|||||||
self.context, action_plan_id=action_plan.id,
|
self.context, action_plan_id=action_plan.id,
|
||||||
state=objects.action.State.PENDING,
|
state=objects.action.State.PENDING,
|
||||||
action_type='nop',
|
action_type='nop',
|
||||||
input_parameters={'message': 'hello World',
|
input_parameters={'message': 'hello World'})
|
||||||
'fail_execute': True})
|
|
||||||
action_container = tflow.TaskFlowActionContainer(
|
action_container = tflow.TaskFlowActionContainer(
|
||||||
db_action=action,
|
db_action=action,
|
||||||
engine=self.engine)
|
engine=self.engine)
|
||||||
action_container.execute()
|
|
||||||
|
action_container.pre_execute()
|
||||||
obj_action = objects.Action.get_by_uuid(
|
obj_action = objects.Action.get_by_uuid(
|
||||||
self.engine.context, action.uuid)
|
self.engine.context, action.uuid)
|
||||||
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
self.assertEqual(obj_action.state, objects.action.State.PENDING)
|
||||||
|
self.assertIsNone(obj_action.status_message)
|
||||||
|
|
||||||
def test_pre_execute_with_failed_pre_condition(self):
|
def test_pre_execute_with_failed_pre_condition(self):
|
||||||
action_plan = obj_utils.create_test_action_plan(
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
@@ -124,10 +128,37 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
|||||||
action_container = tflow.TaskFlowActionContainer(
|
action_container = tflow.TaskFlowActionContainer(
|
||||||
db_action=action,
|
db_action=action,
|
||||||
engine=self.engine)
|
engine=self.engine)
|
||||||
|
|
||||||
action_container.pre_execute()
|
action_container.pre_execute()
|
||||||
obj_action = objects.Action.get_by_uuid(
|
obj_action = objects.Action.get_by_uuid(
|
||||||
self.engine.context, action.uuid)
|
self.engine.context, action.uuid)
|
||||||
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(
|
||||||
|
obj_action.status_message,
|
||||||
|
"Action failed in pre_condition: Failed in pre_condition")
|
||||||
|
|
||||||
|
def test_pre_execute_with_skipped(self):
|
||||||
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
|
self.context, audit_id=self.audit.id,
|
||||||
|
strategy_id=self.strategy.id,
|
||||||
|
state=objects.action_plan.State.ONGOING)
|
||||||
|
action = obj_utils.create_test_action(
|
||||||
|
self.context, action_plan_id=action_plan.id,
|
||||||
|
state=objects.action.State.PENDING,
|
||||||
|
action_type='nop',
|
||||||
|
input_parameters={'message': 'hello World',
|
||||||
|
'skip_pre_condition': True})
|
||||||
|
action_container = tflow.TaskFlowActionContainer(
|
||||||
|
db_action=action,
|
||||||
|
engine=self.engine)
|
||||||
|
|
||||||
|
action_container.pre_execute()
|
||||||
|
obj_action = objects.Action.get_by_uuid(
|
||||||
|
self.engine.context, action.uuid)
|
||||||
|
self.assertEqual(obj_action.state, objects.action.State.SKIPPED)
|
||||||
|
self.assertEqual(obj_action.status_message,
|
||||||
|
"Action was skipped automatically: "
|
||||||
|
"Skipped in pre_condition")
|
||||||
|
|
||||||
def test_post_execute_with_failed_post_condition(self):
|
def test_post_execute_with_failed_post_condition(self):
|
||||||
action_plan = obj_utils.create_test_action_plan(
|
action_plan = obj_utils.create_test_action_plan(
|
||||||
@@ -143,10 +174,14 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
|||||||
action_container = tflow.TaskFlowActionContainer(
|
action_container = tflow.TaskFlowActionContainer(
|
||||||
db_action=action,
|
db_action=action,
|
||||||
engine=self.engine)
|
engine=self.engine)
|
||||||
|
|
||||||
action_container.post_execute()
|
action_container.post_execute()
|
||||||
obj_action = objects.Action.get_by_uuid(
|
obj_action = objects.Action.get_by_uuid(
|
||||||
self.engine.context, action.uuid)
|
self.engine.context, action.uuid)
|
||||||
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
self.assertEqual(obj_action.state, objects.action.State.FAILED)
|
||||||
|
self.assertEqual(
|
||||||
|
obj_action.status_message,
|
||||||
|
"Action failed in post_condition: Failed in post_condition")
|
||||||
|
|
||||||
@mock.patch('eventlet.spawn')
|
@mock.patch('eventlet.spawn')
|
||||||
def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn):
|
def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn):
|
||||||
|
|||||||
Reference in New Issue
Block a user