Raise error on timeout in wait_for_versioned_notifications

fake_notifier.wait_for_versioned_notifications previously returned
None if it timed out. This is counter-intuitive, and several callers
were failing to check its return. Every one of these represents a race
as it means we can continue past a barrier without its condition
having been met.

This change makes it default to safe by raising an exception if it
times out. Note that it's possible this may subsequently result in new
non-deterministic errors in functional. I would consider this a
feature, as these were previously hidden. They should be addressed
individually.

This change highlights several deterministically incorrect uses of
wait_for_versioned_notifications which were previously always silently
timing out. These are all fixed.

We also increase the default timeout from 1 to 10 seconds as we seem
to hit the 1 second timeout in practise, e.g.:

  http://logs.openstack.org/46/578846/14/check/nova-tox-functional/8a444c1/job-output.txt.gz

Change-Id: I017d1a31139c9300642dd706eadc265f7c954ca8
This commit is contained in:
Matthew Booth 2018-09-22 10:22:40 +01:00
parent 1792e2f5e9
commit c333e7e12e
6 changed files with 118 additions and 14 deletions

View File

@ -369,6 +369,9 @@ class ProviderUsageBaseTestCase(test.TestCase, InstanceHelperMixin):
self.useFixture(nova_fixtures.NeutronFixture(self))
self.useFixture(nova_fixtures.AllServicesCurrent())
fake_notifier.stub_notifier(self)
self.addCleanup(fake_notifier.reset)
placement = self.useFixture(nova_fixtures.PlacementFixture())
self.placement_api = placement.api
api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(

View File

@ -235,11 +235,11 @@ class NotificationSampleTestBase(test.TestCase,
if notification['event_type'] == event_type]
def _wait_for_notification(self, event_type, timeout=10.0):
notifications = fake_notifier.wait_for_versioned_notifications(
# NOTE(mdbooth): wait_for_versioned_notifications raises an exception
# if it times out since change I017d1a31. Consider removing this
# method.
fake_notifier.wait_for_versioned_notifications(
event_type, timeout=timeout)
self.assertTrue(
len(notifications) > 0,
'notification %s hasn\'t been received' % event_type)
def _wait_for_notifications(self, event_type, expected_count,
timeout=10.0):

View File

@ -48,6 +48,9 @@ class TestParallelEvacuationWithServerGroup(
# 2.14 is needed for evacuate without onSharedStorage flag
self.api.microversion = '2.14'
fake_notifier.stub_notifier(self)
self.addCleanup(fake_notifier.reset)
# the image fake backend needed for image discovery
nova.tests.unit.image.fake.stub_out_image_service(self)
self.addCleanup(nova.tests.unit.image.fake.FakeImageService_reset)
@ -70,8 +73,6 @@ class TestParallelEvacuationWithServerGroup(
manager_class = nova.compute.manager.ComputeManager
original_rebuild = manager_class._do_rebuild_instance
self.addCleanup(fake_notifier.reset)
def fake_rebuild(self_, context, instance, *args, **kwargs):
# Simulate that the rebuild request of one of the instances
# reaches the target compute manager significantly later so the
@ -132,8 +133,11 @@ class TestParallelEvacuationWithServerGroup(
self.api.post_server_action(server2['id'], post)
# make sure that the rebuild is started and then finished
# NOTE(mdbooth): We only get 1 rebuild.start notification here because
# we validate server group policy (and therefore fail) before emitting
# rebuild.start.
fake_notifier.wait_for_versioned_notifications(
'instance.rebuild.start', n_events=2)
'instance.rebuild.start', n_events=1)
server1 = self._wait_for_server_parameter(
self.api, server1, {'OS-EXT-STS:task_state': None})
server2 = self._wait_for_server_parameter(

View File

@ -3895,7 +3895,8 @@ class VolumeBackedServerTest(integrated_helpers.ProviderUsageBaseTestCase):
self.addCleanup(fake_notifier.reset)
self.api.post_server_action(server['id'], {'shelve': None})
self._wait_for_state_change(self.api, server, 'SHELVED_OFFLOADED')
fake_notifier.wait_for_versioned_notifications('shelve_offload.end')
fake_notifier.wait_for_versioned_notifications(
'instance.shelve_offload.end')
# The server should not have any allocations since it's not currently
# hosted on any compute service.
allocs = self._get_allocations_by_server_uuid(server['id'])

View File

@ -13,8 +13,8 @@
# under the License.
import collections
import copy
import functools
import pprint
import threading
import oslo_messaging as messaging
@ -38,16 +38,28 @@ class _Sub(object):
self._notifications.append(notification)
self._cond.notifyAll()
def wait_n(self, n, timeout=1.0):
def wait_n(self, n, event, timeout):
"""Wait until at least n notifications have been received, and return
them. May return less than n notifications if timeout is reached.
"""
with timeutils.StopWatch(timeout) as timer:
with self._cond:
while len(self._notifications) < n and not timer.expired():
while len(self._notifications) < n:
if timer.expired():
notifications = pprint.pformat(
{event: sub._notifications
for event, sub in VERSIONED_SUBS.items()})
raise AssertionError(
"Notification %(event)s hasn't been "
"received. Received:\n%(notifications)s" % {
'event': event,
'notifications': notifications,
})
self._cond.wait(timer.leftover())
return copy.copy(self._notifications)
# Return a copy of the notifications list
return list(self._notifications)
VERSIONED_SUBS = collections.defaultdict(_Sub)
@ -129,5 +141,5 @@ def stub_notifier(test):
None)))
def wait_for_versioned_notifications(event_type, n_events=1, timeout=1.0):
return VERSIONED_SUBS[event_type].wait_n(n_events, timeout=timeout)
def wait_for_versioned_notifications(event_type, n_events=1, timeout=10.0):
return VERSIONED_SUBS[event_type].wait_n(n_events, event_type, timeout)

View File

@ -0,0 +1,84 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from nova import context
from nova import exception_wrapper
from nova import rpc
from nova import test
from nova.tests.unit import fake_notifier
class FakeVersionedNotifierTestCase(test.NoDBTestCase):
def setUp(self):
super(FakeVersionedNotifierTestCase, self).setUp()
fake_notifier.stub_notifier(self)
self.addCleanup(fake_notifier.reset)
self.context = context.RequestContext()
_get_notifier = functools.partial(rpc.get_notifier, 'compute')
@exception_wrapper.wrap_exception(get_notifier=_get_notifier,
binary='nova-compute')
def _raise_exception(self, context):
raise test.TestingException
def _generate_exception_notification(self):
self.assertRaises(test.TestingException, self._raise_exception,
self.context)
def test_wait_for_versioned_notifications(self):
# Wait for a single notification which we emitted first
self._generate_exception_notification()
notifications = fake_notifier.wait_for_versioned_notifications(
'compute.exception')
self.assertEqual(1, len(notifications))
def test_wait_for_versioned_notifications_fail(self):
# Wait for a single notification which is never sent
self.assertRaises(
AssertionError,
fake_notifier.wait_for_versioned_notifications,
'compute.exception', timeout=0.1)
def test_wait_for_versioned_notifications_n(self):
# Wait for 2 notifications which we emitted first
self._generate_exception_notification()
self._generate_exception_notification()
notifications = fake_notifier.wait_for_versioned_notifications(
'compute.exception', 2)
self.assertEqual(2, len(notifications))
def test_wait_for_versioned_notifications_n_fail(self):
# Wait for 2 notifications when we only emitted one
self._generate_exception_notification()
self.assertRaises(
AssertionError,
fake_notifier.wait_for_versioned_notifications,
'compute.exception', 2, timeout=0.1)
def test_wait_for_versioned_notifications_too_many(self):
# Wait for a single notification when there are 2 in the queue
self._generate_exception_notification()
self._generate_exception_notification()
notifications = fake_notifier.wait_for_versioned_notifications(
'compute.exception')
self.assertEqual(2, len(notifications))