c333e7e12e
fake_notifier.wait_for_versioned_notifications previously returned None if it timed out. This is counter-intuitive, and several callers were failing to check its return. Every one of these represents a race as it means we can continue past a barrier without its condition having been met. This change makes it default to safe by raising an exception if it times out. Note that it's possible this may subsequently result in new non-deterministic errors in functional. I would consider this a feature, as these were previously hidden. They should be addressed individually. This change highlights several deterministically incorrect uses of wait_for_versioned_notifications which were previously always silently timing out. These are all fixed. We also increase the default timeout from 1 to 10 seconds as we seem to hit the 1 second timeout in practise, e.g.: http://logs.openstack.org/46/578846/14/check/nova-tox-functional/8a444c1/job-output.txt.gz Change-Id: I017d1a31139c9300642dd706eadc265f7c954ca8
146 lines
5.4 KiB
Python
146 lines
5.4 KiB
Python
# Copyright 2013 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import collections
|
|
import functools
|
|
import pprint
|
|
import threading
|
|
|
|
import oslo_messaging as messaging
|
|
from oslo_serialization import jsonutils
|
|
from oslo_utils import timeutils
|
|
|
|
from nova import rpc
|
|
|
|
|
|
class _Sub(object):
|
|
"""Allow a subscriber to efficiently wait for an event to occur, and
|
|
retrieve events which have occured.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._cond = threading.Condition()
|
|
self._notifications = []
|
|
|
|
def received(self, notification):
|
|
with self._cond:
|
|
self._notifications.append(notification)
|
|
self._cond.notifyAll()
|
|
|
|
def wait_n(self, n, event, timeout):
|
|
"""Wait until at least n notifications have been received, and return
|
|
them. May return less than n notifications if timeout is reached.
|
|
"""
|
|
|
|
with timeutils.StopWatch(timeout) as timer:
|
|
with self._cond:
|
|
while len(self._notifications) < n:
|
|
if timer.expired():
|
|
notifications = pprint.pformat(
|
|
{event: sub._notifications
|
|
for event, sub in VERSIONED_SUBS.items()})
|
|
raise AssertionError(
|
|
"Notification %(event)s hasn't been "
|
|
"received. Received:\n%(notifications)s" % {
|
|
'event': event,
|
|
'notifications': notifications,
|
|
})
|
|
self._cond.wait(timer.leftover())
|
|
|
|
# Return a copy of the notifications list
|
|
return list(self._notifications)
|
|
|
|
|
|
VERSIONED_SUBS = collections.defaultdict(_Sub)
|
|
VERSIONED_NOTIFICATIONS = []
|
|
NOTIFICATIONS = []
|
|
|
|
|
|
def reset():
|
|
del NOTIFICATIONS[:]
|
|
del VERSIONED_NOTIFICATIONS[:]
|
|
VERSIONED_SUBS.clear()
|
|
|
|
|
|
FakeMessage = collections.namedtuple('Message',
|
|
['publisher_id', 'priority',
|
|
'event_type', 'payload', 'context'])
|
|
|
|
|
|
class FakeNotifier(object):
|
|
|
|
def __init__(self, transport, publisher_id, serializer=None):
|
|
self.transport = transport
|
|
self.publisher_id = publisher_id
|
|
self._serializer = serializer or messaging.serializer.NoOpSerializer()
|
|
|
|
for priority in ['debug', 'info', 'warn', 'error', 'critical']:
|
|
setattr(self, priority,
|
|
functools.partial(self._notify, priority.upper()))
|
|
|
|
def prepare(self, publisher_id=None):
|
|
if publisher_id is None:
|
|
publisher_id = self.publisher_id
|
|
return self.__class__(self.transport, publisher_id,
|
|
serializer=self._serializer)
|
|
|
|
def _notify(self, priority, ctxt, event_type, payload):
|
|
payload = self._serializer.serialize_entity(ctxt, payload)
|
|
# NOTE(sileht): simulate the kombu serializer
|
|
# this permit to raise an exception if something have not
|
|
# been serialized correctly
|
|
jsonutils.to_primitive(payload)
|
|
# NOTE(melwitt): Try to serialize the context, as the rpc would.
|
|
# An exception will be raised if something is wrong
|
|
# with the context.
|
|
self._serializer.serialize_context(ctxt)
|
|
msg = FakeMessage(self.publisher_id, priority, event_type,
|
|
payload, ctxt)
|
|
NOTIFICATIONS.append(msg)
|
|
|
|
def is_enabled(self):
|
|
return True
|
|
|
|
|
|
class FakeVersionedNotifier(FakeNotifier):
|
|
def _notify(self, priority, ctxt, event_type, payload):
|
|
payload = self._serializer.serialize_entity(ctxt, payload)
|
|
notification = {'publisher_id': self.publisher_id,
|
|
'priority': priority,
|
|
'event_type': event_type,
|
|
'payload': payload}
|
|
VERSIONED_NOTIFICATIONS.append(notification)
|
|
VERSIONED_SUBS[event_type].received(notification)
|
|
|
|
|
|
def stub_notifier(test):
|
|
test.stub_out('oslo_messaging.Notifier', FakeNotifier)
|
|
if rpc.LEGACY_NOTIFIER and rpc.NOTIFIER:
|
|
test.stub_out('nova.rpc.LEGACY_NOTIFIER',
|
|
FakeNotifier(rpc.LEGACY_NOTIFIER.transport,
|
|
rpc.LEGACY_NOTIFIER.publisher_id,
|
|
serializer=getattr(rpc.LEGACY_NOTIFIER,
|
|
'_serializer',
|
|
None)))
|
|
test.stub_out('nova.rpc.NOTIFIER',
|
|
FakeVersionedNotifier(rpc.NOTIFIER.transport,
|
|
rpc.NOTIFIER.publisher_id,
|
|
serializer=getattr(rpc.NOTIFIER,
|
|
'_serializer',
|
|
None)))
|
|
|
|
|
|
def wait_for_versioned_notifications(event_type, n_events=1, timeout=10.0):
|
|
return VERSIONED_SUBS[event_type].wait_n(n_events, event_type, timeout)
|