Adds support for threading mode in applier

To support threading mode in the applier, this patch switches taskflow
engine to serial when eventlet is not patched. This avoids an issue
caused by the multiple threads reading/writing Action objects when
running with parallel engine. This limitation should be adressed in
a different patch which may also require a refactoring in database api
and how watcher manages its sessions and contexts.

Changes:
1. Add InlineThread class
   - Executes functions synchronously while maintaining thread interface
2. Use serial engine for threading mode
   - Switches to taskflow serial engine when eventlet is not patched
3. Replace default GreenThreadPoolExecutor in TriggerActionPlan
   - A new executor will be created based on the thread mode configured
4. Adds helper functions to spawn, start, wait and kill threads
   - The thread will be handled based on the thread mode configured
5. Threading mode job and tox-threading updated to include applier
   - These jobs will now enable threading mode for the applier content
     and service

Assisted-By: claude-code sonnet-4.5
Co-Authored-By: Sean Mooney <work@seanmooney.info>
Change-Id: I85926cdd1e4393be42fe6d543af50bf2948b5ce6
Signed-off-by: Douglas Viroel <viroel@gmail.com>
This commit is contained in:
Douglas Viroel
2025-10-09 16:34:05 -03:00
parent 1d32e734f3
commit e45860787f
9 changed files with 172 additions and 40 deletions

View File

@@ -284,6 +284,7 @@
devstack_localrc:
'SYSTEMD_ENV_VARS["watcher-api"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true
'SYSTEMD_ENV_VARS["watcher-decision-engine"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true
'SYSTEMD_ENV_VARS["watcher-applier"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true
devstack_local_conf:
post-config:
$WATCHER_CONF:

View File

@@ -58,7 +58,7 @@ Concurrency modes
Evenlet has been the main concurrency library within the OpenStack community
for the last 10 years since the removal of twisted. Over the last few years,
the maintenance of eventlet has decreased and the efforts to remove the GIL
from Python (PEP 703), have fundamentally changed how concurrency is making
from Python (PEP 703), have fundamentally changed how concurrency works, making
eventlet no longer viable. While transitioning to a new native thread
solution, Watcher services will be supporting both modes, with the usage of
native threading mode initially classified as ``experimental``.
@@ -70,11 +70,6 @@ environment variable in the corresponding service configuration:
OS_WATCHER_DISABLE_EVENTLET_PATCHING=true
.. note::
The only service that supports two different concurrency modes is the
``decision engine``.
Decision engine concurrency
***************************
@@ -207,16 +202,17 @@ the underlying hardware.
Applier concurrency
*******************
The applier does not use the futurist_ GreenThreadPoolExecutor_ directly but
instead uses taskflow_. However, taskflow still utilizes a greenthreadpool.
This threadpool is initialized in the workflow engine called
:class:`~.DefaultWorkFlowEngine`. Currently Watcher supports one workflow
engine but the base class allows contributors to develop other workflow engines
as well. In taskflow tasks are created using different types of flows such as a
linear, unordered or a graph flow. The linear and graph flow allow for strong
ordering between individual tasks and it is for this reason that the workflow
engine utilizes a graph flow. The creation of tasks, subsequently linking them
into a graph like structure and submitting them is shown below.
The applier does not use the futurist_ pool executors directly but instead uses
taskflow_. Taskflow supports both green thread pools and native thread pools,
depending on the service configuration. The threadpool is initialized in the
workflow engine called :class:`~.DefaultWorkFlowEngine`. Currently Watcher
supports one workflow engine but the base class allows contributors to develop
other workflow engines as well. In taskflow tasks are created using different
types of flows such as a linear, unordered or a graph flow. The linear and
graph flow allow for strong ordering between individual tasks and it is for
this reason that the workflow engine utilizes a graph flow. The creation of
tasks, subsequently linking them into a graph like structure and submitting
them is shown below.
.. code-block:: python
@@ -240,6 +236,13 @@ into a graph like structure and submitting them is shown below.
return flow
.. note::
When running in native threading mode, the default workflow engine Taskflow
will be configure with a serial engine, which will execute the actions
sequentially, due to a limitation of the current implementation of watcher
services.
In the applier tasks are contained in a :class:`~.TaskFlowActionContainer`
which allows them to trigger events in the workflow engine. This way the
workflow engine can halt or take other actions while the action plan is being

View File

@@ -0,0 +1,15 @@
---
features:
- |
The Applier service now supports running with ``native threading``
mode enabled as opposed to the use of the Eventlet library. Note that the
use of ``native threading`` is still ``experimental``, and is disabled by
default. It should not be used in production. To switch from Eventlet to
native threading mode, the environment variable
``OS_WATCHER_DISABLE_EVENTLET_PATCHING=true`` needs to be added to the
applier service configuration. When running in native threading mode, the
default workflow engine (Taskflow) will be configured with a serial engine,
which will execute the actions sequentially, due to a limitation of the
current implementation of watcher services.
For more information, please check `eventlet
removal <https://wiki.openstack.org/wiki/Eventlet-removal>`__ documentation.

View File

@@ -45,9 +45,7 @@ setenv =
commands =
rm -f .testrepository/times.dbm
find . -type f -name "*.py[c|o]" -delete
# NOTE(dviroel): Applier still requires changes to support
# threading mode
stestr run {posargs} --exclude-regex 'applier'
stestr run {posargs}
[testenv:pep8]
description =

View File

@@ -15,12 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import futurist
from oslo_config import cfg
from oslo_log import log
from watcher.applier.action_plan import default
from watcher.common import executor
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@@ -30,7 +29,8 @@ class TriggerActionPlan:
def __init__(self, applier_manager):
self.applier_manager = applier_manager
workers = CONF.watcher_applier.workers
self.executor = futurist.GreenThreadPoolExecutor(max_workers=workers)
self.executor = executor.get_futurist_pool_executor(
max_workers=workers)
def do_launch_action_plan(self, context, action_plan_uuid):
try:
@@ -44,6 +44,7 @@ class TriggerActionPlan:
def launch_action_plan(self, context, action_plan_uuid):
LOG.debug("Trigger ActionPlan %s", action_plan_uuid)
# submit
executor.log_executor_stats(self.executor, name="action-plan-pool")
self.executor.submit(self.do_launch_action_plan, context,
action_plan_uuid)
return action_plan_uuid

View File

@@ -18,7 +18,7 @@
import abc
import time
import eventlet
from eventlet import greenlet
from oslo_log import log
from taskflow import task as flow_task
@@ -28,6 +28,7 @@ from watcher.applier.actions import factory
from watcher.common import clients
from watcher.common import exception
from watcher.common.loader import loadable
from watcher.common import utils
from watcher import notifications
from watcher import objects
from watcher.objects import fields
@@ -228,7 +229,8 @@ class BaseTaskFlowActionContainer(flow_task.Task):
raise
# NOTE: spawn a new thread for action execution, so that if action plan
# is cancelled workflow engine will not wait to finish action execution
et = eventlet.spawn(_do_execute_action, *args, **kwargs)
et = utils.thread_spawn(_do_execute_action, *args, **kwargs)
utils.thread_start(et)
# NOTE: check for the state of action plan periodically,so that if
# action is finished or action plan is cancelled we can exit from here.
result = False
@@ -255,14 +257,18 @@ class BaseTaskFlowActionContainer(flow_task.Task):
if (action_plan_object.state in
objects.action_plan.State.CANCEL_STATES and
abort):
et.kill()
et.wait()
# NOTE(dviroel): killing green thread will raise an exception
# which will be caught by taskflow and revert and abort will
# be called. For threading mode, we will continue to wait for
# it to finish, and no abort will be called.
utils.thread_kill(et)
utils.thread_wait(et)
return result
# NOTE: catch the greenlet exit exception due to thread kill,
# taskflow will call revert for the action,
# we will redirect it to abort.
except eventlet.greenlet.GreenletExit:
except greenlet.GreenletExit:
self.engine.notify_cancel_start(action_plan_object.uuid)
raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid)

View File

@@ -25,6 +25,7 @@ from taskflow import task as flow_task
from watcher.applier.workflow_engine import base
from watcher.common import exception
from watcher import conf
from watcher import eventlet as eventlet_helper
from watcher import objects
CONF = conf.CONF
@@ -107,9 +108,20 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
flow.link(actions_uuid[parent_id], actions_uuid[a.uuid],
decider=self.decider)
e = engines.load(
flow, executor='greenthreaded', engine='parallel',
max_workers=self.config.max_workers)
e = None
if eventlet_helper.is_patched():
executor_type = "greenthreaded"
engine_type = "parallel"
e = engines.load(
flow, executor=executor_type, engine=engine_type,
max_workers=self.config.max_workers)
else:
# Serial engine does not use an executor internally
LOG.info("Using Taskflow serial engine when running "
"in native threading mode.")
engine_type = "serial"
e = engines.load(flow, engine=engine_type)
e.run()
return flow

View File

@@ -31,6 +31,7 @@ from oslo_utils import strutils
from oslo_utils import uuidutils
from watcher.common import exception
from watcher import eventlet as eventlet_helper
CONF = cfg.CONF
@@ -191,3 +192,95 @@ def async_compat_call(f, *args, **kwargs):
seconds=timeout,
exception=asyncio.TimeoutError(f"Timeout: {timeout}s")):
return tpool.execute(tpool_wrapper)
class InlineThread:
"""Executes function inline (synchronously), mimicking thread interface.
This class is used in non-eventlet mode to execute functions synchronously
in the current thread, avoiding nested threading issues while maintaining
compatibility with the eventlet greenthread interface.
"""
def __init__(self, func, *args, **kwargs):
"""Initialize InlineThread.
:param func: The function to execute.
:param args: Positional arguments for the function.
:param kwargs: Keyword arguments for the function.
"""
self.func = func
self.args = args
self.kwargs = kwargs
self.result = None
self.exception = None
def start(self):
"""Execute the function synchronously in the current thread.
Exceptions are caught and stored to be re-raised in wait(),
mimicking the behavior of actual threads where exceptions occur
in the background and are retrieved when joining/waiting.
"""
try:
self.result = self.func(*self.args, **self.kwargs)
self.exception = None
except Exception as e:
self.result = None
self.exception = e
def wait(self):
"""Return the result of execution.
If an exception occurred during execution, it is re-raised here,
mimicking the behavior of thread.join() which allows exceptions
from background threads to be handled by the caller.
:return: The result of the function execution.
:raises: Any exception that occurred during execution.
"""
if self.exception is not None:
raise self.exception
return self.result
def thread_spawn(func, *args, **kwargs):
"""Create a synchronous thread or eventlet green thread.
:param func: The function to spawn.
:param args: The arguments to pass to the function.
:param kwargs: The keyword arguments to pass to the function.
:return: The spawned thread or eventlet green thread.
"""
if eventlet_helper.is_patched():
return eventlet.spawn(func, *args, **kwargs)
else:
return InlineThread(func, *args, **kwargs)
def thread_start(thread):
"""Start a thread.
:param thread: The thread to start.
"""
if isinstance(thread, InlineThread):
thread.start()
def thread_wait(thread):
"""Wait for a thread to finish.
:param thread: The thread to wait for.
:return: The result of thread execution (for InlineThread/eventlet).
"""
if isinstance(thread, eventlet.greenthread.GreenThread | InlineThread):
return thread.wait()
def thread_kill(thread):
"""Kill a green thread.
:param thread: The thread to kill.
"""
if isinstance(thread, eventlet.greenthread.GreenThread):
thread.kill()

View File

@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import eventlet
from unittest import mock
from oslo_config import cfg
@@ -23,6 +22,7 @@ from oslo_config import cfg
from watcher.applier.workflow_engine import default as tflow
from watcher.common import clients
from watcher.common import nova_helper
from watcher.common import utils
from watcher import objects
from watcher.tests.db import base
from watcher.tests.objects import utils as obj_utils
@@ -54,6 +54,7 @@ class TestTaskFlowActionContainer(base.DbTestCase):
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
action_container.execute()
obj_action = objects.Action.get_by_uuid(
@@ -182,28 +183,30 @@ class TestTaskFlowActionContainer(base.DbTestCase):
obj_action.status_message,
"Action failed in post_condition: Failed in post_condition")
@mock.patch('eventlet.spawn')
def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn):
@mock.patch.object(utils, 'thread_kill')
@mock.patch.object(utils, 'thread_spawn')
def test_execute_with_cancel_action_plan(
self, mock_thread_spawn, mock_thread_kill):
action_plan = obj_utils.create_test_action_plan(
self.context, audit_id=self.audit.id,
strategy_id=self.strategy.id,
state=objects.action_plan.State.CANCELLING)
action = obj_utils.create_test_action(
self.context, action_plan_id=action_plan.id,
state=objects.action.State.ONGOING,
state=objects.action.State.PENDING,
action_type='nop',
input_parameters={'message': 'hello World'})
mock_thread_spawn.return_value = mock.Mock()
action_container = tflow.TaskFlowActionContainer(
db_action=action,
engine=self.engine)
def empty_test():
pass
et = eventlet.spawn(empty_test)
mock_eventlet_spawn.return_value = et
action_container.execute()
et.kill.assert_called_with()
mock_thread_kill.assert_called_once_with(
mock_thread_spawn.return_value)
@mock.patch('watcher.applier.workflow_engine.default.LOG')
def test_execute_without_rollback(self, mock_log):