Merge "support polling free notification testing"
This commit is contained in:
commit
9be228a42b
@ -188,3 +188,10 @@ class NotificationSampleTestBase(test.TestCase,
|
|||||||
return [notification for notification
|
return [notification for notification
|
||||||
in fake_notifier.VERSIONED_NOTIFICATIONS
|
in fake_notifier.VERSIONED_NOTIFICATIONS
|
||||||
if notification['event_type'] == event_type]
|
if notification['event_type'] == event_type]
|
||||||
|
|
||||||
|
def _wait_for_notification(self, event_type, timeout=1.0):
|
||||||
|
received = fake_notifier.wait_for_versioned_notification(
|
||||||
|
event_type, timeout)
|
||||||
|
self.assertTrue(
|
||||||
|
received,
|
||||||
|
'notification %s hasn\'t been received' % event_type)
|
||||||
|
@ -14,12 +14,15 @@
|
|||||||
|
|
||||||
import collections
|
import collections
|
||||||
import functools
|
import functools
|
||||||
|
import threading
|
||||||
|
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
|
|
||||||
from nova import rpc
|
from nova import rpc
|
||||||
|
|
||||||
|
|
||||||
|
SUBSCRIBERS = collections.defaultdict(threading.Event)
|
||||||
NOTIFICATIONS = []
|
NOTIFICATIONS = []
|
||||||
VERSIONED_NOTIFICATIONS = []
|
VERSIONED_NOTIFICATIONS = []
|
||||||
|
|
||||||
@ -27,6 +30,7 @@ VERSIONED_NOTIFICATIONS = []
|
|||||||
def reset():
|
def reset():
|
||||||
del NOTIFICATIONS[:]
|
del NOTIFICATIONS[:]
|
||||||
del VERSIONED_NOTIFICATIONS[:]
|
del VERSIONED_NOTIFICATIONS[:]
|
||||||
|
SUBSCRIBERS.clear()
|
||||||
|
|
||||||
|
|
||||||
FakeMessage = collections.namedtuple('Message',
|
FakeMessage = collections.namedtuple('Message',
|
||||||
@ -69,10 +73,12 @@ class FakeNotifier(object):
|
|||||||
class FakeVersionedNotifier(FakeNotifier):
|
class FakeVersionedNotifier(FakeNotifier):
|
||||||
def _notify(self, priority, ctxt, event_type, payload):
|
def _notify(self, priority, ctxt, event_type, payload):
|
||||||
payload = self._serializer.serialize_entity(ctxt, payload)
|
payload = self._serializer.serialize_entity(ctxt, payload)
|
||||||
VERSIONED_NOTIFICATIONS.append({'publisher_id': self.publisher_id,
|
notification = {'publisher_id': self.publisher_id,
|
||||||
'priority': priority,
|
'priority': priority,
|
||||||
'event_type': event_type,
|
'event_type': event_type,
|
||||||
'payload': payload})
|
'payload': payload}
|
||||||
|
VERSIONED_NOTIFICATIONS.append(notification)
|
||||||
|
_notify_subscribers(notification)
|
||||||
|
|
||||||
|
|
||||||
def stub_notifier(test):
|
def stub_notifier(test):
|
||||||
@ -90,3 +96,16 @@ def stub_notifier(test):
|
|||||||
serializer=getattr(rpc.NOTIFIER,
|
serializer=getattr(rpc.NOTIFIER,
|
||||||
'_serializer',
|
'_serializer',
|
||||||
None)))
|
None)))
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_versioned_notification(event_type, timeout=1.0):
|
||||||
|
# NOTE: The event stored in SUBSCRIBERS is not used for synchronizing
|
||||||
|
# the access to shared state as there is no parallel access to
|
||||||
|
# SUBSCRIBERS because the only parallelism is due to eventlet.
|
||||||
|
# The event is simply used to avoid polling the list of received
|
||||||
|
# notifications
|
||||||
|
return SUBSCRIBERS[event_type].wait(timeout)
|
||||||
|
|
||||||
|
|
||||||
|
def _notify_subscribers(notification):
|
||||||
|
SUBSCRIBERS[notification['event_type']].set()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user