correctly leave group when process is stopped

ceilometer.notification.NotificationService.stop() use attribute before
it is initialized, it will raise exception when start() fails.

This patch fixes it by putting initialization in __init__().

Note, also adds coordinator.stop() when service is stopped for
polling agents.

Change-Id: Ied2f086e1f50950b430095ae7ee89036fd4a89d9
Closes-Bug: #1418793
This commit is contained in:
ZhiQiang Fan 2015-02-06 16:25:01 +08:00
parent b7633bdbdd
commit e5e4ac9997
5 changed files with 66 additions and 1 deletions

View File

@ -285,6 +285,11 @@ class AgentManager(os_service.Service):
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
def stop(self):
if self.partition_coordinator:
self.partition_coordinator.stop()
super(AgentManager, self).stop()
@staticmethod
def interval_task(task):
task.poll_and_publish()

View File

@ -78,6 +78,21 @@ class PartitionCoordinator(object):
self._started = False
LOG.exception(_LE('Error connecting to coordination backend.'))
def stop(self):
if not self._coordinator:
return
for group in list(self._groups):
self.leave_group(group)
try:
self._coordinator.stop()
except tooz.coordination.ToozError:
LOG.exception(_LE('Error connecting to coordination backend.'))
finally:
self._coordinator = None
self._started = False
def is_active(self):
return self._coordinator is not None
@ -121,8 +136,11 @@ class PartitionCoordinator(object):
self._groups.add(group_id)
def leave_group(self, group_id):
if group_id not in self._groups:
return
if self._coordinator:
self._coordinator.leave_group(group_id)
self._groups.remove(group_id)
LOG.info(_LI('Left partitioning group %s'), group_id)
def _get_members(self, group_id):

View File

@ -70,6 +70,12 @@ class NotificationService(os_service.Service):
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, *args, **kwargs):
super(NotificationService, self).__init__(*args, **kwargs)
self.partition_coordinator = None
self.listeners = self.pipeline_listeners = []
self.group_id = None
@classmethod
def _get_notifications_manager(cls, transporter):
return extension.ExtensionManager(
@ -201,6 +207,7 @@ class NotificationService(os_service.Service):
self.pipeline_listeners.append(listener)
def stop(self):
self.partition_coordinator.leave_group(self.group_id)
if self.partition_coordinator:
self.partition_coordinator.stop()
utils.kill_listeners(self.listeners + self.pipeline_listeners)
super(NotificationService, self).stop()

View File

@ -32,6 +32,9 @@ class MockToozCoordinator(object):
def start(self):
pass
def stop(self):
pass
def heartbeat(self):
pass
@ -55,6 +58,9 @@ class MockToozCoordinator(object):
}
return MockAsyncResult(None)
def leave_group(self, group_id):
return MockAsyncResult(None)
def get_members(self, group_id):
if group_id not in self._groups:
return MockAsyncError(
@ -221,3 +227,23 @@ class TestPartitioning(base.BaseTestCase):
coord.heartbeat()
for e in expected_errors:
self.assertNotIn(e, self.str_handler.messages['error'])
def test_group_id_none(self):
coord = self._get_new_started_coordinator({}, 'a')
self.assertTrue(coord._started)
with mock.patch.object(coord._coordinator, 'join_group') as mocked:
coord.join_group(None)
self.assertEqual(0, mocked.call_count)
with mock.patch.object(coord._coordinator, 'leave_group') as mocked:
coord.leave_group(None)
self.assertEqual(0, mocked.call_count)
def test_stop(self):
coord = self._get_new_started_coordinator({}, 'a')
self.assertTrue(coord._started)
coord.join_group("123")
coord.stop()
self.assertIsEmpty(coord._groups)
self.assertFalse(coord._started)
self.assertIsNone(coord._coordinator)

View File

@ -28,6 +28,7 @@ from ceilometer.compute.notifications import instance
from ceilometer import messaging
from ceilometer import notification
from ceilometer.openstack.common import fileutils
import ceilometer.openstack.common.service
from ceilometer.publisher import test as test_publisher
from ceilometer.tests import base as tests_base
@ -250,6 +251,14 @@ class TestRealNotification(BaseRealNotification):
fake_coord.return_value = fake_coord1
self._check_notification_service()
@mock.patch.object(ceilometer.openstack.common.service.Service, 'stop')
def test_notification_service_start_abnormal(self, mocked):
try:
self.srv.stop()
except Exception:
pass
self.assertEqual(1, mocked.call_count)
class TestRealNotificationHA(BaseRealNotification):