Prevent leaked eventlets to send notifications

In out functional tests we run nova services as eventlets. Also those
services can spawn there own eventlets for RPC or other parallel
processing. The test case executor only sees and tracks the main
eventlet where the code of the test case is running. When that is
finishes the test executor considers the test case to be finished
regardless of the other spawned eventlets. This could lead to leaked
eventlets that are running in parallel with later test cases.

One way that it can cause trouble is via the global variables in
nova.rpc module. Those globals are re-initialized for each test case so
they are not directly leaking information between test cases. However if
a late eventlet calls nova.rpc.get_versioned_notifier() it will get a
totally usable FakeVersionedNotifier object regardless of which test
case this notifier is belongs to or which test case the eventlet belongs
to. This way the late eventlet can send a notification to the currently
running test case and therefore can make it fail.

The current case we saw is the following:

1) The test case
  nova.tests.functional.test_servers.ServersTestV219.test_description_errors
  creates a server but don't wait for it to reach terminal state (ACTIVE
  / ERROR). This test case finishes quickly but leaks running eventlets
  in the background waiting for some RPC call to return.
2) As the test case finished the cleanup code deletes the test case
   specific setup, including the DB.
3) The test executor moves forward and starts running another test case
4) 60 seconds later the leaked eventlet times out waiting for the RPC
   call to return and tries doing things, but fails as the DB is already
   gone. Then it tries to  report this as an error notification. It calls
   nova.rpc.get_versioned_notifier() and gets a fresh notifier that is
   connected to the currently running test case. Then emits the error
   notification there.
5) The currently running test case also waits for an error notification
   to be triggered by the currently running test code. But it gets the
   notification form the late eventlet first. As the content of the
   notification does not match with the expectations the currently
   running test case fails. The late eventlet prints a lot of
   error about the DB being gone making the troubleshooting pretty hard.

This patch proposes a way to fix this by marking each eventlet at spawn
time with the id of the test case that was directly or indirectly
started it.

Then when the NotificationFixture gets a notification it compares the
test case id stored in the calling eventlet with the id of the test case
initialized the NotificationFixture. If the two ids do not match then
the fixture ignores the notification and raises an exception to the
caller eventlet to make it terminate.

Change-Id: I012dcf63306bae624dc4f66aae6c6d96a20d4327
Closes-Bug: #1946339
(cherry picked from commit 61fc81a676)
This commit is contained in:
Balazs Gibizer 2021-10-14 18:09:18 +02:00
parent e6c6880465
commit f24e0c1da2
3 changed files with 122 additions and 2 deletions

View File

@ -177,6 +177,9 @@ class TestCase(base.BaseTestCase):
with fixtures.EnvironmentVariable('OS_LOG_CAPTURE', '0'): with fixtures.EnvironmentVariable('OS_LOG_CAPTURE', '0'):
super(TestCase, self).setUp() super(TestCase, self).setUp()
self.useFixture(
nova_fixtures.PropagateTestCaseIdToChildEventlets(self.id()))
# How many of which service we've started. {$service-name: $count} # How many of which service we've started. {$service-name: $count}
self._service_fixture_count = collections.defaultdict(int) self._service_fixture_count = collections.defaultdict(int)

View File

@ -14,6 +14,7 @@ import collections
import functools import functools
import threading import threading
import eventlet
import fixtures import fixtures
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
@ -76,6 +77,7 @@ class FakeNotifier(object):
def __init__( def __init__(
self, transport, publisher_id, serializer=None, parent=None, self, transport, publisher_id, serializer=None, parent=None,
test_case_id=None
): ):
self.transport = transport self.transport = transport
self.publisher_id = publisher_id self.publisher_id = publisher_id
@ -92,6 +94,8 @@ class FakeNotifier(object):
functools.partial(self._notify, priority.upper()), functools.partial(self._notify, priority.upper()),
) )
self.test_case_id = test_case_id
def prepare(self, publisher_id=None): def prepare(self, publisher_id=None):
if publisher_id is None: if publisher_id is None:
publisher_id = self.publisher_id publisher_id = self.publisher_id
@ -99,6 +103,7 @@ class FakeNotifier(object):
return self.__class__( return self.__class__(
self.transport, publisher_id, self.transport, publisher_id,
serializer=self._serializer, parent=self, serializer=self._serializer, parent=self,
test_case_id=self.test_case_id
) )
def _notify(self, priority, ctxt, event_type, payload): def _notify(self, priority, ctxt, event_type, payload):
@ -130,8 +135,10 @@ class FakeNotifier(object):
class FakeVersionedNotifier(FakeNotifier): class FakeVersionedNotifier(FakeNotifier):
def __init__( def __init__(
self, transport, publisher_id, serializer=None, parent=None, self, transport, publisher_id, serializer=None, parent=None,
test_case_id=None
): ):
super().__init__(transport, publisher_id, serializer) super().__init__(
transport, publisher_id, serializer, test_case_id=test_case_id)
if parent: if parent:
self.versioned_notifications = parent.versioned_notifications self.versioned_notifications = parent.versioned_notifications
else: else:
@ -142,7 +149,31 @@ class FakeVersionedNotifier(FakeNotifier):
else: else:
self.subscriptions = collections.defaultdict(_Sub) self.subscriptions = collections.defaultdict(_Sub)
@staticmethod
def _get_sender_test_case_id():
current = eventlet.getcurrent()
# NOTE(gibi) not all eventlet spawn is under our control, so there can
# be senders without test_case_id set, find the first ancestor that
# was spawned from nova.utils.spawn[_n] and therefore has the id set.
while not getattr(current, 'test_case_id', None):
current = current.parent
return current.test_case_id
def _notify(self, priority, ctxt, event_type, payload): def _notify(self, priority, ctxt, event_type, payload):
sender_test_case_id = self._get_sender_test_case_id()
# NOTE(gibi): this is here to prevent late notifications from already
# finished test cases to break the currently running test case. See
# more in https://bugs.launchpad.net/nova/+bug/1946339
if sender_test_case_id != self.test_case_id:
raise RuntimeError(
'FakeVersionedNotifier received %s notification emitted by %s '
'test case which is different from the currently running test '
'case %s. This notification is ignored. The sender test case '
'probably leaked a running eventlet that emitted '
'notifications after the test case finished. Now this eventlet'
'is terminated by raising this exception.' %
(event_type, sender_test_case_id, self.test_case_id))
payload = self._serializer.serialize_entity(ctxt, payload) payload = self._serializer.serialize_entity(ctxt, payload)
notification = { notification = {
'publisher_id': self.publisher_id, 'publisher_id': self.publisher_id,
@ -180,7 +211,9 @@ class NotificationFixture(fixtures.Fixture):
self.fake_versioned_notifier = FakeVersionedNotifier( self.fake_versioned_notifier = FakeVersionedNotifier(
rpc.NOTIFIER.transport, rpc.NOTIFIER.transport,
rpc.NOTIFIER.publisher_id, rpc.NOTIFIER.publisher_id,
serializer=getattr(rpc.NOTIFIER, '_serializer', None)) serializer=getattr(rpc.NOTIFIER, '_serializer', None),
test_case_id=self.test.id()
)
if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER: if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
self.test.stub_out('nova.rpc.LEGACY_NOTIFIER', self.fake_notifier) self.test.stub_out('nova.rpc.LEGACY_NOTIFIER', self.fake_notifier)
self.test.stub_out( self.test.stub_out(

View File

@ -18,10 +18,12 @@
import collections import collections
from contextlib import contextmanager from contextlib import contextmanager
import functools
import logging as std_logging import logging as std_logging
import os import os
import warnings import warnings
import eventlet
import fixtures import fixtures
import futurist import futurist
import mock import mock
@ -56,6 +58,7 @@ from nova import rpc
from nova.scheduler import weights from nova.scheduler import weights
from nova import service from nova import service
from nova.tests.functional.api import client from nova.tests.functional.api import client
from nova import utils
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -1516,3 +1519,84 @@ class GenericPoisonFixture(fixtures.Fixture):
except ImportError: except ImportError:
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
meth, poison_configure(meth, why))) meth, poison_configure(meth, why)))
class PropagateTestCaseIdToChildEventlets(fixtures.Fixture):
"""A fixture that adds the currently running test case id to each spawned
eventlet. This information then later used by the NotificationFixture to
detect if a notification was emitted by an eventlet that was spawned by a
previous test case so such late notification can be ignored. For more
background about what issues this can prevent see
https://bugs.launchpad.net/nova/+bug/1946339
"""
def __init__(self, test_case_id):
self.test_case_id = test_case_id
def setUp(self):
super().setUp()
# set the id on the main eventlet
c = eventlet.getcurrent()
c.test_case_id = self.test_case_id
orig_spawn = utils.spawn
def wrapped_spawn(func, *args, **kwargs):
# This is still runs before the eventlet.spawn so read the id for
# propagation
caller = eventlet.getcurrent()
# If there is no id set on us that means we were spawned with other
# than nova.utils.spawn or spawn_n so the id propagation chain got
# broken. We fall back to self.test_case_id from the fixture which
# is good enough
caller_test_case_id = getattr(
caller, 'test_case_id', None) or self.test_case_id
@functools.wraps(func)
def test_case_id_wrapper(*args, **kwargs):
# This runs after the eventlet.spawn in the new child.
# Propagate the id from our caller eventlet
current = eventlet.getcurrent()
current.test_case_id = caller_test_case_id
return func(*args, **kwargs)
# call the original spawn to create the child but with our
# new wrapper around its target
return orig_spawn(test_case_id_wrapper, *args, **kwargs)
# let's replace nova.utils.spawn with the wrapped one that injects
# our initialization to the child eventlet
self.useFixture(
fixtures.MonkeyPatch('nova.utils.spawn', wrapped_spawn))
# now do the same with spawn_n
orig_spawn_n = utils.spawn_n
def wrapped_spawn_n(func, *args, **kwargs):
# This is still runs before the eventlet.spawn so read the id for
# propagation
caller = eventlet.getcurrent()
# If there is no id set on us that means we were spawned with other
# than nova.utils.spawn or spawn_n so the id propagation chain got
# broken. We fall back to self.test_case_id from the fixture which
# is good enough
caller_test_case_id = getattr(
caller, 'test_case_id', None) or self.test_case_id
@functools.wraps(func)
def test_case_id_wrapper(*args, **kwargs):
# This runs after the eventlet.spawn in the new child.
# Propagate the id from our caller eventlet
current = eventlet.getcurrent()
current.test_case_id = caller_test_case_id
return func(*args, **kwargs)
# call the original spawn_n to create the child but with our
# new wrapper around its target
return orig_spawn_n(test_case_id_wrapper, *args, **kwargs)
# let's replace nova.utils.spawn_n with the wrapped one that injects
# our initialization to the child eventlet
self.useFixture(
fixtures.MonkeyPatch('nova.utils.spawn_n', wrapped_spawn_n))