diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 7ae4ed4e57..a3fbffcd3f 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -222,7 +222,8 @@ class NotificationService(cotyledon.Service): self.hashring = self.partition_coordinator.join_partitioned_group( self.NOTIFICATION_NAMESPACE) - @periodics.periodic(spacing=self.conf.coordination.check_watchers) + @periodics.periodic(spacing=self.conf.coordination.check_watchers, + run_immediately=True) def run_watchers(): self.partition_coordinator.run_watchers() if self.group_state != self.hashring.ring.nodes: @@ -234,7 +235,6 @@ class NotificationService(cotyledon.Service): futures.ThreadPoolExecutor(max_workers=10)) self.periodic.add(run_watchers) utils.spawn_thread(self.periodic.start) - self._refresh_agent() def _configure_main_queue_listeners(self, pipe_manager, event_pipe_manager): diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py index 93a0127a33..9634304db6 100644 --- a/ceilometer/tests/unit/test_notification.py +++ b/ceilometer/tests/unit/test_notification.py @@ -82,6 +82,20 @@ TEST_NOTICE_PAYLOAD = { } +class BaseNotificationTest(tests_base.BaseTestCase): + def run_service(self, srv): + srv.run() + self.addCleanup(srv.terminate) + if srv.conf.notification.workload_partitioning: + start = time.time() + while time.time() - start < 10: + if srv.group_state: # ensure pipeline is set if HA + break + time.sleep(0.1) + else: + self.fail('Did not start pipeline queues') + + class _FakeNotificationPlugin(pipeline.NotificationEndpoint): event_types = ['fake.event'] @@ -89,7 +103,7 @@ class _FakeNotificationPlugin(pipeline.NotificationEndpoint): return [] -class TestNotification(tests_base.BaseTestCase): +class TestNotification(BaseNotificationTest): def setUp(self): super(TestNotification, self).setUp() @@ -118,8 +132,7 @@ class TestNotification(tests_base.BaseTestCase): with mock.patch.object(self.srv, '_get_notifications_manager') as get_nm: get_nm.side_effect = self.fake_get_notifications_manager - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) self.fake_event_endpoint = fake_event_endpoint_class.return_value def test_start_multiple_listeners(self): @@ -150,14 +163,13 @@ class TestNotification(tests_base.BaseTestCase): def test_unique_consumers(self, mock_listener): self.CONF.set_override('notification_control_exchanges', ['dup'] * 2, group='notification') - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) # 1 target, 1 listener self.assertEqual(1, len(mock_listener.call_args_list[0][0][1])) self.assertEqual(1, len(self.srv.listeners)) -class BaseRealNotification(tests_base.BaseTestCase): +class BaseRealNotification(BaseNotificationTest): def setup_pipeline(self, counter_names): pipeline = yaml.dump({ 'sources': [{ @@ -219,9 +231,7 @@ class BaseRealNotification(tests_base.BaseTestCase): self.publisher = test_publisher.TestPublisher(self.CONF, "") def _check_notification_service(self): - self.srv.run() - self.addCleanup(self.srv.terminate) - + self.run_service(self.srv) notifier = messaging.get_notifier(self.transport, "compute.vagrant-precise") notifier.info({}, 'compute.instance.create.end', @@ -252,8 +262,7 @@ class TestRealNotification(BaseRealNotification): @mock.patch('ceilometer.publisher.test.TestPublisher') def test_notification_service_error_topic(self, fake_publisher_cls): fake_publisher_cls.return_value = self.publisher - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) notifier = messaging.get_notifier(self.transport, 'compute.vagrant-precise') notifier.error({}, 'compute.instance.error', @@ -300,10 +309,7 @@ class TestRealNotificationHA(BaseRealNotification): mock.MagicMock(), # pipeline listener mock.MagicMock(), # refresh pipeline listener ] - - self.srv.run() - self.addCleanup(self.srv.terminate) - + self.run_service(self.srv) listener = self.srv.pipeline_listener self.srv._refresh_agent() self.assertIsNot(listener, self.srv.pipeline_listener) @@ -320,8 +326,7 @@ class TestRealNotificationHA(BaseRealNotification): hashring.belongs_to_self = _once_over_five self.srv.partition_coordinator = pc = mock.MagicMock() pc.join_partitioned_group.return_value = hashring - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) topics = [target.topic for target in mock_listener.call_args[0][1]] self.assertEqual(4, len(topics)) self.assertEqual( @@ -333,8 +338,7 @@ class TestRealNotificationHA(BaseRealNotification): @mock.patch('oslo_messaging.get_batch_notification_listener') def test_notify_to_relevant_endpoint(self, mock_listener): - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) targets = mock_listener.call_args[0][1] self.assertIsNotEmpty(targets) @@ -360,8 +364,7 @@ class TestRealNotificationHA(BaseRealNotification): @mock.patch('oslo_messaging.Notifier.sample') def test_broadcast_to_relevant_pipes_only(self, mock_notifier): - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) for endpoint in self.srv.listeners[0].dispatcher.endpoints: if (hasattr(endpoint, 'filter_rule') and not endpoint.filter_rule.match(None, None, 'nonmatching.end', @@ -397,7 +400,7 @@ class TestRealNotificationHA(BaseRealNotification): mock_notifier.call_args_list[2][1]['event_type']) -class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): +class TestRealNotificationMultipleAgents(BaseNotificationTest): def setup_pipeline(self, transformers): pipeline = yaml.dump({ 'sources': [{ @@ -466,8 +469,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): hashring_srv1.belongs_to_self = _sometimes_srv hashring_srv1.ring.nodes = {'id1': mock.Mock()} pc.join_partitioned_group.return_value = hashring_srv1 - self.srv.run() - self.addCleanup(self.srv.terminate) + self.run_service(self.srv) def _sometimes_srv2(item): maybe["srv2"] += 1 @@ -480,8 +482,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()} self.srv.hashring.ring.nodes = hashring.ring.nodes.copy() pc.join_partitioned_group.return_value = hashring - self.srv2.run() - self.addCleanup(self.srv2.terminate) + self.run_service(self.srv2) notifier = messaging.get_notifier(self.transport, "compute.vagrant-precise")