introduce global greenpool
This change add a global greenpool which is used to manage the greenthreads created via nova.utils.spawn(_n). A test fixture is also added to use an isolated greenpool which will raise an exception if a greenthread is leaked. the fixture will optionally raise if greenlets are leaked. This is enabled by unit test by default and is configurable for functional tests. This change removes all greenthread leaks from the unit and functional tests that were detected. 7 functional tests still leak greenlets but they have no obvious cause. as such greenlet leaks are not treated as errors for funtional tests by default. Greenthread leaks are always treated as errors. Set NOVA_RAISE_ON_GREENLET_LEAK=1|true|yes when invoking tox to make greenlet leaks an error for functional tests. Change-Id: I73b4684744b340bfb80da08537a745167ddea106
This commit is contained in:
parent
927eeb54f2
commit
d71d2dc219
@ -60,6 +60,14 @@ entry.
|
||||
cfg.StrOpt(
|
||||
'tempdir',
|
||||
help='Explicitly specify the temporary working directory.'),
|
||||
cfg.IntOpt(
|
||||
'default_green_pool_size',
|
||||
default=1000,
|
||||
min=100,
|
||||
help='''
|
||||
The total number of coroutines that can be run via nova's default
|
||||
greenthread pool concurrently, defaults to 1000, min value is 100.
|
||||
'''),
|
||||
]
|
||||
|
||||
|
||||
|
@ -192,6 +192,7 @@ class TestCase(base.BaseTestCase):
|
||||
self._service_fixture_count = collections.defaultdict(int)
|
||||
|
||||
self.useFixture(nova_fixtures.OpenStackSDKFixture())
|
||||
self.useFixture(nova_fixtures.IsolatedGreenPoolFixture(self.id()))
|
||||
|
||||
self.useFixture(log_fixture.get_logging_handle_error_fixture())
|
||||
|
||||
|
59
nova/tests/fixtures/nova.py
vendored
59
nova/tests/fixtures/nova.py
vendored
@ -42,6 +42,7 @@ import oslo_messaging as messaging
|
||||
from oslo_messaging import conffixture as messaging_conffixture
|
||||
from oslo_privsep import daemon as privsep_daemon
|
||||
from oslo_utils.fixture import uuidsentinel
|
||||
from oslo_utils import strutils
|
||||
from requests import adapters
|
||||
from sqlalchemy import exc as sqla_exc
|
||||
from wsgi_intercept import interceptor
|
||||
@ -1135,6 +1136,64 @@ class IndirectionAPIFixture(fixtures.Fixture):
|
||||
self.addCleanup(self.cleanup)
|
||||
|
||||
|
||||
class IsolatedGreenPoolFixture(fixtures.Fixture):
|
||||
"""isolate each test to a dedicated greenpool.
|
||||
|
||||
Replace the default shared greenpool with a pre test greenpool
|
||||
and wait for all greenthreads to finish in test cleanup.
|
||||
"""
|
||||
|
||||
def __init__(self, test):
|
||||
self.test_case_id = test
|
||||
|
||||
def _setUp(self):
|
||||
self.greenpool = eventlet.greenpool.GreenPool()
|
||||
|
||||
def _get_default_green_pool():
|
||||
return self.greenpool
|
||||
# NOTE(sean-k-mooney): greenpools use eventlet.spawn and
|
||||
# eventlet.spawn_n so we can't stub out all calls to those functions.
|
||||
# Instead since nova only creates greenthreads directly via nova.utils
|
||||
# we stub out the default green pool. This will not capture
|
||||
# Greenthreads created via the standard lib threading module.
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'nova.utils._get_default_green_pool', _get_default_green_pool))
|
||||
self.addCleanup(self.do_cleanup)
|
||||
|
||||
def do_cleanup(self):
|
||||
running = self.greenpool.running()
|
||||
if running:
|
||||
# kill all greenthreads in the pool before raising to prevent
|
||||
# them from interfering with other tests.
|
||||
for gt in list(self.greenpool.coroutines_running):
|
||||
if isinstance(gt, eventlet.greenthread.GreenThread):
|
||||
gt.kill()
|
||||
# reset the global greenpool just in case.
|
||||
utils.DEFAULT_GREEN_POOL = eventlet.greenpool.GreenPool()
|
||||
if any(
|
||||
isinstance(gt, eventlet.greenthread.GreenThread)
|
||||
for gt in self.greenpool.coroutines_running
|
||||
):
|
||||
raise RuntimeError(
|
||||
f'detected leaked greenthreads in {self.test_case_id}')
|
||||
elif (len(self.greenpool.coroutines_running) > 0 and
|
||||
strutils.bool_from_string(os.getenv(
|
||||
"NOVA_RAISE_ON_GREENLET_LEAK", "0")
|
||||
)):
|
||||
raise RuntimeError(
|
||||
f'detected leaked greenlets in {self.test_case_id}')
|
||||
else:
|
||||
self.addDetail(
|
||||
'IsolatedGreenPoolFixture',
|
||||
f'no leaked greenthreads detected in {self.test_case_id} '
|
||||
'but some greenlets were running when the test finished.'
|
||||
'They cannot be killed so they may interact with '
|
||||
'other tests if they raise exceptions. '
|
||||
'These greenlets were likely created by spawn_n and'
|
||||
'and therefore are not expected to return or raise.'
|
||||
)
|
||||
|
||||
|
||||
class _FakeGreenThread(object):
|
||||
def __init__(self, func, *args, **kwargs):
|
||||
self._result = func(*args, **kwargs)
|
||||
|
@ -1244,6 +1244,10 @@ class _IntegratedTestBase(test.TestCase, PlacementInstanceHelperMixin):
|
||||
def _setup_conductor_service(self):
|
||||
return self.start_service('conductor')
|
||||
|
||||
def _stop_computes(self):
|
||||
for compute in self.computes.values():
|
||||
compute.stop()
|
||||
|
||||
def _setup_services(self):
|
||||
# NOTE(danms): Set the global MQ connection to that of our first cell
|
||||
# for any cells-ignorant code. Normally this is defaulted in the tests
|
||||
@ -1252,8 +1256,11 @@ class _IntegratedTestBase(test.TestCase, PlacementInstanceHelperMixin):
|
||||
self.flags(transport_url=self.cell_mappings['cell1'].transport_url)
|
||||
|
||||
self.conductor = self._setup_conductor_service()
|
||||
self.addCleanup(self.conductor.stop)
|
||||
self.scheduler = self._setup_scheduler_service()
|
||||
self.addCleanup(self.scheduler.stop)
|
||||
self.compute = self._setup_compute_service()
|
||||
self.addCleanup(self._stop_computes)
|
||||
|
||||
self.api_fixture = self.useFixture(
|
||||
nova_fixtures.OSAPIFixture(
|
||||
|
@ -327,6 +327,7 @@ class TestWSGIService(test.NoDBTestCase):
|
||||
mock_get.return_value = None
|
||||
test_service = service.WSGIService("test_service")
|
||||
test_service.start()
|
||||
self.addCleanup(test_service.stop)
|
||||
self.assertTrue(mock_create.called)
|
||||
|
||||
@mock.patch('nova.objects.Service.get_by_host_and_binary')
|
||||
@ -334,14 +335,15 @@ class TestWSGIService(test.NoDBTestCase):
|
||||
def test_service_start_does_not_create_record(self, mock_create, mock_get):
|
||||
test_service = service.WSGIService("test_service")
|
||||
test_service.start()
|
||||
self.addCleanup(test_service.stop)
|
||||
self.assertFalse(mock_create.called)
|
||||
|
||||
@mock.patch('nova.objects.Service.get_by_host_and_binary')
|
||||
def test_service_random_port(self, mock_get):
|
||||
test_service = service.WSGIService("test_service")
|
||||
test_service.start()
|
||||
self.addCleanup(test_service.stop)
|
||||
self.assertNotEqual(0, test_service.port)
|
||||
test_service.stop()
|
||||
|
||||
def test_workers_set_default(self):
|
||||
test_service = service.WSGIService("osapi_compute")
|
||||
@ -367,9 +369,9 @@ class TestWSGIService(test.NoDBTestCase):
|
||||
CONF.set_default("test_service_listen", "::1")
|
||||
test_service = service.WSGIService("test_service")
|
||||
test_service.start()
|
||||
self.addCleanup(test_service.stop)
|
||||
self.assertEqual("::1", test_service.host)
|
||||
self.assertNotEqual(0, test_service.port)
|
||||
test_service.stop()
|
||||
|
||||
@mock.patch('nova.objects.Service.get_by_host_and_binary')
|
||||
def test_reset_pool_size_to_default(self, mock_get):
|
||||
@ -383,6 +385,7 @@ class TestWSGIService(test.NoDBTestCase):
|
||||
# Resetting pool size to default
|
||||
test_service.reset()
|
||||
test_service.start()
|
||||
self.addCleanup(test_service.stop)
|
||||
self.assertEqual(test_service.server._pool.size,
|
||||
CONF.wsgi.default_pool_size)
|
||||
|
||||
|
@ -18,7 +18,6 @@ import os.path
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
import eventlet
|
||||
import fixtures
|
||||
from keystoneauth1 import adapter as ks_adapter
|
||||
from keystoneauth1.identity import base as ks_identity
|
||||
@ -772,8 +771,8 @@ class SpawnNTestCase(test.NoDBTestCase):
|
||||
|
||||
def fake(arg):
|
||||
pass
|
||||
|
||||
with mock.patch.object(eventlet, self.spawn_name, _fake_spawn):
|
||||
pool = utils._get_default_green_pool()
|
||||
with mock.patch.object(pool, self.spawn_name, _fake_spawn):
|
||||
getattr(utils, self.spawn_name)(fake, 'test')
|
||||
self.assertIsNone(common_context.get_current())
|
||||
|
||||
@ -790,7 +789,8 @@ class SpawnNTestCase(test.NoDBTestCase):
|
||||
def fake(context, kwarg1=None):
|
||||
pass
|
||||
|
||||
with mock.patch.object(eventlet, self.spawn_name, _fake_spawn):
|
||||
pool = utils._get_default_green_pool()
|
||||
with mock.patch.object(pool, self.spawn_name, _fake_spawn):
|
||||
getattr(utils, self.spawn_name)(fake, ctxt, kwarg1='test')
|
||||
self.assertEqual(ctxt, common_context.get_current())
|
||||
|
||||
@ -810,7 +810,8 @@ class SpawnNTestCase(test.NoDBTestCase):
|
||||
def fake(context, kwarg1=None):
|
||||
pass
|
||||
|
||||
with mock.patch.object(eventlet, self.spawn_name, _fake_spawn):
|
||||
pool = utils._get_default_green_pool()
|
||||
with mock.patch.object(pool, self.spawn_name, _fake_spawn):
|
||||
getattr(utils, self.spawn_name)(fake, ctxt_passed, kwarg1='test')
|
||||
self.assertEqual(ctxt, common_context.get_current())
|
||||
|
||||
|
@ -192,6 +192,7 @@ class TestWSGIServer(test.NoDBTestCase):
|
||||
# Resetting pool size to default
|
||||
server.reset()
|
||||
server.start()
|
||||
self.addCleanup(server.stop)
|
||||
self.assertEqual(server._pool.size, CONF.wsgi.default_pool_size)
|
||||
|
||||
def test_client_socket_timeout(self):
|
||||
@ -199,30 +200,28 @@ class TestWSGIServer(test.NoDBTestCase):
|
||||
|
||||
# mocking eventlet spawn method to check it is called with
|
||||
# configured 'client_socket_timeout' value.
|
||||
with mock.patch.object(eventlet,
|
||||
'spawn') as mock_spawn:
|
||||
with mock.patch('nova.utils.spawn') as mock_spawn:
|
||||
server = nova.wsgi.Server("test_app", None,
|
||||
host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
self.addCleanup(server.stop)
|
||||
_, kwargs = mock_spawn.call_args
|
||||
self.assertEqual(CONF.wsgi.client_socket_timeout,
|
||||
kwargs['socket_timeout'])
|
||||
server.stop()
|
||||
|
||||
def test_keep_alive(self):
|
||||
self.flags(keep_alive=False, group='wsgi')
|
||||
|
||||
# mocking eventlet spawn method to check it is called with
|
||||
# configured 'keep_alive' value.
|
||||
with mock.patch.object(eventlet,
|
||||
'spawn') as mock_spawn:
|
||||
with mock.patch('nova.utils.spawn') as mock_spawn:
|
||||
server = nova.wsgi.Server("test_app", None,
|
||||
host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
self.addCleanup(server.stop)
|
||||
_, kwargs = mock_spawn.call_args
|
||||
self.assertEqual(CONF.wsgi.keep_alive,
|
||||
kwargs['keepalive'])
|
||||
server.stop()
|
||||
|
||||
|
||||
@testtools.skip("bug/1482633: test hangs on Python 3")
|
||||
|
@ -462,6 +462,8 @@ class HostTestCase(test.NoDBTestCase):
|
||||
h.initialize()
|
||||
|
||||
h.get_connection()
|
||||
h._dispatch_conn_event()
|
||||
|
||||
event.wait()
|
||||
# This test will timeout if it fails. Success is implicit in a
|
||||
# timely return from wait(), indicating that the connection event
|
||||
|
@ -80,6 +80,17 @@ _FILE_CACHE = {}
|
||||
|
||||
_SERVICE_TYPES = service_types.ServiceTypes()
|
||||
|
||||
DEFAULT_GREEN_POOL = None
|
||||
|
||||
|
||||
def _get_default_green_pool():
|
||||
global DEFAULT_GREEN_POOL
|
||||
if DEFAULT_GREEN_POOL is None:
|
||||
DEFAULT_GREEN_POOL = eventlet.greenpool.GreenPool(
|
||||
CONF.default_green_pool_size
|
||||
)
|
||||
return DEFAULT_GREEN_POOL
|
||||
|
||||
|
||||
# NOTE(mikal): this seems to have to stay for now to handle os-brick
|
||||
# requirements. This makes me a sad panda.
|
||||
@ -634,7 +645,6 @@ def _serialize_profile_info():
|
||||
|
||||
def pass_context(runner, func, *args, **kwargs):
|
||||
"""Generalised passthrough method
|
||||
|
||||
It will grab the context from the threadlocal store and add it to
|
||||
the store on the runner. This allows for continuity in logging the
|
||||
context when using this method to spawn a new thread through the
|
||||
@ -667,11 +677,11 @@ def spawn(func, *args, **kwargs):
|
||||
context when using this method to spawn a new thread.
|
||||
"""
|
||||
|
||||
return pass_context(eventlet.spawn, func, *args, **kwargs)
|
||||
return pass_context(_get_default_green_pool().spawn, func, *args, **kwargs)
|
||||
|
||||
|
||||
def spawn_n(func, *args, **kwargs):
|
||||
"""Passthrough method for eventlet.spawn_n.
|
||||
"""Passthrough method for eventlet.greenpool.spawn_n.
|
||||
|
||||
This utility exists so that it can be stubbed for testing without
|
||||
interfering with the service spawns.
|
||||
@ -680,7 +690,8 @@ def spawn_n(func, *args, **kwargs):
|
||||
the store on the new thread. This allows for continuity in logging the
|
||||
context when using this method to spawn a new thread.
|
||||
"""
|
||||
pass_context(eventlet.spawn_n, func, *args, **kwargs)
|
||||
|
||||
pass_context(_get_default_green_pool().spawn_n, func, *args, **kwargs)
|
||||
|
||||
|
||||
def tpool_execute(func, *args, **kwargs):
|
||||
|
@ -498,6 +498,9 @@ class Host(object):
|
||||
LOG.debug("Starting green dispatch thread")
|
||||
utils.spawn(self._dispatch_thread)
|
||||
|
||||
LOG.debug("Starting connection event dispatch thread")
|
||||
utils.spawn(self._conn_event_thread)
|
||||
|
||||
def _get_new_connection(self):
|
||||
# call with _wrapped_conn_lock held
|
||||
LOG.debug('Connecting to libvirt: %s', self._uri)
|
||||
@ -617,9 +620,6 @@ class Host(object):
|
||||
libvirt.virEventRegisterDefaultImpl()
|
||||
self._init_events()
|
||||
|
||||
LOG.debug("Starting connection event dispatch thread")
|
||||
utils.spawn(self._conn_event_thread)
|
||||
|
||||
self._initialized = True
|
||||
|
||||
def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None,
|
||||
|
17
tox.ini
17
tox.ini
@ -30,6 +30,16 @@ extras =
|
||||
passenv =
|
||||
OS_DEBUG
|
||||
GENERATE_HASHES
|
||||
# Note(sean-k-mooney):
|
||||
# leaking greanthreads between tests is a bug so we should fail if it happens
|
||||
# however unlike greenthreads, greenlets cannot be killed via a test fixture.
|
||||
# greenlet leaks will be annotated in the test details but will not cause a
|
||||
# failure. if you want to make them raise set
|
||||
# NOVA_RAISE_ON_GREENLET_LEAK=1|true|yes
|
||||
# All simiple leaks of green threads have been resolved the remaining 7
|
||||
# functional test failures where greenlet leaks happen are non trivial
|
||||
# to address as there is no obvious cause so they are ignored for now.
|
||||
NOVA_RAISE_ON_GREENLET_LEAK
|
||||
# NOTE(sean-k-mooney) optimization is enabled by default and when enabled
|
||||
# asserts are complied out. Disable optimization to allow asserts in
|
||||
# nova to fire in unit and functional tests. This can be useful for
|
||||
@ -42,6 +52,13 @@ commands =
|
||||
env TEST_OSPROFILER=1 stestr run --combine --no-discover 'nova.tests.unit.test_profiler'
|
||||
stestr slowest
|
||||
|
||||
[testenv:{unit,py3,py38,py39,py310,py311}]
|
||||
setenv =
|
||||
{[testenv]setenv}
|
||||
# we do not have any greenlet leaks in unit tests so enforce that
|
||||
# by making greenlet leaks a failure.
|
||||
NOVA_RAISE_ON_GREENLET_LEAK=True
|
||||
|
||||
[testenv:functional{,-py38,-py39,-py310,-py311}]
|
||||
description =
|
||||
Run functional tests.
|
||||
|
Loading…
Reference in New Issue
Block a user