Merge "Enable Applier parallel engine in native thread mode"

This commit is contained in:
Zuul
2026-02-19 17:28:27 +00:00
committed by Gerrit Code Review
4 changed files with 133 additions and 18 deletions

View File

@@ -236,13 +236,6 @@ them is shown below.
return flow
.. note::
When running in native threading mode, the default workflow engine Taskflow
will be configure with a serial engine, which will execute the actions
sequentially, due to a limitation of the current implementation of watcher
services.
In the applier tasks are contained in a :class:`~.TaskFlowActionContainer`
which allows them to trigger events in the workflow engine. This way the
workflow engine can halt or take other actions while the action plan is being

View File

@@ -109,18 +109,16 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
decider=self.decider)
e = None
engine_type = "parallel"
if eventlet_helper.is_patched():
executor_type = "greenthreaded"
engine_type = "parallel"
e = engines.load(
flow, executor=executor_type, engine=engine_type,
max_workers=self.config.max_workers)
else:
# Serial engine does not use an executor internally
LOG.info("Using Taskflow serial engine when running "
LOG.info("Using Taskflow parallel engine when running "
"in native threading mode.")
engine_type = "serial"
e = engines.load(flow, engine=engine_type)
executor_type = "threaded"
e = engines.load(
flow, executor=executor_type, engine=engine_type,
max_workers=self.config.max_workers)
e.run()

81
watcher/tests/fixtures/db_lock.py vendored Normal file
View File

@@ -0,0 +1,81 @@
# Copyright 2026 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Threading lock fixture for database write operations in Watcher tests."""
import contextlib
import threading
import fixtures
from watcher.db.sqlalchemy import api as db_api
class DatabaseWriteLock(fixtures.Fixture):
"""Fixture that adds a threading lock to database write operations.
This fixture ensures that multiple threads cannot write to the database
simultaneously by adding a global lock around the _session_for_write
function. This is useful for tests that use threading or concurrent
operations to avoid race conditions and database conflicts.
The lock is applied by wrapping Watcher's db_api._session_for_write
function, ensuring that all database writes are serialized.
"""
def __init__(self, lock_timeout=None):
"""Initialize the DatabaseWriteLock fixture.
:param lock_timeout: Optional timeout for acquiring the lock (seconds).
If None, the lock will block indefinitely.
:raises ValueError: If lock_timeout is not None and not a positive
number.
"""
super().__init__()
if lock_timeout is not None and lock_timeout <= 0:
raise ValueError(
"lock_timeout must be None or a positive number, "
f"got {lock_timeout}")
self.lock_timeout = lock_timeout
self._db_write_lock = threading.RLock() # Reentrant lock
def _setUp(self):
"""Set up the database write lock."""
# Store the original _session_for_write function
original_session_for_write = db_api._session_for_write
@contextlib.contextmanager
def locked_session_for_write():
"""Wrapper that adds locking around _session_for_write."""
# Acquire the lock before entering the database session
acquired = self._db_write_lock.acquire(
blocking=True,
timeout=self.lock_timeout if self.lock_timeout else -1)
if not acquired:
raise RuntimeError(
f"Failed to acquire database write lock within "
f"{self.lock_timeout} seconds")
try:
# Enter the original writer session context
with original_session_for_write() as session:
yield session
finally:
# Release the lock after exiting the database session
self._db_write_lock.release()
# Patch _session_for_write in watcher's db api
self.useFixture(fixtures.MockPatchObject(
db_api, '_session_for_write', locked_session_for_write))

View File

@@ -14,7 +14,11 @@
# under the License.
"""Watcher DB test base class."""
import os
import sqlite3
import tempfile
import fixtures
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_fixtures
@@ -22,6 +26,7 @@ from oslo_db.sqlalchemy import test_fixtures
from watcher.db import api as dbapi
from watcher.db.sqlalchemy import migration
from watcher.tests.fixtures import db_lock as db_fixture
from watcher.tests.unit import base
from watcher.tests.unit.db import utils
@@ -56,15 +61,53 @@ class DbTestCase(base.TestCase):
return next(self._id_gen)
def setUp(self):
cfg.CONF.set_override("enable_authentication", False)
# To use in-memory SQLite DB
cfg.CONF.set_override("connection", "sqlite://", group="database")
CONF.set_override("enable_authentication", False)
# Creates a temporary dir to hold sqlite temp files
# and patch tempile to use it as default dir.
self.useFixture(fixtures.NestedTempfile())
# NOTE(dviroel): Using file-backed database to support multiple
# native thread, since each one can have its own connection to
# the database. Files created by SQLite will be cleaned up
# by the NestedTempfile fixture.
fd, dbfile_path = tempfile.mkstemp(prefix="watcher_test_",
suffix=".db")
# close the file descriptor before SQLite connects
os.close(fd)
CONF.set_override(
"connection", f"sqlite:///{dbfile_path}", group="database")
# Enable WAL journaling mode: "WAL provides more concurrency as
# readers do not block writers and a writer does not block readers."
# Note that WAL journal mode is persistent, if we close and reopen
# the database, it will come back in WAL mode.
# More info at: https://www.sqlite.org/wal.html
with sqlite3.connect(dbfile_path) as conn:
res = conn.execute("PRAGMA journal_mode=WAL")
self.assertEqual(res.fetchone()[0], 'wal')
super().setUp()
# NOTE(dviroel): Creates a new enginefacade for each test,
# and use the fixture to replace the application level factory
# with the local one. This avoids issue with factory global flags
# that can avoid re-configuring the database.
local_enginefacade = enginefacade.transaction_context()
local_enginefacade.configure(
connection=CONF.database.connection,
sqlite_synchronous=CONF.database.sqlite_synchronous)
self.useFixture(
test_fixtures.ReplaceEngineFacadeFixture(
enginefacade._context_manager, local_enginefacade))
# Provision and configure a SQLite database for this test using
# oslo_db's fixtures.
self.useFixture(SqliteDatabaseFixture())
# NOTE(dviroel): SQLite support only a single writer per database
# and we still miss the support retrying on a "Database is Locked"
# error.
self.useFixture(db_fixture.DatabaseWriteLock())
self.dbapi = dbapi.get_instance()
self._id_gen = utils.id_generator()