diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index e1aa510b8..3c1e40490 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -14,6 +14,7 @@ # under the License. import threading +import time import mock import testscenarios @@ -26,53 +27,55 @@ from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios -class RestartableListenerThread(object): - def __init__(self, listener): - self.listener = listener +class RestartableServerThread(object): + def __init__(self, server): + self.server = server self.thread = None def start(self): if self.thread is None: - self.thread = threading.Thread(target=self.listener.start) + self.thread = threading.Thread(target=self.server.start) self.thread.daemon = True self.thread.start() def stop(self): if self.thread is not None: - self.listener.stop() - self.listener.wait() - self.thread.join() + # Check start() does nothing with a running listener + self.server.start() + self.server.stop() + self.server.wait() + self.thread.join(timeout=15) + ret = self.thread.isAlive() self.thread = None - - def wait_end(self): - self.thread.join(timeout=15) - return self.thread.isAlive() + return ret + return True class ListenerSetupMixin(object): - class ListenerTracker(object): - def __init__(self, expect_messages): - self._expect_messages = expect_messages + class ThreadTracker(object): + def __init__(self): self._received_msgs = 0 - self.listeners = [] + self.threads = [] + self.lock = threading.Lock() def info(self, ctxt, publisher_id, event_type, payload, metadata): - self._received_msgs += 1 - if self._expect_messages == self._received_msgs: - self.stop() + # NOTE(sileht): this run into an other thread + with self.lock: + self._received_msgs += 1 - def wait_for(self, expect_messages): - while expect_messages != self._received_msgs: - yield + def wait_for_messages(self, expect_messages): + while self._received_msgs < expect_messages: + time.sleep(0.01) def stop(self): - for listener in self.listeners: - # Check start() does nothing with a running listener - listener.start() - listener.stop() - listener.wait() - self.listeners = [] + for thread in self.threads: + thread.stop() + self.threads = [] + + def start(self, thread): + self.threads.append(thread) + thread.start() def setUp(self): self.trackers = {} @@ -83,7 +86,7 @@ class ListenerSetupMixin(object): self.trackers[pool].stop() self.trackers = {} - def _setup_listener(self, transport, endpoints, expect_messages, + def _setup_listener(self, transport, endpoints, targets=None, pool=None): if pool is None: @@ -95,16 +98,18 @@ class ListenerSetupMixin(object): targets = [oslo_messaging.Target(topic='testtopic')] tracker = self.trackers.setdefault( - tracker_name, self.ListenerTracker(expect_messages)) + tracker_name, self.ThreadTracker()) listener = oslo_messaging.get_notification_listener( transport, targets=targets, endpoints=[tracker] + endpoints, allow_requeue=True, pool=pool) - tracker.listeners.append(listener) - thread = RestartableListenerThread(listener) - thread.start() + thread = RestartableServerThread(listener) + tracker.start(thread) return thread + def wait_for_messages(self, expect_messages, tracker_name='__default__'): + self.trackers[tracker_name].wait_for_messages(expect_messages) + def _setup_notifier(self, transport, topic='testtopic', publisher_id='testpublisher'): return oslo_messaging.Notifier(transport, topic=topic, @@ -168,12 +173,13 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint = mock.Mock() endpoint.info.return_value = None - listener_thread = self._setup_listener(transport, [endpoint], 1) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test message') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test message', @@ -186,14 +192,15 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint.info.return_value = None targets = [oslo_messaging.Target(topic="topic1"), oslo_messaging.Target(topic="topic2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic='topic1') notifier.info({'ctxt': '1'}, 'an_event.start1', 'test') notifier = self._setup_notifier(transport, topic='topic2') notifier.info({'ctxt': '2'}, 'an_event.start2', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', @@ -213,7 +220,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): exchange="exchange1"), oslo_messaging.Target(topic="topic", exchange="exchange2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic="topic") @@ -236,7 +243,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): notifier.info({'ctxt': '2'}, 'an_event.start', 'test message exchange2') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start', @@ -255,11 +263,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2 = mock.Mock() endpoint2.info.return_value = oslo_messaging.NotificationResult.HANDLED listener_thread = self._setup_listener(transport, - [endpoint1, endpoint2], 1) + [endpoint1, endpoint2]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint1.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test', { @@ -282,12 +291,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): return oslo_messaging.NotificationResult.HANDLED endpoint.info.side_effect = side_effect_requeue - listener_thread = self._setup_listener(transport, - [endpoint], 2) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({}, 'testpublisher', 'an_event.start', 'test', @@ -304,17 +313,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2.info.return_value = None targets = [oslo_messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 2, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 2, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") notifier = self._setup_notifier(transport, topic="topic") notifier.info({'ctxt': '0'}, 'an_event.start', 'test message0') notifier.info({'ctxt': '1'}, 'an_event.start', 'test message1') - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(2, "pool1") + self.wait_for_messages(2, "pool2") + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) def mocked_endpoint_call(i): return mock.call({'ctxt': '%d' % i}, 'testpublisher', @@ -337,11 +348,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint3.info.return_value = None targets = [oslo_messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 100, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 100, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") - listener3_thread = self._setup_listener(transport, [endpoint3], 100, + listener3_thread = self._setup_listener(transport, [endpoint3], targets=targets, pool="pool2") def mocked_endpoint_call(i): @@ -356,7 +367,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(25) + self.wait_for_messages(25, 'pool2') listener2_thread.stop() for i in range(0, 25): @@ -364,7 +375,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(50) + self.wait_for_messages(50, 'pool2') listener2_thread.start() listener3_thread.stop() @@ -373,7 +384,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(75) + self.wait_for_messages(75, 'pool2') listener3_thread.start() for i in range(0, 25): @@ -381,9 +392,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.assertFalse(listener3_thread.wait_end()) - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(100, 'pool1') + self.wait_for_messages(100, 'pool2') + + self.assertFalse(listener3_thread.stop()) + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) self.assertEqual(100, endpoint1.info.call_count) endpoint1.info.assert_has_calls(mocked_endpoint1_calls) diff --git a/tests/notify/test_listener.py b/tests/notify/test_listener.py index 9317eaeff..984d186a0 100644 --- a/tests/notify/test_listener.py +++ b/tests/notify/test_listener.py @@ -14,6 +14,7 @@ # under the License. import threading +import time import mock import testscenarios @@ -26,55 +27,55 @@ from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios -class RestartableListenerThread(object): - def __init__(self, listener): - self.listener = listener +class RestartableServerThread(object): + def __init__(self, server): + self.server = server self.thread = None def start(self): if self.thread is None: - self.thread = threading.Thread(target=self.listener.start) + self.thread = threading.Thread(target=self.server.start) self.thread.daemon = True self.thread.start() def stop(self): if self.thread is not None: - self.listener.stop() - self.listener.wait() - self.thread.join() + # Check start() does nothing with a running listener + self.server.start() + self.server.stop() + self.server.wait() + self.thread.join(timeout=15) + ret = self.thread.isAlive() self.thread = None - - def wait_end(self): - self.thread.join(timeout=15) - return self.thread.isAlive() + return ret + return True class ListenerSetupMixin(object): - class ListenerTracker(object): - def __init__(self, expect_messages): - self._expect_messages = expect_messages + class ThreadTracker(object): + def __init__(self): self._received_msgs = 0 - self.listeners = [] + self.threads = [] + self.lock = threading.Lock() def info(self, ctxt, publisher_id, event_type, payload, metadata): - self._received_msgs += 1 - if self._expect_messages == self._received_msgs: - self.stop() + # NOTE(sileht): this run into an other thread + with self.lock: + self._received_msgs += 1 - def wait_for(self, expect_messages): - print('expecting %d messages have %d' % - (expect_messages, self._received_msgs)) - while expect_messages != self._received_msgs: - yield + def wait_for_messages(self, expect_messages): + while self._received_msgs < expect_messages: + time.sleep(0.01) def stop(self): - for listener in self.listeners: - # Check start() does nothing with a running listener - listener.start() - listener.stop() - listener.wait() - self.listeners = [] + for thread in self.threads: + thread.stop() + self.threads = [] + + def start(self, thread): + self.threads.append(thread) + thread.start() def setUp(self): self.trackers = {} @@ -85,7 +86,7 @@ class ListenerSetupMixin(object): self.trackers[pool].stop() self.trackers = {} - def _setup_listener(self, transport, endpoints, expect_messages, + def _setup_listener(self, transport, endpoints, targets=None, pool=None): if pool is None: @@ -97,16 +98,18 @@ class ListenerSetupMixin(object): targets = [messaging.Target(topic='testtopic')] tracker = self.trackers.setdefault( - tracker_name, self.ListenerTracker(expect_messages)) + tracker_name, self.ThreadTracker()) listener = messaging.get_notification_listener( transport, targets=targets, endpoints=[tracker] + endpoints, allow_requeue=True, pool=pool) - tracker.listeners.append(listener) - thread = RestartableListenerThread(listener) - thread.start() + thread = RestartableServerThread(listener) + tracker.start(thread) return thread + def wait_for_messages(self, expect_messages, tracker_name='__default__'): + self.trackers[tracker_name].wait_for_messages(expect_messages) + def _setup_notifier(self, transport, topic='testtopic', publisher_id='testpublisher'): return messaging.Notifier(transport, topic=topic, @@ -169,12 +172,13 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint = mock.Mock() endpoint.info.return_value = None - listener_thread = self._setup_listener(transport, [endpoint], 1) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test message') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test message', @@ -187,14 +191,15 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint.info.return_value = None targets = [messaging.Target(topic="topic1"), messaging.Target(topic="topic2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic='topic1') notifier.info({'ctxt': '1'}, 'an_event.start1', 'test') notifier = self._setup_notifier(transport, topic='topic2') notifier.info({'ctxt': '2'}, 'an_event.start2', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', @@ -214,7 +219,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): exchange="exchange1"), messaging.Target(topic="topic", exchange="exchange2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic="topic") @@ -237,7 +242,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): notifier.info({'ctxt': '2'}, 'an_event.start', 'test message exchange2') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start', @@ -256,11 +262,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2 = mock.Mock() endpoint2.info.return_value = messaging.NotificationResult.HANDLED listener_thread = self._setup_listener(transport, - [endpoint1, endpoint2], 1) + [endpoint1, endpoint2]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint1.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test', { @@ -283,12 +290,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): return messaging.NotificationResult.HANDLED endpoint.info.side_effect = side_effect_requeue - listener_thread = self._setup_listener(transport, - [endpoint], 2) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({}, 'testpublisher', 'an_event.start', 'test', @@ -305,17 +312,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2.info.return_value = None targets = [messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 2, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 2, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") notifier = self._setup_notifier(transport, topic="topic") notifier.info({'ctxt': '0'}, 'an_event.start', 'test message0') notifier.info({'ctxt': '1'}, 'an_event.start', 'test message1') - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(2, "pool1") + self.wait_for_messages(2, "pool2") + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) def mocked_endpoint_call(i): return mock.call({'ctxt': '%d' % i}, 'testpublisher', @@ -338,11 +347,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint3.info.return_value = None targets = [messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 100, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 100, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") - listener3_thread = self._setup_listener(transport, [endpoint3], 100, + listener3_thread = self._setup_listener(transport, [endpoint3], targets=targets, pool="pool2") def mocked_endpoint_call(i): @@ -357,7 +366,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(25) + self.wait_for_messages(25, 'pool2') listener2_thread.stop() for i in range(0, 25): @@ -365,7 +374,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(50) + self.wait_for_messages(50, 'pool2') listener2_thread.start() listener3_thread.stop() @@ -374,7 +383,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(75) + self.wait_for_messages(75, 'pool2') listener3_thread.start() for i in range(0, 25): @@ -382,9 +391,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.assertFalse(listener3_thread.wait_end()) - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(100, 'pool1') + self.wait_for_messages(100, 'pool2') + + self.assertFalse(listener3_thread.stop()) + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) self.assertEqual(100, endpoint1.info.call_count) endpoint1.info.assert_has_calls(mocked_endpoint1_calls)