From 64d101d47d0ea90990faa89dc1de9aaba873f0f0 Mon Sep 17 00:00:00 2001 From: Douglas Viroel Date: Tue, 3 Feb 2026 07:37:03 -0300 Subject: [PATCH] Enable Applier parallel engine in native thread mode This patch modifies the Taskflow engine to 'parallel' when using native thread mode and introduces a new test fixtures for multi-threaded testing: * DatabaseWriteLock: Serializes database writes by wrapping _session_for_write with threading.RLock, preventing race conditions in SQLite-based tests with limited concurrency support. to the existing TempDir fixture. Related-Bug: #2133505 Assisted-by: claude-code (Sonnet 4.5) Change-Id: I3883d3b34691f076bde6ecec109bac94cfa12dd9 Signed-off-by: Douglas Viroel --- doc/source/contributor/concurrency.rst | 7 -- watcher/applier/workflow_engine/default.py | 14 ++-- watcher/tests/fixtures/db_lock.py | 81 ++++++++++++++++++++++ watcher/tests/unit/db/base.py | 49 ++++++++++++- 4 files changed, 133 insertions(+), 18 deletions(-) create mode 100644 watcher/tests/fixtures/db_lock.py diff --git a/doc/source/contributor/concurrency.rst b/doc/source/contributor/concurrency.rst index d33b14a3f..feffcd145 100644 --- a/doc/source/contributor/concurrency.rst +++ b/doc/source/contributor/concurrency.rst @@ -236,13 +236,6 @@ them is shown below. return flow -.. note:: - - When running in native threading mode, the default workflow engine Taskflow - will be configure with a serial engine, which will execute the actions - sequentially, due to a limitation of the current implementation of watcher - services. - In the applier tasks are contained in a :class:`~.TaskFlowActionContainer` which allows them to trigger events in the workflow engine. This way the workflow engine can halt or take other actions while the action plan is being diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index 5cad93347..e0f7a6a85 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -109,18 +109,16 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): decider=self.decider) e = None + engine_type = "parallel" if eventlet_helper.is_patched(): executor_type = "greenthreaded" - engine_type = "parallel" - e = engines.load( - flow, executor=executor_type, engine=engine_type, - max_workers=self.config.max_workers) else: - # Serial engine does not use an executor internally - LOG.info("Using Taskflow serial engine when running " + LOG.info("Using Taskflow parallel engine when running " "in native threading mode.") - engine_type = "serial" - e = engines.load(flow, engine=engine_type) + executor_type = "threaded" + e = engines.load( + flow, executor=executor_type, engine=engine_type, + max_workers=self.config.max_workers) e.run() diff --git a/watcher/tests/fixtures/db_lock.py b/watcher/tests/fixtures/db_lock.py new file mode 100644 index 000000000..8d18ff4a5 --- /dev/null +++ b/watcher/tests/fixtures/db_lock.py @@ -0,0 +1,81 @@ +# Copyright 2026 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Threading lock fixture for database write operations in Watcher tests.""" + +import contextlib +import threading + +import fixtures + +from watcher.db.sqlalchemy import api as db_api + + +class DatabaseWriteLock(fixtures.Fixture): + """Fixture that adds a threading lock to database write operations. + + This fixture ensures that multiple threads cannot write to the database + simultaneously by adding a global lock around the _session_for_write + function. This is useful for tests that use threading or concurrent + operations to avoid race conditions and database conflicts. + + The lock is applied by wrapping Watcher's db_api._session_for_write + function, ensuring that all database writes are serialized. + """ + + def __init__(self, lock_timeout=None): + """Initialize the DatabaseWriteLock fixture. + + :param lock_timeout: Optional timeout for acquiring the lock (seconds). + If None, the lock will block indefinitely. + :raises ValueError: If lock_timeout is not None and not a positive + number. + """ + super().__init__() + if lock_timeout is not None and lock_timeout <= 0: + raise ValueError( + "lock_timeout must be None or a positive number, " + f"got {lock_timeout}") + self.lock_timeout = lock_timeout + self._db_write_lock = threading.RLock() # Reentrant lock + + def _setUp(self): + """Set up the database write lock.""" + # Store the original _session_for_write function + original_session_for_write = db_api._session_for_write + + @contextlib.contextmanager + def locked_session_for_write(): + """Wrapper that adds locking around _session_for_write.""" + # Acquire the lock before entering the database session + acquired = self._db_write_lock.acquire( + blocking=True, + timeout=self.lock_timeout if self.lock_timeout else -1) + + if not acquired: + raise RuntimeError( + f"Failed to acquire database write lock within " + f"{self.lock_timeout} seconds") + + try: + # Enter the original writer session context + with original_session_for_write() as session: + yield session + finally: + # Release the lock after exiting the database session + self._db_write_lock.release() + + # Patch _session_for_write in watcher's db api + self.useFixture(fixtures.MockPatchObject( + db_api, '_session_for_write', locked_session_for_write)) diff --git a/watcher/tests/unit/db/base.py b/watcher/tests/unit/db/base.py index 040a8541b..d50a638bd 100644 --- a/watcher/tests/unit/db/base.py +++ b/watcher/tests/unit/db/base.py @@ -14,7 +14,11 @@ # under the License. """Watcher DB test base class.""" +import os +import sqlite3 +import tempfile +import fixtures from oslo_config import cfg from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import test_fixtures @@ -22,6 +26,7 @@ from oslo_db.sqlalchemy import test_fixtures from watcher.db import api as dbapi from watcher.db.sqlalchemy import migration +from watcher.tests.fixtures import db_lock as db_fixture from watcher.tests.unit import base from watcher.tests.unit.db import utils @@ -56,15 +61,53 @@ class DbTestCase(base.TestCase): return next(self._id_gen) def setUp(self): - cfg.CONF.set_override("enable_authentication", False) - # To use in-memory SQLite DB - cfg.CONF.set_override("connection", "sqlite://", group="database") + CONF.set_override("enable_authentication", False) + # Creates a temporary dir to hold sqlite temp files + # and patch tempile to use it as default dir. + self.useFixture(fixtures.NestedTempfile()) + + # NOTE(dviroel): Using file-backed database to support multiple + # native thread, since each one can have its own connection to + # the database. Files created by SQLite will be cleaned up + # by the NestedTempfile fixture. + fd, dbfile_path = tempfile.mkstemp(prefix="watcher_test_", + suffix=".db") + # close the file descriptor before SQLite connects + os.close(fd) + CONF.set_override( + "connection", f"sqlite:///{dbfile_path}", group="database") + + # Enable WAL journaling mode: "WAL provides more concurrency as + # readers do not block writers and a writer does not block readers." + # Note that WAL journal mode is persistent, if we close and reopen + # the database, it will come back in WAL mode. + # More info at: https://www.sqlite.org/wal.html + with sqlite3.connect(dbfile_path) as conn: + res = conn.execute("PRAGMA journal_mode=WAL") + self.assertEqual(res.fetchone()[0], 'wal') super().setUp() + # NOTE(dviroel): Creates a new enginefacade for each test, + # and use the fixture to replace the application level factory + # with the local one. This avoids issue with factory global flags + # that can avoid re-configuring the database. + local_enginefacade = enginefacade.transaction_context() + local_enginefacade.configure( + connection=CONF.database.connection, + sqlite_synchronous=CONF.database.sqlite_synchronous) + + self.useFixture( + test_fixtures.ReplaceEngineFacadeFixture( + enginefacade._context_manager, local_enginefacade)) + # Provision and configure a SQLite database for this test using # oslo_db's fixtures. self.useFixture(SqliteDatabaseFixture()) + # NOTE(dviroel): SQLite support only a single writer per database + # and we still miss the support retrying on a "Database is Locked" + # error. + self.useFixture(db_fixture.DatabaseWriteLock()) self.dbapi = dbapi.get_instance() self._id_gen = utils.id_generator()