Merge "Remove the `eventlet.Timeout` testcase capture"

This commit is contained in:
Zuul
2025-10-30 09:59:37 +00:00
committed by Gerrit Code Review
5 changed files with 15 additions and 45 deletions

View File

@@ -17,7 +17,6 @@
"""
import abc
import contextlib
import functools
import inspect
import logging
@@ -28,7 +27,6 @@ import threading
from unittest import mock
import warnings
import eventlet.timeout
import fixtures
from neutron_lib.callbacks import manager as registry_manager
from neutron_lib.db import api as db_api
@@ -176,8 +174,6 @@ def _catch_errors(f):
def func(self, *args, **kwargs):
try:
return f(self, *args, **kwargs)
except eventlet.Timeout as e:
self.fail('Execution of this test timed out: %s' % e)
except db_exceptions.DBReferenceError:
# TODO(ralonsoh): fix the ``DBReferenceError`` issues in the functional
# tests. This is retrying the test execution once more. If it fails
@@ -200,10 +196,6 @@ class _CatchErrorsMetaclass(abc.ABCMeta):
setattr(cls, name, _catch_errors(method))
# Test worker cannot survive eventlet's Timeout exception, which effectively
# kills the whole worker, with all test cases scheduled to it. This metaclass
# makes all test cases convert Timeout exceptions into unittest friendly
# failure mode (self.fail).
class DietTestCase(base.BaseTestCase, metaclass=_CatchErrorsMetaclass):
"""Same great taste, less filling.
@@ -305,13 +297,6 @@ class DietTestCase(base.BaseTestCase, metaclass=_CatchErrorsMetaclass):
# This makes sys.exit(0) still a failure
self.force_failure = True
@contextlib.contextmanager
def assert_max_execution_time(self, max_execution_time=5):
with eventlet.Timeout(max_execution_time, False):
yield
return
self.fail('Execution of this test timed out')
def assertOrderedEqual(self, expected, actual):
expect_val = self.sort_dict_lists(expected)
actual_val = self.sort_dict_lists(actual)

View File

@@ -227,18 +227,17 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
if ha:
self.wait_until_ha_router_has_state(router, 'primary')
with self.assert_max_execution_time(100):
assert_num_of_conntrack_rules(0)
assert_num_of_conntrack_rules(0)
self.assertTrue(netcat.test_connectivity())
assert_num_of_conntrack_rules(1)
self.assertTrue(netcat.test_connectivity())
assert_num_of_conntrack_rules(1)
clean_fips(router)
router.process()
assert_num_of_conntrack_rules(0)
clean_fips(router)
router.process()
assert_num_of_conntrack_rules(0)
with testtools.ExpectedException(RuntimeError):
netcat.test_connectivity()
with testtools.ExpectedException(RuntimeError):
netcat.test_connectivity()
def _test_update_floatingip_statuses(self, router_info):
router = self.manage_router(self.agent, router_info)

View File

@@ -116,12 +116,10 @@ class TestAsyncProcess(base.BaseTestCase):
thread_exit_event = threading.Event()
# Ensure the test times out eventually if the watcher loops endlessly
with self.assert_max_execution_time():
with mock.patch.object(self.proc,
'_handle_process_error') as func:
self.proc._watch_process(
callback, kill_event, thread_exit_event)
with mock.patch.object(self.proc,
'_handle_process_error') as func:
self.proc._watch_process(
callback, kill_event, thread_exit_event)
if not kill_event.is_set():
func.assert_called_once_with()

View File

@@ -14,21 +14,12 @@
from unittest import mock
from ovsdbapp import exceptions
from ovsdbapp.schema.open_vswitch import impl_idl
import testtools
from neutron.tests import base
class TransactionTestCase(base.BaseTestCase):
def test_commit_raises_exception_on_timeout(self):
transaction = impl_idl.OvsVsctlTransaction(mock.sentinel,
mock.Mock(), 1)
with self.assert_max_execution_time(10):
with testtools.ExpectedException(exceptions.TimeoutException):
transaction.commit()
def test_post_commit_does_not_raise_exception(self):
with mock.patch.object(impl_idl.OvsVsctlTransaction,
"do_post_commit", side_effect=Exception):

View File

@@ -83,12 +83,9 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
tenant,
rbac_db_models.ACCESS_SHARED)
self.plugin.create_rbac_policy(self.context, policy)
# Give server maximum of 10 seconds to make sure we don't hit DB
# retry mechanism when resource already exists
with self.assert_max_execution_time(10):
with testtools.ExpectedException(
ext_rbac.DuplicateRbacPolicy):
self.plugin.create_rbac_policy(self.context, policy)
with testtools.ExpectedException(
ext_rbac.DuplicateRbacPolicy):
self.plugin.create_rbac_policy(self.context, policy)
def test_update_network_rbac_external_valid(self):
orig_target = 'test-tenant-2'