Merge "Raise error on timeout in wait_for_versioned_notifications"

This commit is contained in:
Zuul 2018-10-02 08:57:43 +00:00 committed by Gerrit Code Review
commit 965941f1e6
6 changed files with 118 additions and 14 deletions

View File

@ -369,6 +369,9 @@ class ProviderUsageBaseTestCase(test.TestCase, InstanceHelperMixin):
self.useFixture(nova_fixtures.NeutronFixture(self))
self.useFixture(nova_fixtures.AllServicesCurrent())
fake_notifier.stub_notifier(self)
self.addCleanup(fake_notifier.reset)
placement = self.useFixture(nova_fixtures.PlacementFixture())
self.placement_api = placement.api
api_fixture = self.useFixture(nova_fixtures.OSAPIFixture(

View File

@ -235,11 +235,11 @@ class NotificationSampleTestBase(test.TestCase,
if notification['event_type'] == event_type]
def _wait_for_notification(self, event_type, timeout=10.0):
notifications = fake_notifier.wait_for_versioned_notifications(
# NOTE(mdbooth): wait_for_versioned_notifications raises an exception
# if it times out since change I017d1a31. Consider removing this
# method.
fake_notifier.wait_for_versioned_notifications(
event_type, timeout=timeout)
self.assertTrue(
len(notifications) > 0,
'notification %s hasn\'t been received' % event_type)
def _wait_for_notifications(self, event_type, expected_count,
timeout=10.0):

View File

@ -48,6 +48,9 @@ class TestParallelEvacuationWithServerGroup(
# 2.14 is needed for evacuate without onSharedStorage flag
self.api.microversion = '2.14'
fake_notifier.stub_notifier(self)
self.addCleanup(fake_notifier.reset)
# the image fake backend needed for image discovery
nova.tests.unit.image.fake.stub_out_image_service(self)
self.addCleanup(nova.tests.unit.image.fake.FakeImageService_reset)
@ -70,8 +73,6 @@ class TestParallelEvacuationWithServerGroup(
manager_class = nova.compute.manager.ComputeManager
original_rebuild = manager_class._do_rebuild_instance
self.addCleanup(fake_notifier.reset)
def fake_rebuild(self_, context, instance, *args, **kwargs):
# Simulate that the rebuild request of one of the instances
# reaches the target compute manager significantly later so the
@ -132,8 +133,11 @@ class TestParallelEvacuationWithServerGroup(
self.api.post_server_action(server2['id'], post)
# make sure that the rebuild is started and then finished
# NOTE(mdbooth): We only get 1 rebuild.start notification here because
# we validate server group policy (and therefore fail) before emitting
# rebuild.start.
fake_notifier.wait_for_versioned_notifications(
'instance.rebuild.start', n_events=2)
'instance.rebuild.start', n_events=1)
server1 = self._wait_for_server_parameter(
self.api, server1, {'OS-EXT-STS:task_state': None})
server2 = self._wait_for_server_parameter(

View File

@ -3895,7 +3895,8 @@ class VolumeBackedServerTest(integrated_helpers.ProviderUsageBaseTestCase):
self.addCleanup(fake_notifier.reset)
self.api.post_server_action(server['id'], {'shelve': None})
self._wait_for_state_change(self.api, server, 'SHELVED_OFFLOADED')
fake_notifier.wait_for_versioned_notifications('shelve_offload.end')
fake_notifier.wait_for_versioned_notifications(
'instance.shelve_offload.end')
# The server should not have any allocations since it's not currently
# hosted on any compute service.
allocs = self._get_allocations_by_server_uuid(server['id'])

View File

@ -13,8 +13,8 @@
# under the License.
import collections
import copy
import functools
import pprint
import threading
import oslo_messaging as messaging
@ -38,16 +38,28 @@ class _Sub(object):
self._notifications.append(notification)
self._cond.notifyAll()
def wait_n(self, n, timeout=1.0):
def wait_n(self, n, event, timeout):
"""Wait until at least n notifications have been received, and return
them. May return less than n notifications if timeout is reached.
"""
with timeutils.StopWatch(timeout) as timer:
with self._cond:
while len(self._notifications) < n and not timer.expired():
while len(self._notifications) < n:
if timer.expired():
notifications = pprint.pformat(
{event: sub._notifications
for event, sub in VERSIONED_SUBS.items()})
raise AssertionError(
"Notification %(event)s hasn't been "
"received. Received:\n%(notifications)s" % {
'event': event,
'notifications': notifications,
})
self._cond.wait(timer.leftover())
return copy.copy(self._notifications)
# Return a copy of the notifications list
return list(self._notifications)
VERSIONED_SUBS = collections.defaultdict(_Sub)
@ -129,5 +141,5 @@ def stub_notifier(test):
None)))
def wait_for_versioned_notifications(event_type, n_events=1, timeout=1.0):
return VERSIONED_SUBS[event_type].wait_n(n_events, timeout=timeout)
def wait_for_versioned_notifications(event_type, n_events=1, timeout=10.0):
return VERSIONED_SUBS[event_type].wait_n(n_events, event_type, timeout)

View File

@ -0,0 +1,84 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from nova import context
from nova import exception_wrapper
from nova import rpc
from nova import test
from nova.tests.unit import fake_notifier
class FakeVersionedNotifierTestCase(test.NoDBTestCase):
def setUp(self):
super(FakeVersionedNotifierTestCase, self).setUp()
fake_notifier.stub_notifier(self)
self.addCleanup(fake_notifier.reset)
self.context = context.RequestContext()
_get_notifier = functools.partial(rpc.get_notifier, 'compute')
@exception_wrapper.wrap_exception(get_notifier=_get_notifier,
binary='nova-compute')
def _raise_exception(self, context):
raise test.TestingException
def _generate_exception_notification(self):
self.assertRaises(test.TestingException, self._raise_exception,
self.context)
def test_wait_for_versioned_notifications(self):
# Wait for a single notification which we emitted first
self._generate_exception_notification()
notifications = fake_notifier.wait_for_versioned_notifications(
'compute.exception')
self.assertEqual(1, len(notifications))
def test_wait_for_versioned_notifications_fail(self):
# Wait for a single notification which is never sent
self.assertRaises(
AssertionError,
fake_notifier.wait_for_versioned_notifications,
'compute.exception', timeout=0.1)
def test_wait_for_versioned_notifications_n(self):
# Wait for 2 notifications which we emitted first
self._generate_exception_notification()
self._generate_exception_notification()
notifications = fake_notifier.wait_for_versioned_notifications(
'compute.exception', 2)
self.assertEqual(2, len(notifications))
def test_wait_for_versioned_notifications_n_fail(self):
# Wait for 2 notifications when we only emitted one
self._generate_exception_notification()
self.assertRaises(
AssertionError,
fake_notifier.wait_for_versioned_notifications,
'compute.exception', 2, timeout=0.1)
def test_wait_for_versioned_notifications_too_many(self):
# Wait for a single notification when there are 2 in the queue
self._generate_exception_notification()
self._generate_exception_notification()
notifications = fake_notifier.wait_for_versioned_notifications(
'compute.exception')
self.assertEqual(2, len(notifications))