Eventlet removal
This change removes all usage of eventlet from cyborg. Currently cyborg imports eventlet and enables monky patching however it does not explicitly use eventlet This change removes eventlet as a depency and opts into the threading backend for oslo service and the threading rpc server. Assisted-By: claude code opus 4.6 Change-Id: Ib1774df50a37c42c30aaad3648a28ea0823e0057 Signed-off-by: Sean Mooney <work@seanmooney.info>
This commit is contained in:
@@ -13,9 +13,15 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
"""Cyborg command entry point initialization.
|
||||
|
||||
Initializes the oslo.service threading backend before any service is
|
||||
started. All ``cyborg-*`` entry points import from ``cyborg.cmd``,
|
||||
so this module runs before any service code.
|
||||
"""
|
||||
|
||||
import oslo_i18n as i18n
|
||||
import oslo_service.backend as service
|
||||
|
||||
eventlet.monkey_patch()
|
||||
|
||||
service.init_backend(service.BackendType.THREADING)
|
||||
i18n.install('cyborg')
|
||||
|
||||
@@ -105,12 +105,9 @@ def get_server(target, endpoints, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return messaging.get_rpc_server(TRANSPORT,
|
||||
target,
|
||||
endpoints,
|
||||
executor='eventlet',
|
||||
serializer=serializer,
|
||||
access_policy=access_policy)
|
||||
return messaging.get_rpc_server(
|
||||
TRANSPORT, target, endpoints,
|
||||
serializer=serializer, access_policy=access_policy)
|
||||
|
||||
|
||||
def get_notifier(service=None, host=None, publisher_id=None):
|
||||
|
||||
+3
-32
@@ -15,9 +15,8 @@
|
||||
|
||||
"""Utilities and helper functions."""
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor as CFThreadPoolExecutor
|
||||
from concurrent import futures
|
||||
from functools import wraps
|
||||
import queue
|
||||
import time
|
||||
import traceback
|
||||
|
||||
@@ -220,28 +219,6 @@ class Singleton(_Singleton('SingletonMeta', (object,), {})):
|
||||
pass
|
||||
|
||||
|
||||
class ThreadPoolExecutor(CFThreadPoolExecutor):
|
||||
"""Derived from concurrent.futures.ThreadPoolExecutor"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Initializes a new ThreadPoolExecutor instance.
|
||||
|
||||
Args:
|
||||
max_workers: The maximum number of threads that can be used to
|
||||
execute the given calls.
|
||||
thread_name_prefix: An optional name prefix to give our threads.
|
||||
initializer: A callable used to initialize worker threads.
|
||||
initargs: A tuple of arguments to pass to the initializer.
|
||||
"""
|
||||
super(ThreadPoolExecutor, self).__init__(*args, **kwargs)
|
||||
# NOTE(Shaohe): py37/38 will use SimpleQueue as _work_queue, it will
|
||||
# cause hang the main thread with eventlet.monkey_patch. Change it
|
||||
# to queue._PySimpleQueue
|
||||
if hasattr(queue, "SimpleQueue") and not isinstance(
|
||||
self._work_queue, queue._PySimpleQueue):
|
||||
self._work_queue = queue._PySimpleQueue()
|
||||
|
||||
|
||||
class ThreadWorks(Singleton):
|
||||
"""Passthrough method for ThreadPoolExecutor.
|
||||
|
||||
@@ -252,13 +229,7 @@ class ThreadWorks(Singleton):
|
||||
|
||||
def __init__(self, pool_size=CONF.thread_pool_size):
|
||||
"""Singleton ThreadWorks init."""
|
||||
# Ref: https://pythonhosted.org/futures/
|
||||
# NOTE(Shaohe) We can let eventlet greening ThreadPoolExecutor
|
||||
# eventlet.patcher.monkey_patch(os=False, socket=True,
|
||||
# select=True, thread=True)
|
||||
# futures = eventlet.import_patched('concurrent.futures')
|
||||
# ThreadPoolExecutor = futures.ThreadPoolExecutor
|
||||
self.executor = ThreadPoolExecutor(max_workers=pool_size)
|
||||
self.executor = futures.ThreadPoolExecutor(max_workers=pool_size)
|
||||
self.masters = {}
|
||||
|
||||
def spawn(self, func, *args, **kwargs):
|
||||
@@ -270,7 +241,7 @@ class ThreadWorks(Singleton):
|
||||
|
||||
def spawn_master(self, func, *args, **kwargs):
|
||||
"""Start a new thread for a job."""
|
||||
executor = ThreadPoolExecutor()
|
||||
executor = futures.ThreadPoolExecutor()
|
||||
# TODO(Shaohe) every submit func should be wrapped with exception catch
|
||||
job = executor.submit(func, *args, **kwargs)
|
||||
LOG.debug("Spawn master job. func: %s is with parameters args: %s, "
|
||||
|
||||
+2
-2
@@ -66,7 +66,7 @@ class RequestContext(context.RequestContext):
|
||||
'yes' indicates deleted records are visible,
|
||||
'only' indicates that *only* deleted records are visible.
|
||||
|
||||
:param overwrite: Set to False to ensure that the greenthread local
|
||||
:param overwrite: Set to False to ensure that the thread-local
|
||||
copy of the index is not overwritten.
|
||||
|
||||
:param instance_lock_checked: This is not used and will be removed
|
||||
@@ -157,7 +157,7 @@ def get_context():
|
||||
"""A helper method to get a blank context.
|
||||
|
||||
Note that overwrite is False here so this context will not update the
|
||||
greenthread-local stored context that is used when logging.
|
||||
thread-local stored context that is used when logging.
|
||||
"""
|
||||
return RequestContext(user_id=None,
|
||||
project_id=None,
|
||||
|
||||
@@ -23,8 +23,6 @@ from oslo_log import log
|
||||
from oslo_utils import excutils
|
||||
from oslotest import base
|
||||
|
||||
import contextlib
|
||||
import eventlet
|
||||
import testtools
|
||||
|
||||
from cyborg.common import config as cyborg_config
|
||||
@@ -86,10 +84,6 @@ class TestCase(base.BaseTestCase):
|
||||
return root
|
||||
|
||||
|
||||
# Test worker cannot survive eventlet's Timeout exception, which effectively
|
||||
# kills the whole worker, with all test cases scheduled to it. This metaclass
|
||||
# makes all test cases convert Timeout exceptions into unittest friendly
|
||||
# failure mode (self.fail).
|
||||
class DietTestCase(base.BaseTestCase):
|
||||
"""Same great taste, less filling.
|
||||
|
||||
@@ -135,13 +129,6 @@ class DietTestCase(base.BaseTestCase):
|
||||
# This makes sys.exit(0) still a failure
|
||||
self.force_failure = True
|
||||
|
||||
@contextlib.contextmanager
|
||||
def assert_max_execution_time(self, max_execution_time=5):
|
||||
with eventlet.Timeout(max_execution_time, False):
|
||||
yield
|
||||
return
|
||||
self.fail('Execution of this test timed out')
|
||||
|
||||
def assertOrderedEqual(self, expected, actual):
|
||||
expect_val = self.sort_dict_lists(expected)
|
||||
actual_val = self.sort_dict_lists(actual)
|
||||
|
||||
@@ -21,17 +21,12 @@
|
||||
:platform: Unix
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
import oslo_service.backend as service
|
||||
|
||||
from cyborg import objects
|
||||
|
||||
service.init_backend(service.BackendType.THREADING)
|
||||
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
# Make sure this is done after eventlet monkey patching otherwise
|
||||
# the threading.local() store used in oslo_messaging will be initialized to
|
||||
# threadlocal storage rather than greenthread local. This will cause context
|
||||
# sets and deletes in that storage to clobber each other.
|
||||
# Make sure we have all of the objects loaded. We do this
|
||||
# at module import time, because we may be using mock decorators in our
|
||||
# tests that run at import time.
|
||||
|
||||
@@ -15,45 +15,26 @@
|
||||
|
||||
"""Cyborg DB test base class."""
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
import tempfile
|
||||
|
||||
import alembic.migration as alembic_migration
|
||||
from alembic.script import ScriptDirectory
|
||||
import fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
from oslo_db.sqlalchemy import test_fixtures
|
||||
|
||||
from cyborg.db import api as dbapi
|
||||
from cyborg.db.sqlalchemy import api as sqlalchemy_api
|
||||
from cyborg.db.sqlalchemy import migration
|
||||
from cyborg.db.sqlalchemy import models
|
||||
from cyborg.tests import base
|
||||
from cyborg.tests.unit.db_lock_fixture import DatabaseWriteLock
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
_DB_CACHE = None
|
||||
|
||||
|
||||
class Database(fixtures.Fixture):
|
||||
|
||||
def __init__(self, engine, db_migrate, sql_connection):
|
||||
self.sql_connection = sql_connection
|
||||
|
||||
self.engine = engine
|
||||
self.engine.dispose()
|
||||
conn = self.engine.connect()
|
||||
self.setup_sqlite(db_migrate)
|
||||
|
||||
self._DB = ''.join(line for line in conn.connection.iterdump())
|
||||
self.engine.dispose()
|
||||
|
||||
def setup_sqlite(self, db_migrate):
|
||||
if db_migrate.version():
|
||||
return
|
||||
models.Base.metadata.create_all(self.engine)
|
||||
db_migrate.stamp('head')
|
||||
|
||||
def setUp(self):
|
||||
super(Database, self).setUp()
|
||||
|
||||
conn = self.engine.connect()
|
||||
conn.connection.executescript(self._DB)
|
||||
self.addCleanup(self.engine.dispose)
|
||||
|
||||
|
||||
class DbTestCase(base.TestCase):
|
||||
@@ -61,11 +42,45 @@ class DbTestCase(base.TestCase):
|
||||
def setUp(self):
|
||||
super(DbTestCase, self).setUp()
|
||||
|
||||
self.dbapi = dbapi.get_instance()
|
||||
# Create a temporary directory for SQLite temp files;
|
||||
# NestedTempfile patches tempfile to use it as the default.
|
||||
self.useFixture(fixtures.NestedTempfile())
|
||||
|
||||
global _DB_CACHE
|
||||
if not _DB_CACHE:
|
||||
engine = sqlalchemy_api.main_context_manager.writer.get_engine()
|
||||
_DB_CACHE = Database(engine, migration,
|
||||
sql_connection=CONF.database.connection)
|
||||
self.useFixture(_DB_CACHE)
|
||||
# File-backed SQLite so each thread gets its own connection.
|
||||
fd, dbfile_path = tempfile.mkstemp(
|
||||
prefix="cyborg_test_", suffix=".db")
|
||||
os.close(fd)
|
||||
CONF.set_override(
|
||||
"connection", "sqlite:///%s" % dbfile_path,
|
||||
group="database")
|
||||
|
||||
# WAL mode: readers don't block writers, writer doesn't
|
||||
# block readers.
|
||||
with sqlite3.connect(dbfile_path) as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
||||
# Fresh enginefacade per test, replacing the app-level one.
|
||||
local_enginefacade = enginefacade.transaction_context()
|
||||
local_enginefacade.configure(
|
||||
connection=CONF.database.connection,
|
||||
sqlite_synchronous=CONF.database.sqlite_synchronous)
|
||||
self.useFixture(
|
||||
test_fixtures.ReplaceEngineFacadeFixture(
|
||||
sqlalchemy_api.main_context_manager,
|
||||
local_enginefacade))
|
||||
|
||||
# Build schema from models directly, bypassing Alembic's env.py
|
||||
# which would create its own engine via the global enginefacade.
|
||||
engine = local_enginefacade.writer.get_engine()
|
||||
models.Base.metadata.create_all(engine)
|
||||
alembic_cfg = migration._alembic_config()
|
||||
script = ScriptDirectory.from_config(alembic_cfg)
|
||||
with engine.connect() as conn:
|
||||
context = alembic_migration.MigrationContext.configure(conn)
|
||||
context.stamp(script, 'head')
|
||||
conn.commit()
|
||||
|
||||
# SQLite only supports one writer; serialize write txns.
|
||||
self.useFixture(DatabaseWriteLock())
|
||||
|
||||
self.dbapi = dbapi.get_instance()
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
# Copyright 2026 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Threading lock fixture for database write operations in Cyborg tests."""
|
||||
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
import fixtures
|
||||
|
||||
from oslo_db.sqlalchemy import enginefacade
|
||||
|
||||
|
||||
_DB_WRITE_LOCK = threading.Lock()
|
||||
|
||||
|
||||
class DatabaseWriteLock(fixtures.Fixture):
|
||||
"""Serialize writer transactions across threads in tests.
|
||||
|
||||
SQLite supports only a single writer at a time. WAL mode allows
|
||||
concurrent readers, so only writer transactions need serialization.
|
||||
|
||||
Patches _TransactionContextManager._transaction_scope to acquire
|
||||
a process-global lock around writer transactions.
|
||||
"""
|
||||
|
||||
def _setUp(self):
|
||||
original = (
|
||||
enginefacade._TransactionContextManager._transaction_scope)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _locked_scope(tcm_self, context):
|
||||
if tcm_self._mode is enginefacade._WRITER:
|
||||
_DB_WRITE_LOCK.acquire()
|
||||
try:
|
||||
with original(tcm_self, context) as resource:
|
||||
yield resource
|
||||
finally:
|
||||
_DB_WRITE_LOCK.release()
|
||||
else:
|
||||
with original(tcm_self, context) as resource:
|
||||
yield resource
|
||||
|
||||
self.useFixture(fixtures.MockPatchObject(
|
||||
enginefacade._TransactionContextManager,
|
||||
'_transaction_scope',
|
||||
_locked_scope))
|
||||
@@ -0,0 +1,8 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
Cyborg no longer uses eventlet. The eventlet dependency has been
|
||||
removed and Cyborg now uses the native threading backend for
|
||||
oslo.service and the threading RPC server. No configuration
|
||||
changes are required. Packagers should ensure that
|
||||
``oslo.service[threading]`` is installed as a dependency.
|
||||
+1
-2
@@ -5,14 +5,13 @@
|
||||
pbr>=0.11,!=2.1.0 # Apache-2.0
|
||||
pecan>=1.0.0,!=1.0.2,!=1.0.3,!=1.0.4,!=1.2 # BSD
|
||||
WSME>=0.10.1 # MIT
|
||||
eventlet>=0.27.0 # MIT
|
||||
oslo.i18n>=1.5.0 # Apache-2.0
|
||||
oslo.config>=1.1.0,!=4.3.0,!=4.4.0 # Apache-2.0
|
||||
oslo.log>=5.0.0 # Apache-2.0
|
||||
oslo.context>=2.9.0 # Apache-2.0
|
||||
oslo.messaging>=14.1.0 # Apache-2.0
|
||||
oslo.concurrency>=3.26.0 # Apache-2.0
|
||||
oslo.service>=1.0.0,!=1.28.1 # Apache-2.0
|
||||
oslo.service[threading]>=4.4.1 # Apache-2.0
|
||||
oslo.db>=10.0.0 # Apache-2.0
|
||||
os-resource-classes>=0.5.0 # Apache-2.0
|
||||
oslo.upgradecheck>=0.1.0 # Apache-2.0
|
||||
|
||||
Reference in New Issue
Block a user