Homogenize ActionPlans cancel behavior in threading and eventlet mode
Currently, the applier implements different behaviors when cancelling an action if using threading or eventlet mode. In eventlet mode and depending on the action type, it can try to abort actions which are in ONGOING state. However, in threading mode, it will always wait for the ONGOING actions. This patch is homogenizing the behavior on Cancelling action-plans. Now, it will wait for the running task to complete and cancel all pending actions. This is also simplifying the implementation and cleaning the current mechanism based on killing a separate thread which has scalability and reliability issues. Implements: blueprint remove-cancel-ongoing-actions Change-Id: I7058b32b762de03455d1b082c3cbd5849314350a Signed-off-by: Alfredo Moralejo <amoralej@redhat.com>
This commit is contained in:
@@ -467,7 +467,10 @@ state may be one of the following:
|
||||
not returned any more through the Watcher APIs.
|
||||
- **CANCELLED** : the :ref:`Action Plan <action_plan_definition>` was in
|
||||
**RECOMMENDED**, **PENDING** or **ONGOING** state and was cancelled by the
|
||||
:ref:`Administrator <administrator_definition>`
|
||||
:ref:`Administrator <administrator_definition>`. Note that, if the action
|
||||
plan is cancelled after it is started, only the **PENDING** actions will
|
||||
be cancelled. The **ONGOING** actions will continue to run until they are
|
||||
completed.
|
||||
- **SUPERSEDED** : the :ref:`Action Plan <action_plan_definition>` was in
|
||||
RECOMMENDED state and was automatically superseded by Watcher, due to an
|
||||
expiration delay or an update of the
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
---
|
||||
other:
|
||||
- |
|
||||
Now, when cancelling a running action plan, only the **PENDING** actions
|
||||
will be cancelled. The **ONGOING** actions will continue to run until they
|
||||
finish.
|
||||
|
||||
For more details, see: https://blueprints.launchpad.net/watcher/+spec/remove-cancel-ongoing-actions.
|
||||
@@ -16,9 +16,6 @@
|
||||
#
|
||||
|
||||
import abc
|
||||
import time
|
||||
|
||||
from eventlet import greenlet
|
||||
|
||||
from oslo_log import log
|
||||
from taskflow import task as flow_task
|
||||
@@ -28,7 +25,6 @@ from watcher.applier.actions import factory
|
||||
from watcher.common import clients
|
||||
from watcher.common import exception
|
||||
from watcher.common.loader import loadable
|
||||
from watcher.common import utils
|
||||
from watcher import notifications
|
||||
from watcher import objects
|
||||
from watcher.objects import fields
|
||||
@@ -205,76 +201,32 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
objects.action.State.FAILED]:
|
||||
return True
|
||||
|
||||
def _do_execute_action(*args, **kwargs):
|
||||
try:
|
||||
db_action = self.do_execute(*args, **kwargs)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.END)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.error('The workflow engine has failed '
|
||||
'to execute the action: %s', self.name)
|
||||
status_message = (_(
|
||||
"Action failed in execute: %s") % str(e))
|
||||
db_action = self.engine.notify(self._db_action,
|
||||
objects.action.State.FAILED,
|
||||
status_message=status_message)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.ERROR,
|
||||
priority=fields.NotificationPriority.ERROR)
|
||||
raise
|
||||
# NOTE: spawn a new thread for action execution, so that if action plan
|
||||
# is cancelled workflow engine will not wait to finish action execution
|
||||
et = utils.thread_spawn(_do_execute_action, *args, **kwargs)
|
||||
utils.thread_start(et)
|
||||
# NOTE: check for the state of action plan periodically,so that if
|
||||
# action is finished or action plan is cancelled we can exit from here.
|
||||
result = False
|
||||
while True:
|
||||
try:
|
||||
db_action = self.do_execute(*args, **kwargs)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.END)
|
||||
action_object = objects.Action.get_by_uuid(
|
||||
self.engine.context, self._db_action.uuid, eager=True)
|
||||
action_plan_object = objects.ActionPlan.get_by_id(
|
||||
self.engine.context, action_object.action_plan_id)
|
||||
if action_object.state == objects.action.State.SUCCEEDED:
|
||||
result = True
|
||||
if (action_object.state in [objects.action.State.SUCCEEDED,
|
||||
objects.action.State.FAILED] or
|
||||
action_plan_object.state in
|
||||
objects.action_plan.State.CANCEL_STATES):
|
||||
break
|
||||
time.sleep(1)
|
||||
try:
|
||||
# NOTE: kill the action execution thread, if action plan is
|
||||
# cancelled for all other cases wait for the result from action
|
||||
# execution thread.
|
||||
# Not all actions support abort operations, kill only those action
|
||||
# which support abort operations
|
||||
abort = self.action.check_abort()
|
||||
if (action_plan_object.state in
|
||||
objects.action_plan.State.CANCEL_STATES and
|
||||
abort):
|
||||
# NOTE(dviroel): killing green thread will raise an exception
|
||||
# which will be caught by taskflow and revert and abort will
|
||||
# be called. For threading mode, we will continue to wait for
|
||||
# it to finish, and no abort will be called.
|
||||
utils.thread_kill(et)
|
||||
utils.thread_wait(et)
|
||||
return result
|
||||
|
||||
# NOTE: catch the greenlet exit exception due to thread kill,
|
||||
# taskflow will call revert for the action,
|
||||
# we will redirect it to abort.
|
||||
except greenlet.GreenletExit:
|
||||
self.engine.notify_cancel_start(action_plan_object.uuid)
|
||||
raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid)
|
||||
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
# return False instead of raising an exception
|
||||
LOG.error('The workflow engine has failed '
|
||||
'to execute the action: %s', self.name)
|
||||
status_message = (_(
|
||||
"Action failed in execute: %s") % str(e))
|
||||
db_action = self.engine.notify(self._db_action,
|
||||
objects.action.State.FAILED,
|
||||
status_message=status_message)
|
||||
notifications.action.send_execution_notification(
|
||||
self.engine.context, db_action,
|
||||
fields.NotificationAction.EXECUTION,
|
||||
fields.NotificationPhase.ERROR,
|
||||
priority=fields.NotificationPriority.ERROR)
|
||||
return False
|
||||
|
||||
def post_execute(self):
|
||||
|
||||
@@ -31,7 +31,6 @@ from oslo_utils import strutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from watcher.common import exception
|
||||
from watcher import eventlet as eventlet_helper
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
@@ -192,95 +191,3 @@ def async_compat_call(f, *args, **kwargs):
|
||||
seconds=timeout,
|
||||
exception=asyncio.TimeoutError(f"Timeout: {timeout}s")):
|
||||
return tpool.execute(tpool_wrapper)
|
||||
|
||||
|
||||
class InlineThread:
|
||||
"""Executes function inline (synchronously), mimicking thread interface.
|
||||
|
||||
This class is used in non-eventlet mode to execute functions synchronously
|
||||
in the current thread, avoiding nested threading issues while maintaining
|
||||
compatibility with the eventlet greenthread interface.
|
||||
"""
|
||||
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
"""Initialize InlineThread.
|
||||
|
||||
:param func: The function to execute.
|
||||
:param args: Positional arguments for the function.
|
||||
:param kwargs: Keyword arguments for the function.
|
||||
"""
|
||||
self.func = func
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.result = None
|
||||
self.exception = None
|
||||
|
||||
def start(self):
|
||||
"""Execute the function synchronously in the current thread.
|
||||
|
||||
Exceptions are caught and stored to be re-raised in wait(),
|
||||
mimicking the behavior of actual threads where exceptions occur
|
||||
in the background and are retrieved when joining/waiting.
|
||||
"""
|
||||
try:
|
||||
self.result = self.func(*self.args, **self.kwargs)
|
||||
self.exception = None
|
||||
except Exception as e:
|
||||
self.result = None
|
||||
self.exception = e
|
||||
|
||||
def wait(self):
|
||||
"""Return the result of execution.
|
||||
|
||||
If an exception occurred during execution, it is re-raised here,
|
||||
mimicking the behavior of thread.join() which allows exceptions
|
||||
from background threads to be handled by the caller.
|
||||
|
||||
:return: The result of the function execution.
|
||||
:raises: Any exception that occurred during execution.
|
||||
"""
|
||||
if self.exception is not None:
|
||||
raise self.exception
|
||||
return self.result
|
||||
|
||||
|
||||
def thread_spawn(func, *args, **kwargs):
|
||||
"""Create a synchronous thread or eventlet green thread.
|
||||
|
||||
:param func: The function to spawn.
|
||||
:param args: The arguments to pass to the function.
|
||||
:param kwargs: The keyword arguments to pass to the function.
|
||||
:return: The spawned thread or eventlet green thread.
|
||||
"""
|
||||
if eventlet_helper.is_patched():
|
||||
return eventlet.spawn(func, *args, **kwargs)
|
||||
else:
|
||||
return InlineThread(func, *args, **kwargs)
|
||||
|
||||
|
||||
def thread_start(thread):
|
||||
"""Start a thread.
|
||||
|
||||
:param thread: The thread to start.
|
||||
"""
|
||||
if isinstance(thread, InlineThread):
|
||||
thread.start()
|
||||
|
||||
|
||||
def thread_wait(thread):
|
||||
"""Wait for a thread to finish.
|
||||
|
||||
:param thread: The thread to wait for.
|
||||
:return: The result of thread execution (for InlineThread/eventlet).
|
||||
"""
|
||||
if isinstance(thread, eventlet.greenthread.GreenThread | InlineThread):
|
||||
return thread.wait()
|
||||
|
||||
|
||||
def thread_kill(thread):
|
||||
"""Kill a green thread.
|
||||
|
||||
:param thread: The thread to kill.
|
||||
"""
|
||||
if isinstance(thread, eventlet.greenthread.GreenThread):
|
||||
thread.kill()
|
||||
|
||||
@@ -21,8 +21,8 @@ from oslo_config import cfg
|
||||
|
||||
from watcher.applier.workflow_engine import default as tflow
|
||||
from watcher.common import clients
|
||||
from watcher.common import exception
|
||||
from watcher.common import nova_helper
|
||||
from watcher.common import utils
|
||||
from watcher import objects
|
||||
from watcher.tests.unit.db import base
|
||||
from watcher.tests.unit.objects import utils as obj_utils
|
||||
@@ -183,10 +183,13 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
||||
obj_action.status_message,
|
||||
"Action failed in post_condition: Failed in post_condition")
|
||||
|
||||
@mock.patch.object(utils, 'thread_kill')
|
||||
@mock.patch.object(utils, 'thread_spawn')
|
||||
def test_execute_with_cancel_action_plan(
|
||||
self, mock_thread_spawn, mock_thread_kill):
|
||||
def test_pre_execute_with_cancel_action_plan(self):
|
||||
"""Test that actions are cancelled in the pre_execute method.
|
||||
|
||||
Actions are cancelled when the action plan is in a cancelling
|
||||
state when entering the pre_execute method.
|
||||
"""
|
||||
|
||||
action_plan = obj_utils.create_test_action_plan(
|
||||
self.context, audit_id=self.audit.id,
|
||||
strategy_id=self.strategy.id,
|
||||
@@ -197,16 +200,41 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
||||
action_type='nop',
|
||||
input_parameters={'message': 'hello World'})
|
||||
|
||||
mock_thread_spawn.return_value = mock.Mock()
|
||||
action_container = tflow.TaskFlowActionContainer(
|
||||
db_action=action,
|
||||
engine=self.engine)
|
||||
|
||||
self.assertRaises(exception.ActionPlanCancelled,
|
||||
action_container.pre_execute)
|
||||
|
||||
def test_execute_with_cancel_action_plan(self):
|
||||
"""Test that actions are not cancelled in the execute method.
|
||||
|
||||
Actions are cancelled only if the action plan is in a cancelling
|
||||
state when entering the pre_execute method.
|
||||
Execute will run even if the action plan is in a cancelling state.
|
||||
"""
|
||||
|
||||
action_plan = obj_utils.create_test_action_plan(
|
||||
self.context, audit_id=self.audit.id,
|
||||
strategy_id=self.strategy.id,
|
||||
state=objects.action_plan.State.CANCELLING)
|
||||
action = obj_utils.create_test_action(
|
||||
self.context, action_plan_id=action_plan.id,
|
||||
state=objects.action.State.PENDING,
|
||||
action_type='nop',
|
||||
input_parameters={'message': 'hello World'})
|
||||
|
||||
action_container = tflow.TaskFlowActionContainer(
|
||||
db_action=action,
|
||||
engine=self.engine)
|
||||
|
||||
action_container.execute()
|
||||
result = action_container.execute()
|
||||
self.assertTrue(result)
|
||||
|
||||
mock_thread_kill.assert_called_once_with(
|
||||
mock_thread_spawn.return_value)
|
||||
obj_action = objects.Action.get_by_uuid(
|
||||
self.engine.context, action.uuid)
|
||||
self.assertEqual(obj_action.state, objects.action.State.SUCCEEDED)
|
||||
|
||||
@mock.patch('watcher.applier.workflow_engine.default.LOG')
|
||||
def test_execute_without_rollback(self, mock_log):
|
||||
|
||||
Reference in New Issue
Block a user