diff --git a/ceilometer/agent/base.py b/ceilometer/agent/base.py index c493a4fc..bbfbe6bb 100644 --- a/ceilometer/agent/base.py +++ b/ceilometer/agent/base.py @@ -285,6 +285,11 @@ class AgentManager(os_service.Service): self.tg.add_timer(cfg.CONF.coordination.heartbeat, self.partition_coordinator.heartbeat) + def stop(self): + if self.partition_coordinator: + self.partition_coordinator.stop() + super(AgentManager, self).stop() + @staticmethod def interval_task(task): task.poll_and_publish() diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py index 79276212..9dbbdcef 100644 --- a/ceilometer/coordination.py +++ b/ceilometer/coordination.py @@ -78,6 +78,21 @@ class PartitionCoordinator(object): self._started = False LOG.exception(_LE('Error connecting to coordination backend.')) + def stop(self): + if not self._coordinator: + return + + for group in list(self._groups): + self.leave_group(group) + + try: + self._coordinator.stop() + except tooz.coordination.ToozError: + LOG.exception(_LE('Error connecting to coordination backend.')) + finally: + self._coordinator = None + self._started = False + def is_active(self): return self._coordinator is not None @@ -121,8 +136,11 @@ class PartitionCoordinator(object): self._groups.add(group_id) def leave_group(self, group_id): + if group_id not in self._groups: + return if self._coordinator: self._coordinator.leave_group(group_id) + self._groups.remove(group_id) LOG.info(_LI('Left partitioning group %s'), group_id) def _get_members(self, group_id): diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 2b96455e..315e56b6 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -70,6 +70,12 @@ class NotificationService(os_service.Service): NOTIFICATION_NAMESPACE = 'ceilometer.notification' NOTIFICATION_IPC = 'ceilometer-pipe' + def __init__(self, *args, **kwargs): + super(NotificationService, self).__init__(*args, **kwargs) + self.partition_coordinator = None + self.listeners = self.pipeline_listeners = [] + self.group_id = None + @classmethod def _get_notifications_manager(cls, transporter): return extension.ExtensionManager( @@ -201,6 +207,7 @@ class NotificationService(os_service.Service): self.pipeline_listeners.append(listener) def stop(self): - self.partition_coordinator.leave_group(self.group_id) + if self.partition_coordinator: + self.partition_coordinator.stop() utils.kill_listeners(self.listeners + self.pipeline_listeners) super(NotificationService, self).stop() diff --git a/ceilometer/tests/test_coordination.py b/ceilometer/tests/test_coordination.py index 9ff5df8f..06158d63 100644 --- a/ceilometer/tests/test_coordination.py +++ b/ceilometer/tests/test_coordination.py @@ -32,6 +32,9 @@ class MockToozCoordinator(object): def start(self): pass + def stop(self): + pass + def heartbeat(self): pass @@ -55,6 +58,9 @@ class MockToozCoordinator(object): } return MockAsyncResult(None) + def leave_group(self, group_id): + return MockAsyncResult(None) + def get_members(self, group_id): if group_id not in self._groups: return MockAsyncError( @@ -221,3 +227,23 @@ class TestPartitioning(base.BaseTestCase): coord.heartbeat() for e in expected_errors: self.assertNotIn(e, self.str_handler.messages['error']) + + def test_group_id_none(self): + coord = self._get_new_started_coordinator({}, 'a') + self.assertTrue(coord._started) + + with mock.patch.object(coord._coordinator, 'join_group') as mocked: + coord.join_group(None) + self.assertEqual(0, mocked.call_count) + with mock.patch.object(coord._coordinator, 'leave_group') as mocked: + coord.leave_group(None) + self.assertEqual(0, mocked.call_count) + + def test_stop(self): + coord = self._get_new_started_coordinator({}, 'a') + self.assertTrue(coord._started) + coord.join_group("123") + coord.stop() + self.assertIsEmpty(coord._groups) + self.assertFalse(coord._started) + self.assertIsNone(coord._coordinator) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index 42f51704..4b6801d6 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -28,6 +28,7 @@ from ceilometer.compute.notifications import instance from ceilometer import messaging from ceilometer import notification from ceilometer.openstack.common import fileutils +import ceilometer.openstack.common.service from ceilometer.publisher import test as test_publisher from ceilometer.tests import base as tests_base @@ -250,6 +251,14 @@ class TestRealNotification(BaseRealNotification): fake_coord.return_value = fake_coord1 self._check_notification_service() + @mock.patch.object(ceilometer.openstack.common.service.Service, 'stop') + def test_notification_service_start_abnormal(self, mocked): + try: + self.srv.stop() + except Exception: + pass + self.assertEqual(1, mocked.call_count) + class TestRealNotificationHA(BaseRealNotification):