stop double refreshing on start
just let the periodic job decide if it needs to refresh. Change-Id: I300967d926ea4b8b415aac4744fc7bd183b4cca4 Closes-Bug: #1730849
This commit is contained in:
parent
6a97e62c54
commit
a638ceb8c4
|
@ -222,7 +222,8 @@ class NotificationService(cotyledon.Service):
|
||||||
self.hashring = self.partition_coordinator.join_partitioned_group(
|
self.hashring = self.partition_coordinator.join_partitioned_group(
|
||||||
self.NOTIFICATION_NAMESPACE)
|
self.NOTIFICATION_NAMESPACE)
|
||||||
|
|
||||||
@periodics.periodic(spacing=self.conf.coordination.check_watchers)
|
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
|
||||||
|
run_immediately=True)
|
||||||
def run_watchers():
|
def run_watchers():
|
||||||
self.partition_coordinator.run_watchers()
|
self.partition_coordinator.run_watchers()
|
||||||
if self.group_state != self.hashring.ring.nodes:
|
if self.group_state != self.hashring.ring.nodes:
|
||||||
|
@ -234,7 +235,6 @@ class NotificationService(cotyledon.Service):
|
||||||
futures.ThreadPoolExecutor(max_workers=10))
|
futures.ThreadPoolExecutor(max_workers=10))
|
||||||
self.periodic.add(run_watchers)
|
self.periodic.add(run_watchers)
|
||||||
utils.spawn_thread(self.periodic.start)
|
utils.spawn_thread(self.periodic.start)
|
||||||
self._refresh_agent()
|
|
||||||
|
|
||||||
def _configure_main_queue_listeners(self, pipe_manager,
|
def _configure_main_queue_listeners(self, pipe_manager,
|
||||||
event_pipe_manager):
|
event_pipe_manager):
|
||||||
|
|
|
@ -82,6 +82,20 @@ TEST_NOTICE_PAYLOAD = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class BaseNotificationTest(tests_base.BaseTestCase):
|
||||||
|
def run_service(self, srv):
|
||||||
|
srv.run()
|
||||||
|
self.addCleanup(srv.terminate)
|
||||||
|
if srv.conf.notification.workload_partitioning:
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < 10:
|
||||||
|
if srv.group_state: # ensure pipeline is set if HA
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
else:
|
||||||
|
self.fail('Did not start pipeline queues')
|
||||||
|
|
||||||
|
|
||||||
class _FakeNotificationPlugin(pipeline.NotificationEndpoint):
|
class _FakeNotificationPlugin(pipeline.NotificationEndpoint):
|
||||||
event_types = ['fake.event']
|
event_types = ['fake.event']
|
||||||
|
|
||||||
|
@ -89,7 +103,7 @@ class _FakeNotificationPlugin(pipeline.NotificationEndpoint):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
class TestNotification(tests_base.BaseTestCase):
|
class TestNotification(BaseNotificationTest):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestNotification, self).setUp()
|
super(TestNotification, self).setUp()
|
||||||
|
@ -118,8 +132,7 @@ class TestNotification(tests_base.BaseTestCase):
|
||||||
with mock.patch.object(self.srv,
|
with mock.patch.object(self.srv,
|
||||||
'_get_notifications_manager') as get_nm:
|
'_get_notifications_manager') as get_nm:
|
||||||
get_nm.side_effect = self.fake_get_notifications_manager
|
get_nm.side_effect = self.fake_get_notifications_manager
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
self.fake_event_endpoint = fake_event_endpoint_class.return_value
|
self.fake_event_endpoint = fake_event_endpoint_class.return_value
|
||||||
|
|
||||||
def test_start_multiple_listeners(self):
|
def test_start_multiple_listeners(self):
|
||||||
|
@ -150,14 +163,13 @@ class TestNotification(tests_base.BaseTestCase):
|
||||||
def test_unique_consumers(self, mock_listener):
|
def test_unique_consumers(self, mock_listener):
|
||||||
self.CONF.set_override('notification_control_exchanges', ['dup'] * 2,
|
self.CONF.set_override('notification_control_exchanges', ['dup'] * 2,
|
||||||
group='notification')
|
group='notification')
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
# 1 target, 1 listener
|
# 1 target, 1 listener
|
||||||
self.assertEqual(1, len(mock_listener.call_args_list[0][0][1]))
|
self.assertEqual(1, len(mock_listener.call_args_list[0][0][1]))
|
||||||
self.assertEqual(1, len(self.srv.listeners))
|
self.assertEqual(1, len(self.srv.listeners))
|
||||||
|
|
||||||
|
|
||||||
class BaseRealNotification(tests_base.BaseTestCase):
|
class BaseRealNotification(BaseNotificationTest):
|
||||||
def setup_pipeline(self, counter_names):
|
def setup_pipeline(self, counter_names):
|
||||||
pipeline = yaml.dump({
|
pipeline = yaml.dump({
|
||||||
'sources': [{
|
'sources': [{
|
||||||
|
@ -219,9 +231,7 @@ class BaseRealNotification(tests_base.BaseTestCase):
|
||||||
self.publisher = test_publisher.TestPublisher(self.CONF, "")
|
self.publisher = test_publisher.TestPublisher(self.CONF, "")
|
||||||
|
|
||||||
def _check_notification_service(self):
|
def _check_notification_service(self):
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
|
|
||||||
notifier = messaging.get_notifier(self.transport,
|
notifier = messaging.get_notifier(self.transport,
|
||||||
"compute.vagrant-precise")
|
"compute.vagrant-precise")
|
||||||
notifier.info({}, 'compute.instance.create.end',
|
notifier.info({}, 'compute.instance.create.end',
|
||||||
|
@ -252,8 +262,7 @@ class TestRealNotification(BaseRealNotification):
|
||||||
@mock.patch('ceilometer.publisher.test.TestPublisher')
|
@mock.patch('ceilometer.publisher.test.TestPublisher')
|
||||||
def test_notification_service_error_topic(self, fake_publisher_cls):
|
def test_notification_service_error_topic(self, fake_publisher_cls):
|
||||||
fake_publisher_cls.return_value = self.publisher
|
fake_publisher_cls.return_value = self.publisher
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
notifier = messaging.get_notifier(self.transport,
|
notifier = messaging.get_notifier(self.transport,
|
||||||
'compute.vagrant-precise')
|
'compute.vagrant-precise')
|
||||||
notifier.error({}, 'compute.instance.error',
|
notifier.error({}, 'compute.instance.error',
|
||||||
|
@ -300,10 +309,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||||
mock.MagicMock(), # pipeline listener
|
mock.MagicMock(), # pipeline listener
|
||||||
mock.MagicMock(), # refresh pipeline listener
|
mock.MagicMock(), # refresh pipeline listener
|
||||||
]
|
]
|
||||||
|
self.run_service(self.srv)
|
||||||
self.srv.run()
|
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
|
|
||||||
listener = self.srv.pipeline_listener
|
listener = self.srv.pipeline_listener
|
||||||
self.srv._refresh_agent()
|
self.srv._refresh_agent()
|
||||||
self.assertIsNot(listener, self.srv.pipeline_listener)
|
self.assertIsNot(listener, self.srv.pipeline_listener)
|
||||||
|
@ -320,8 +326,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||||
hashring.belongs_to_self = _once_over_five
|
hashring.belongs_to_self = _once_over_five
|
||||||
self.srv.partition_coordinator = pc = mock.MagicMock()
|
self.srv.partition_coordinator = pc = mock.MagicMock()
|
||||||
pc.join_partitioned_group.return_value = hashring
|
pc.join_partitioned_group.return_value = hashring
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
topics = [target.topic for target in mock_listener.call_args[0][1]]
|
topics = [target.topic for target in mock_listener.call_args[0][1]]
|
||||||
self.assertEqual(4, len(topics))
|
self.assertEqual(4, len(topics))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
@ -333,8 +338,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||||
|
|
||||||
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
||||||
def test_notify_to_relevant_endpoint(self, mock_listener):
|
def test_notify_to_relevant_endpoint(self, mock_listener):
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
|
|
||||||
targets = mock_listener.call_args[0][1]
|
targets = mock_listener.call_args[0][1]
|
||||||
self.assertIsNotEmpty(targets)
|
self.assertIsNotEmpty(targets)
|
||||||
|
@ -360,8 +364,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||||
|
|
||||||
@mock.patch('oslo_messaging.Notifier.sample')
|
@mock.patch('oslo_messaging.Notifier.sample')
|
||||||
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
|
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
|
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
|
||||||
if (hasattr(endpoint, 'filter_rule') and
|
if (hasattr(endpoint, 'filter_rule') and
|
||||||
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
|
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
|
||||||
|
@ -397,7 +400,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||||
mock_notifier.call_args_list[2][1]['event_type'])
|
mock_notifier.call_args_list[2][1]['event_type'])
|
||||||
|
|
||||||
|
|
||||||
class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
class TestRealNotificationMultipleAgents(BaseNotificationTest):
|
||||||
def setup_pipeline(self, transformers):
|
def setup_pipeline(self, transformers):
|
||||||
pipeline = yaml.dump({
|
pipeline = yaml.dump({
|
||||||
'sources': [{
|
'sources': [{
|
||||||
|
@ -466,8 +469,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
||||||
hashring_srv1.belongs_to_self = _sometimes_srv
|
hashring_srv1.belongs_to_self = _sometimes_srv
|
||||||
hashring_srv1.ring.nodes = {'id1': mock.Mock()}
|
hashring_srv1.ring.nodes = {'id1': mock.Mock()}
|
||||||
pc.join_partitioned_group.return_value = hashring_srv1
|
pc.join_partitioned_group.return_value = hashring_srv1
|
||||||
self.srv.run()
|
self.run_service(self.srv)
|
||||||
self.addCleanup(self.srv.terminate)
|
|
||||||
|
|
||||||
def _sometimes_srv2(item):
|
def _sometimes_srv2(item):
|
||||||
maybe["srv2"] += 1
|
maybe["srv2"] += 1
|
||||||
|
@ -480,8 +482,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
||||||
hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
|
hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
|
||||||
self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
|
self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
|
||||||
pc.join_partitioned_group.return_value = hashring
|
pc.join_partitioned_group.return_value = hashring
|
||||||
self.srv2.run()
|
self.run_service(self.srv2)
|
||||||
self.addCleanup(self.srv2.terminate)
|
|
||||||
|
|
||||||
notifier = messaging.get_notifier(self.transport,
|
notifier = messaging.get_notifier(self.transport,
|
||||||
"compute.vagrant-precise")
|
"compute.vagrant-precise")
|
||||||
|
|
Loading…
Reference in New Issue