From b888ee3ebf46b008dada1e825b24badb7053d7e8 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Thu, 8 Jan 2015 14:26:45 +0100 Subject: [PATCH] Fixes test_two_pools_three_listener The Notification Listener Tracker class is not threadsafe, so when a test stop an already stopped listener this one can be restarted, due to concurrency access of the threads list and concurrency execution of the start/stop/wait method of the notification listener. This result of a lockup of the test or a listener can continue to got unepxected message. This change fixes that by never stop the tread with the tracker callback but only manually with the test. This test also rename some 'Listener' to 'Server', to not mismatch the driver listener from the notification listener. Closes-bug: #1410902 Change-Id: I4777c7dd0ba71c61850d36641e85f33f9461e9c1 --- oslo_messaging/tests/notify/test_listener.py | 126 ++++++++++-------- tests/notify/test_listener.py | 128 ++++++++++--------- 2 files changed, 140 insertions(+), 114 deletions(-) diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index e1aa510b8..3c1e40490 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -14,6 +14,7 @@ # under the License. import threading +import time import mock import testscenarios @@ -26,53 +27,55 @@ from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios -class RestartableListenerThread(object): - def __init__(self, listener): - self.listener = listener +class RestartableServerThread(object): + def __init__(self, server): + self.server = server self.thread = None def start(self): if self.thread is None: - self.thread = threading.Thread(target=self.listener.start) + self.thread = threading.Thread(target=self.server.start) self.thread.daemon = True self.thread.start() def stop(self): if self.thread is not None: - self.listener.stop() - self.listener.wait() - self.thread.join() + # Check start() does nothing with a running listener + self.server.start() + self.server.stop() + self.server.wait() + self.thread.join(timeout=15) + ret = self.thread.isAlive() self.thread = None - - def wait_end(self): - self.thread.join(timeout=15) - return self.thread.isAlive() + return ret + return True class ListenerSetupMixin(object): - class ListenerTracker(object): - def __init__(self, expect_messages): - self._expect_messages = expect_messages + class ThreadTracker(object): + def __init__(self): self._received_msgs = 0 - self.listeners = [] + self.threads = [] + self.lock = threading.Lock() def info(self, ctxt, publisher_id, event_type, payload, metadata): - self._received_msgs += 1 - if self._expect_messages == self._received_msgs: - self.stop() + # NOTE(sileht): this run into an other thread + with self.lock: + self._received_msgs += 1 - def wait_for(self, expect_messages): - while expect_messages != self._received_msgs: - yield + def wait_for_messages(self, expect_messages): + while self._received_msgs < expect_messages: + time.sleep(0.01) def stop(self): - for listener in self.listeners: - # Check start() does nothing with a running listener - listener.start() - listener.stop() - listener.wait() - self.listeners = [] + for thread in self.threads: + thread.stop() + self.threads = [] + + def start(self, thread): + self.threads.append(thread) + thread.start() def setUp(self): self.trackers = {} @@ -83,7 +86,7 @@ class ListenerSetupMixin(object): self.trackers[pool].stop() self.trackers = {} - def _setup_listener(self, transport, endpoints, expect_messages, + def _setup_listener(self, transport, endpoints, targets=None, pool=None): if pool is None: @@ -95,16 +98,18 @@ class ListenerSetupMixin(object): targets = [oslo_messaging.Target(topic='testtopic')] tracker = self.trackers.setdefault( - tracker_name, self.ListenerTracker(expect_messages)) + tracker_name, self.ThreadTracker()) listener = oslo_messaging.get_notification_listener( transport, targets=targets, endpoints=[tracker] + endpoints, allow_requeue=True, pool=pool) - tracker.listeners.append(listener) - thread = RestartableListenerThread(listener) - thread.start() + thread = RestartableServerThread(listener) + tracker.start(thread) return thread + def wait_for_messages(self, expect_messages, tracker_name='__default__'): + self.trackers[tracker_name].wait_for_messages(expect_messages) + def _setup_notifier(self, transport, topic='testtopic', publisher_id='testpublisher'): return oslo_messaging.Notifier(transport, topic=topic, @@ -168,12 +173,13 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint = mock.Mock() endpoint.info.return_value = None - listener_thread = self._setup_listener(transport, [endpoint], 1) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test message') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test message', @@ -186,14 +192,15 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint.info.return_value = None targets = [oslo_messaging.Target(topic="topic1"), oslo_messaging.Target(topic="topic2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic='topic1') notifier.info({'ctxt': '1'}, 'an_event.start1', 'test') notifier = self._setup_notifier(transport, topic='topic2') notifier.info({'ctxt': '2'}, 'an_event.start2', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', @@ -213,7 +220,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): exchange="exchange1"), oslo_messaging.Target(topic="topic", exchange="exchange2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic="topic") @@ -236,7 +243,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): notifier.info({'ctxt': '2'}, 'an_event.start', 'test message exchange2') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start', @@ -255,11 +263,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2 = mock.Mock() endpoint2.info.return_value = oslo_messaging.NotificationResult.HANDLED listener_thread = self._setup_listener(transport, - [endpoint1, endpoint2], 1) + [endpoint1, endpoint2]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint1.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test', { @@ -282,12 +291,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): return oslo_messaging.NotificationResult.HANDLED endpoint.info.side_effect = side_effect_requeue - listener_thread = self._setup_listener(transport, - [endpoint], 2) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({}, 'testpublisher', 'an_event.start', 'test', @@ -304,17 +313,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2.info.return_value = None targets = [oslo_messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 2, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 2, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") notifier = self._setup_notifier(transport, topic="topic") notifier.info({'ctxt': '0'}, 'an_event.start', 'test message0') notifier.info({'ctxt': '1'}, 'an_event.start', 'test message1') - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(2, "pool1") + self.wait_for_messages(2, "pool2") + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) def mocked_endpoint_call(i): return mock.call({'ctxt': '%d' % i}, 'testpublisher', @@ -337,11 +348,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint3.info.return_value = None targets = [oslo_messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 100, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 100, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") - listener3_thread = self._setup_listener(transport, [endpoint3], 100, + listener3_thread = self._setup_listener(transport, [endpoint3], targets=targets, pool="pool2") def mocked_endpoint_call(i): @@ -356,7 +367,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(25) + self.wait_for_messages(25, 'pool2') listener2_thread.stop() for i in range(0, 25): @@ -364,7 +375,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(50) + self.wait_for_messages(50, 'pool2') listener2_thread.start() listener3_thread.stop() @@ -373,7 +384,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(75) + self.wait_for_messages(75, 'pool2') listener3_thread.start() for i in range(0, 25): @@ -381,9 +392,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.assertFalse(listener3_thread.wait_end()) - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(100, 'pool1') + self.wait_for_messages(100, 'pool2') + + self.assertFalse(listener3_thread.stop()) + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) self.assertEqual(100, endpoint1.info.call_count) endpoint1.info.assert_has_calls(mocked_endpoint1_calls) diff --git a/tests/notify/test_listener.py b/tests/notify/test_listener.py index 9317eaeff..984d186a0 100644 --- a/tests/notify/test_listener.py +++ b/tests/notify/test_listener.py @@ -14,6 +14,7 @@ # under the License. import threading +import time import mock import testscenarios @@ -26,55 +27,55 @@ from oslo_messaging.tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios -class RestartableListenerThread(object): - def __init__(self, listener): - self.listener = listener +class RestartableServerThread(object): + def __init__(self, server): + self.server = server self.thread = None def start(self): if self.thread is None: - self.thread = threading.Thread(target=self.listener.start) + self.thread = threading.Thread(target=self.server.start) self.thread.daemon = True self.thread.start() def stop(self): if self.thread is not None: - self.listener.stop() - self.listener.wait() - self.thread.join() + # Check start() does nothing with a running listener + self.server.start() + self.server.stop() + self.server.wait() + self.thread.join(timeout=15) + ret = self.thread.isAlive() self.thread = None - - def wait_end(self): - self.thread.join(timeout=15) - return self.thread.isAlive() + return ret + return True class ListenerSetupMixin(object): - class ListenerTracker(object): - def __init__(self, expect_messages): - self._expect_messages = expect_messages + class ThreadTracker(object): + def __init__(self): self._received_msgs = 0 - self.listeners = [] + self.threads = [] + self.lock = threading.Lock() def info(self, ctxt, publisher_id, event_type, payload, metadata): - self._received_msgs += 1 - if self._expect_messages == self._received_msgs: - self.stop() + # NOTE(sileht): this run into an other thread + with self.lock: + self._received_msgs += 1 - def wait_for(self, expect_messages): - print('expecting %d messages have %d' % - (expect_messages, self._received_msgs)) - while expect_messages != self._received_msgs: - yield + def wait_for_messages(self, expect_messages): + while self._received_msgs < expect_messages: + time.sleep(0.01) def stop(self): - for listener in self.listeners: - # Check start() does nothing with a running listener - listener.start() - listener.stop() - listener.wait() - self.listeners = [] + for thread in self.threads: + thread.stop() + self.threads = [] + + def start(self, thread): + self.threads.append(thread) + thread.start() def setUp(self): self.trackers = {} @@ -85,7 +86,7 @@ class ListenerSetupMixin(object): self.trackers[pool].stop() self.trackers = {} - def _setup_listener(self, transport, endpoints, expect_messages, + def _setup_listener(self, transport, endpoints, targets=None, pool=None): if pool is None: @@ -97,16 +98,18 @@ class ListenerSetupMixin(object): targets = [messaging.Target(topic='testtopic')] tracker = self.trackers.setdefault( - tracker_name, self.ListenerTracker(expect_messages)) + tracker_name, self.ThreadTracker()) listener = messaging.get_notification_listener( transport, targets=targets, endpoints=[tracker] + endpoints, allow_requeue=True, pool=pool) - tracker.listeners.append(listener) - thread = RestartableListenerThread(listener) - thread.start() + thread = RestartableServerThread(listener) + tracker.start(thread) return thread + def wait_for_messages(self, expect_messages, tracker_name='__default__'): + self.trackers[tracker_name].wait_for_messages(expect_messages) + def _setup_notifier(self, transport, topic='testtopic', publisher_id='testpublisher'): return messaging.Notifier(transport, topic=topic, @@ -169,12 +172,13 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint = mock.Mock() endpoint.info.return_value = None - listener_thread = self._setup_listener(transport, [endpoint], 1) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test message') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test message', @@ -187,14 +191,15 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint.info.return_value = None targets = [messaging.Target(topic="topic1"), messaging.Target(topic="topic2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic='topic1') notifier.info({'ctxt': '1'}, 'an_event.start1', 'test') notifier = self._setup_notifier(transport, topic='topic2') notifier.info({'ctxt': '2'}, 'an_event.start2', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', @@ -214,7 +219,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): exchange="exchange1"), messaging.Target(topic="topic", exchange="exchange2")] - listener_thread = self._setup_listener(transport, [endpoint], 2, + listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topic="topic") @@ -237,7 +242,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): notifier.info({'ctxt': '2'}, 'an_event.start', 'test message exchange2') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start', @@ -256,11 +262,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2 = mock.Mock() endpoint2.info.return_value = messaging.NotificationResult.HANDLED listener_thread = self._setup_listener(transport, - [endpoint1, endpoint2], 1) + [endpoint1, endpoint2]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(1) + self.assertFalse(listener_thread.stop()) endpoint1.info.assert_called_once_with( {}, 'testpublisher', 'an_event.start', 'test', { @@ -283,12 +290,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): return messaging.NotificationResult.HANDLED endpoint.info.side_effect = side_effect_requeue - listener_thread = self._setup_listener(transport, - [endpoint], 2) + listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) notifier.info({}, 'an_event.start', 'test') - self.assertFalse(listener_thread.wait_end()) + self.wait_for_messages(2) + self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ mock.call({}, 'testpublisher', 'an_event.start', 'test', @@ -305,17 +312,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint2.info.return_value = None targets = [messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 2, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 2, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") notifier = self._setup_notifier(transport, topic="topic") notifier.info({'ctxt': '0'}, 'an_event.start', 'test message0') notifier.info({'ctxt': '1'}, 'an_event.start', 'test message1') - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(2, "pool1") + self.wait_for_messages(2, "pool2") + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) def mocked_endpoint_call(i): return mock.call({'ctxt': '%d' % i}, 'testpublisher', @@ -338,11 +347,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint3.info.return_value = None targets = [messaging.Target(topic="topic")] - listener1_thread = self._setup_listener(transport, [endpoint1], 100, + listener1_thread = self._setup_listener(transport, [endpoint1], targets=targets, pool="pool1") - listener2_thread = self._setup_listener(transport, [endpoint2], 100, + listener2_thread = self._setup_listener(transport, [endpoint2], targets=targets, pool="pool2") - listener3_thread = self._setup_listener(transport, [endpoint3], 100, + listener3_thread = self._setup_listener(transport, [endpoint3], targets=targets, pool="pool2") def mocked_endpoint_call(i): @@ -357,7 +366,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(25) + self.wait_for_messages(25, 'pool2') listener2_thread.stop() for i in range(0, 25): @@ -365,7 +374,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(50) + self.wait_for_messages(50, 'pool2') listener2_thread.start() listener3_thread.stop() @@ -374,7 +383,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.trackers['pool2'].wait_for(75) + self.wait_for_messages(75, 'pool2') listener3_thread.start() for i in range(0, 25): @@ -382,9 +391,12 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): 'test message%d' % i) mocked_endpoint1_calls.append(mocked_endpoint_call(i)) - self.assertFalse(listener3_thread.wait_end()) - self.assertFalse(listener2_thread.wait_end()) - self.assertFalse(listener1_thread.wait_end()) + self.wait_for_messages(100, 'pool1') + self.wait_for_messages(100, 'pool2') + + self.assertFalse(listener3_thread.stop()) + self.assertFalse(listener2_thread.stop()) + self.assertFalse(listener1_thread.stop()) self.assertEqual(100, endpoint1.info.call_count) endpoint1.info.assert_has_calls(mocked_endpoint1_calls)