diff --git a/.zuul.yaml b/.zuul.yaml index b9e58fca4..8d812acc8 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -284,6 +284,7 @@ devstack_localrc: 'SYSTEMD_ENV_VARS["watcher-api"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true 'SYSTEMD_ENV_VARS["watcher-decision-engine"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true + 'SYSTEMD_ENV_VARS["watcher-applier"]': OS_WATCHER_DISABLE_EVENTLET_PATCHING=true devstack_local_conf: post-config: $WATCHER_CONF: diff --git a/doc/source/contributor/concurrency.rst b/doc/source/contributor/concurrency.rst index 2a001288d..d33b14a3f 100644 --- a/doc/source/contributor/concurrency.rst +++ b/doc/source/contributor/concurrency.rst @@ -58,7 +58,7 @@ Concurrency modes Evenlet has been the main concurrency library within the OpenStack community for the last 10 years since the removal of twisted. Over the last few years, the maintenance of eventlet has decreased and the efforts to remove the GIL -from Python (PEP 703), have fundamentally changed how concurrency is making +from Python (PEP 703), have fundamentally changed how concurrency works, making eventlet no longer viable. While transitioning to a new native thread solution, Watcher services will be supporting both modes, with the usage of native threading mode initially classified as ``experimental``. @@ -70,11 +70,6 @@ environment variable in the corresponding service configuration: OS_WATCHER_DISABLE_EVENTLET_PATCHING=true -.. note:: - - The only service that supports two different concurrency modes is the - ``decision engine``. - Decision engine concurrency *************************** @@ -207,16 +202,17 @@ the underlying hardware. Applier concurrency ******************* -The applier does not use the futurist_ GreenThreadPoolExecutor_ directly but -instead uses taskflow_. However, taskflow still utilizes a greenthreadpool. -This threadpool is initialized in the workflow engine called -:class:`~.DefaultWorkFlowEngine`. Currently Watcher supports one workflow -engine but the base class allows contributors to develop other workflow engines -as well. In taskflow tasks are created using different types of flows such as a -linear, unordered or a graph flow. The linear and graph flow allow for strong -ordering between individual tasks and it is for this reason that the workflow -engine utilizes a graph flow. The creation of tasks, subsequently linking them -into a graph like structure and submitting them is shown below. +The applier does not use the futurist_ pool executors directly but instead uses +taskflow_. Taskflow supports both green thread pools and native thread pools, +depending on the service configuration. The threadpool is initialized in the +workflow engine called :class:`~.DefaultWorkFlowEngine`. Currently Watcher +supports one workflow engine but the base class allows contributors to develop +other workflow engines as well. In taskflow tasks are created using different +types of flows such as a linear, unordered or a graph flow. The linear and +graph flow allow for strong ordering between individual tasks and it is for +this reason that the workflow engine utilizes a graph flow. The creation of +tasks, subsequently linking them into a graph like structure and submitting +them is shown below. .. code-block:: python @@ -240,6 +236,13 @@ into a graph like structure and submitting them is shown below. return flow +.. note:: + + When running in native threading mode, the default workflow engine Taskflow + will be configure with a serial engine, which will execute the actions + sequentially, due to a limitation of the current implementation of watcher + services. + In the applier tasks are contained in a :class:`~.TaskFlowActionContainer` which allows them to trigger events in the workflow engine. This way the workflow engine can halt or take other actions while the action plan is being diff --git a/releasenotes/notes/applier-threading-mode-882c79ca8843bc5a.yaml b/releasenotes/notes/applier-threading-mode-882c79ca8843bc5a.yaml new file mode 100644 index 000000000..ff0a99146 --- /dev/null +++ b/releasenotes/notes/applier-threading-mode-882c79ca8843bc5a.yaml @@ -0,0 +1,15 @@ +--- +features: + - | + The Applier service now supports running with ``native threading`` + mode enabled as opposed to the use of the Eventlet library. Note that the + use of ``native threading`` is still ``experimental``, and is disabled by + default. It should not be used in production. To switch from Eventlet to + native threading mode, the environment variable + ``OS_WATCHER_DISABLE_EVENTLET_PATCHING=true`` needs to be added to the + applier service configuration. When running in native threading mode, the + default workflow engine (Taskflow) will be configured with a serial engine, + which will execute the actions sequentially, due to a limitation of the + current implementation of watcher services. + For more information, please check `eventlet + removal `__ documentation. diff --git a/tox.ini b/tox.ini index c9d5dc821..a6d9b251d 100644 --- a/tox.ini +++ b/tox.ini @@ -45,9 +45,7 @@ setenv = commands = rm -f .testrepository/times.dbm find . -type f -name "*.py[c|o]" -delete - # NOTE(dviroel): Applier still requires changes to support - # threading mode - stestr run {posargs} --exclude-regex 'applier' + stestr run {posargs} [testenv:pep8] description = diff --git a/watcher/applier/messaging/trigger.py b/watcher/applier/messaging/trigger.py index acb0b2867..605ced4f4 100644 --- a/watcher/applier/messaging/trigger.py +++ b/watcher/applier/messaging/trigger.py @@ -15,12 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import futurist - from oslo_config import cfg from oslo_log import log from watcher.applier.action_plan import default +from watcher.common import executor LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -30,7 +29,8 @@ class TriggerActionPlan: def __init__(self, applier_manager): self.applier_manager = applier_manager workers = CONF.watcher_applier.workers - self.executor = futurist.GreenThreadPoolExecutor(max_workers=workers) + self.executor = executor.get_futurist_pool_executor( + max_workers=workers) def do_launch_action_plan(self, context, action_plan_uuid): try: @@ -44,6 +44,7 @@ class TriggerActionPlan: def launch_action_plan(self, context, action_plan_uuid): LOG.debug("Trigger ActionPlan %s", action_plan_uuid) # submit + executor.log_executor_stats(self.executor, name="action-plan-pool") self.executor.submit(self.do_launch_action_plan, context, action_plan_uuid) return action_plan_uuid diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 0a6d9bd84..f3883494c 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -18,7 +18,7 @@ import abc import time -import eventlet +from eventlet import greenlet from oslo_log import log from taskflow import task as flow_task @@ -28,6 +28,7 @@ from watcher.applier.actions import factory from watcher.common import clients from watcher.common import exception from watcher.common.loader import loadable +from watcher.common import utils from watcher import notifications from watcher import objects from watcher.objects import fields @@ -228,7 +229,8 @@ class BaseTaskFlowActionContainer(flow_task.Task): raise # NOTE: spawn a new thread for action execution, so that if action plan # is cancelled workflow engine will not wait to finish action execution - et = eventlet.spawn(_do_execute_action, *args, **kwargs) + et = utils.thread_spawn(_do_execute_action, *args, **kwargs) + utils.thread_start(et) # NOTE: check for the state of action plan periodically,so that if # action is finished or action plan is cancelled we can exit from here. result = False @@ -255,14 +257,18 @@ class BaseTaskFlowActionContainer(flow_task.Task): if (action_plan_object.state in objects.action_plan.State.CANCEL_STATES and abort): - et.kill() - et.wait() + # NOTE(dviroel): killing green thread will raise an exception + # which will be caught by taskflow and revert and abort will + # be called. For threading mode, we will continue to wait for + # it to finish, and no abort will be called. + utils.thread_kill(et) + utils.thread_wait(et) return result # NOTE: catch the greenlet exit exception due to thread kill, # taskflow will call revert for the action, # we will redirect it to abort. - except eventlet.greenlet.GreenletExit: + except greenlet.GreenletExit: self.engine.notify_cancel_start(action_plan_object.uuid) raise exception.ActionPlanCancelled(uuid=action_plan_object.uuid) diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index f3bc71cb7..5cad93347 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -25,6 +25,7 @@ from taskflow import task as flow_task from watcher.applier.workflow_engine import base from watcher.common import exception from watcher import conf +from watcher import eventlet as eventlet_helper from watcher import objects CONF = conf.CONF @@ -107,9 +108,20 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): flow.link(actions_uuid[parent_id], actions_uuid[a.uuid], decider=self.decider) - e = engines.load( - flow, executor='greenthreaded', engine='parallel', - max_workers=self.config.max_workers) + e = None + if eventlet_helper.is_patched(): + executor_type = "greenthreaded" + engine_type = "parallel" + e = engines.load( + flow, executor=executor_type, engine=engine_type, + max_workers=self.config.max_workers) + else: + # Serial engine does not use an executor internally + LOG.info("Using Taskflow serial engine when running " + "in native threading mode.") + engine_type = "serial" + e = engines.load(flow, engine=engine_type) + e.run() return flow diff --git a/watcher/common/utils.py b/watcher/common/utils.py index 4073cf983..77f1ce958 100644 --- a/watcher/common/utils.py +++ b/watcher/common/utils.py @@ -31,6 +31,7 @@ from oslo_utils import strutils from oslo_utils import uuidutils from watcher.common import exception +from watcher import eventlet as eventlet_helper CONF = cfg.CONF @@ -191,3 +192,95 @@ def async_compat_call(f, *args, **kwargs): seconds=timeout, exception=asyncio.TimeoutError(f"Timeout: {timeout}s")): return tpool.execute(tpool_wrapper) + + +class InlineThread: + """Executes function inline (synchronously), mimicking thread interface. + + This class is used in non-eventlet mode to execute functions synchronously + in the current thread, avoiding nested threading issues while maintaining + compatibility with the eventlet greenthread interface. + """ + + def __init__(self, func, *args, **kwargs): + """Initialize InlineThread. + + :param func: The function to execute. + :param args: Positional arguments for the function. + :param kwargs: Keyword arguments for the function. + """ + self.func = func + self.args = args + self.kwargs = kwargs + self.result = None + self.exception = None + + def start(self): + """Execute the function synchronously in the current thread. + + Exceptions are caught and stored to be re-raised in wait(), + mimicking the behavior of actual threads where exceptions occur + in the background and are retrieved when joining/waiting. + """ + try: + self.result = self.func(*self.args, **self.kwargs) + self.exception = None + except Exception as e: + self.result = None + self.exception = e + + def wait(self): + """Return the result of execution. + + If an exception occurred during execution, it is re-raised here, + mimicking the behavior of thread.join() which allows exceptions + from background threads to be handled by the caller. + + :return: The result of the function execution. + :raises: Any exception that occurred during execution. + """ + if self.exception is not None: + raise self.exception + return self.result + + +def thread_spawn(func, *args, **kwargs): + """Create a synchronous thread or eventlet green thread. + + :param func: The function to spawn. + :param args: The arguments to pass to the function. + :param kwargs: The keyword arguments to pass to the function. + :return: The spawned thread or eventlet green thread. + """ + if eventlet_helper.is_patched(): + return eventlet.spawn(func, *args, **kwargs) + else: + return InlineThread(func, *args, **kwargs) + + +def thread_start(thread): + """Start a thread. + + :param thread: The thread to start. + """ + if isinstance(thread, InlineThread): + thread.start() + + +def thread_wait(thread): + """Wait for a thread to finish. + + :param thread: The thread to wait for. + :return: The result of thread execution (for InlineThread/eventlet). + """ + if isinstance(thread, eventlet.greenthread.GreenThread | InlineThread): + return thread.wait() + + +def thread_kill(thread): + """Kill a green thread. + + :param thread: The thread to kill. + """ + if isinstance(thread, eventlet.greenthread.GreenThread): + thread.kill() diff --git a/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py b/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py index 4727d60fe..53e4c8cd6 100644 --- a/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py +++ b/watcher/tests/applier/workflow_engine/test_taskflow_action_container.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import eventlet from unittest import mock from oslo_config import cfg @@ -23,6 +22,7 @@ from oslo_config import cfg from watcher.applier.workflow_engine import default as tflow from watcher.common import clients from watcher.common import nova_helper +from watcher.common import utils from watcher import objects from watcher.tests.db import base from watcher.tests.objects import utils as obj_utils @@ -54,6 +54,7 @@ class TestTaskFlowActionContainer(base.DbTestCase): action_container = tflow.TaskFlowActionContainer( db_action=action, engine=self.engine) + action_container.execute() obj_action = objects.Action.get_by_uuid( @@ -182,28 +183,30 @@ class TestTaskFlowActionContainer(base.DbTestCase): obj_action.status_message, "Action failed in post_condition: Failed in post_condition") - @mock.patch('eventlet.spawn') - def test_execute_with_cancel_action_plan(self, mock_eventlet_spawn): + @mock.patch.object(utils, 'thread_kill') + @mock.patch.object(utils, 'thread_spawn') + def test_execute_with_cancel_action_plan( + self, mock_thread_spawn, mock_thread_kill): action_plan = obj_utils.create_test_action_plan( self.context, audit_id=self.audit.id, strategy_id=self.strategy.id, state=objects.action_plan.State.CANCELLING) - action = obj_utils.create_test_action( self.context, action_plan_id=action_plan.id, - state=objects.action.State.ONGOING, + state=objects.action.State.PENDING, action_type='nop', input_parameters={'message': 'hello World'}) + + mock_thread_spawn.return_value = mock.Mock() + action_container = tflow.TaskFlowActionContainer( db_action=action, engine=self.engine) - def empty_test(): - pass - et = eventlet.spawn(empty_test) - mock_eventlet_spawn.return_value = et action_container.execute() - et.kill.assert_called_with() + + mock_thread_kill.assert_called_once_with( + mock_thread_spawn.return_value) @mock.patch('watcher.applier.workflow_engine.default.LOG') def test_execute_without_rollback(self, mock_log):