diff --git a/nova/tests/functional/notification_sample_tests/notification_sample_base.py b/nova/tests/functional/notification_sample_tests/notification_sample_base.py index f82bb6490ae7..21e2ac5b4d00 100644 --- a/nova/tests/functional/notification_sample_tests/notification_sample_base.py +++ b/nova/tests/functional/notification_sample_tests/notification_sample_base.py @@ -188,3 +188,10 @@ class NotificationSampleTestBase(test.TestCase, return [notification for notification in fake_notifier.VERSIONED_NOTIFICATIONS if notification['event_type'] == event_type] + + def _wait_for_notification(self, event_type, timeout=1.0): + received = fake_notifier.wait_for_versioned_notification( + event_type, timeout) + self.assertTrue( + received, + 'notification %s hasn\'t been received' % event_type) diff --git a/nova/tests/unit/fake_notifier.py b/nova/tests/unit/fake_notifier.py index f938496e1126..5f5979b33f59 100644 --- a/nova/tests/unit/fake_notifier.py +++ b/nova/tests/unit/fake_notifier.py @@ -14,12 +14,15 @@ import collections import functools +import threading import oslo_messaging as messaging from oslo_serialization import jsonutils from nova import rpc + +SUBSCRIBERS = collections.defaultdict(threading.Event) NOTIFICATIONS = [] VERSIONED_NOTIFICATIONS = [] @@ -27,6 +30,7 @@ VERSIONED_NOTIFICATIONS = [] def reset(): del NOTIFICATIONS[:] del VERSIONED_NOTIFICATIONS[:] + SUBSCRIBERS.clear() FakeMessage = collections.namedtuple('Message', @@ -69,10 +73,12 @@ class FakeNotifier(object): class FakeVersionedNotifier(FakeNotifier): def _notify(self, priority, ctxt, event_type, payload): payload = self._serializer.serialize_entity(ctxt, payload) - VERSIONED_NOTIFICATIONS.append({'publisher_id': self.publisher_id, - 'priority': priority, - 'event_type': event_type, - 'payload': payload}) + notification = {'publisher_id': self.publisher_id, + 'priority': priority, + 'event_type': event_type, + 'payload': payload} + VERSIONED_NOTIFICATIONS.append(notification) + _notify_subscribers(notification) def stub_notifier(test): @@ -90,3 +96,16 @@ def stub_notifier(test): serializer=getattr(rpc.NOTIFIER, '_serializer', None))) + + +def wait_for_versioned_notification(event_type, timeout=1.0): + # NOTE: The event stored in SUBSCRIBERS is not used for synchronizing + # the access to shared state as there is no parallel access to + # SUBSCRIBERS because the only parallelism is due to eventlet. + # The event is simply used to avoid polling the list of received + # notifications + return SUBSCRIBERS[event_type].wait(timeout) + + +def _notify_subscribers(notification): + SUBSCRIBERS[notification['event_type']].set()