From dd06bf9277774c56121be0b4878c8973f38e761d Mon Sep 17 00:00:00 2001
From: liusheng
Date: Wed, 27 Apr 2016 10:33:30 +0800
Subject: [PATCH] Fix and improve the partition coordinator

* Fix the partition coordinator to distribute tasks properly.
* Improve the partition coordination mechanism with better retry logic,
  exception handling, and log messages. Refer to the corresponding
  Ceilometer changes:

  - Icf60381e30f3baf986cf9e008e133287765d9827
  - I6a48cf38b24a00a0db94d3dea0c6746b52526026
  - Ic0b6b62dace88e4e1ce7932024350bb211efb9ef
  - I8100160a3aa83a190c4110e6e8be9b26aef8fd1c
  - I2aed2241ded798464089b3eec5e1394422a45844

Closes-Bug: #1575530
Change-Id: I5729ae3080898e8a6d92889f8c520174dc371113
---
 aodh/coordination.py                          | 96 +++++++++++++++----
 aodh/evaluator/__init__.py                    |  9 +-
 aodh/tests/unit/test_coordination.py          | 11 +--
 aodh/tests/unit/test_evaluator.py             |  6 +-
 ...rdinator-improvement-ff1c257f69f120ac.yaml |  6 ++
 requirements.txt                              |  2 +-
 6 files changed, 96 insertions(+), 34 deletions(-)
 create mode 100644 releasenotes/notes/partition-coordinator-improvement-ff1c257f69f120ac.yaml

diff --git a/aodh/coordination.py b/aodh/coordination.py
index ea2f58732..666e39253 100644
--- a/aodh/coordination.py
+++ b/aodh/coordination.py
@@ -19,10 +19,11 @@ import uuid
 
 from oslo_config import cfg
 from oslo_log import log
+import retrying
 import six
 import tooz.coordination
 
-from aodh.i18n import _LE, _LI
+from aodh.i18n import _LE, _LI, _LW
 
 LOG = log.getLogger(__name__)
 
@@ -40,11 +41,40 @@ OPTS = [
     cfg.FloatOpt('check_watchers',
                  default=10.0,
                  help='Number of seconds between checks to see if group '
-                      'membership has changed')
-
+                      'membership has changed'),
+    cfg.IntOpt('retry_backoff',
+               default=1,
+               help='Retry backoff factor when retrying to connect with'
+                    ' coordination backend'),
+    cfg.IntOpt('max_retry_interval',
+               default=30,
+               help='Maximum number of seconds between retry to join '
+                    'partitioning group')
 ]
 
 
+class ErrorJoiningPartitioningGroup(Exception):
+    def __init__(self):
+        super(ErrorJoiningPartitioningGroup, self).__init__(_LE(
+            'Error occurred when joining partitioning group'))
+
+
+class MemberNotInGroupError(Exception):
+    def __init__(self, group_id, members, my_id):
+        super(MemberNotInGroupError, self).__init__(_LE(
+            'Group ID: %(group_id)s, Members: %(members)s, Me: %(me)s: '
+            'Current agent is not part of group and cannot take tasks') %
+            {'group_id': group_id, 'members': members, 'me': my_id})
+
+
+def retry_on_error_joining_partition(exception):
+    return isinstance(exception, ErrorJoiningPartitioningGroup)
+
+
+def retry_on_member_not_in_group(exception):
+    return isinstance(exception, MemberNotInGroupError)
+
+
 class HashRing(object):
 
     def __init__(self, nodes, replicas=100):
@@ -89,12 +119,12 @@ class PartitionCoordinator(object):
     empty iterable in this case.
""" - def __init__(self, backend_url, my_id=None): - self.backend_url = backend_url + def __init__(self, conf, my_id=None): + self.conf = conf + self.backend_url = self.conf.coordination.backend_url self._coordinator = None self._groups = set() self._my_id = my_id or str(uuid.uuid4()) - self._started = False def start(self): if self.backend_url: @@ -102,10 +132,8 @@ class PartitionCoordinator(object): self._coordinator = tooz.coordination.get_coordinator( self.backend_url, self._my_id) self._coordinator.start() - self._started = True LOG.info(_LI('Coordination backend started successfully.')) except tooz.coordination.ToozError: - self._started = False LOG.exception(_LE('Error connecting to coordination backend.')) def stop(self): @@ -121,14 +149,13 @@ class PartitionCoordinator(object): LOG.exception(_LE('Error connecting to coordination backend.')) finally: self._coordinator = None - self._started = False def is_active(self): return self._coordinator is not None def heartbeat(self): if self._coordinator: - if not self._started: + if not self._coordinator.is_started: # re-connect self.start() try: @@ -147,14 +174,23 @@ class PartitionCoordinator(object): self._coordinator.run_watchers() def join_group(self, group_id): - if not self._coordinator or not self._started or not group_id: + if (not self._coordinator or not self._coordinator.is_started + or not group_id): return - while True: + + retry_backoff = self.conf.coordination.retry_backoff * 1000 + max_retry_interval = self.conf.coordination.max_retry_interval * 1000 + + @retrying.retry( + wait_exponential_multiplier=retry_backoff, + wait_exponential_max=max_retry_interval, + retry_on_exception=retry_on_error_joining_partition, + wrap_exception=True) + def _inner(): try: join_req = self._coordinator.join_group(group_id) join_req.get() LOG.info(_LI('Joined partitioning group %s'), group_id) - break except tooz.coordination.MemberAlreadyExist: return except tooz.coordination.GroupNotCreated: @@ -163,7 +199,14 @@ class PartitionCoordinator(object): create_grp_req.get() except tooz.coordination.GroupAlreadyExist: pass - self._groups.add(group_id) + raise ErrorJoiningPartitioningGroup() + except tooz.coordination.ToozError: + LOG.exception(_LE('Error joining partitioning group %s,' + ' re-trying'), group_id) + raise ErrorJoiningPartitioningGroup() + self._groups.add(group_id) + + return _inner() def leave_group(self, group_id): if group_id not in self._groups: @@ -184,7 +227,9 @@ class PartitionCoordinator(object): except tooz.coordination.GroupNotCreated: self.join_group(group_id) - def extract_my_subset(self, group_id, iterable): + @retrying.retry(stop_max_attempt_number=5, wait_random_max=2000, + retry_on_exception=retry_on_member_not_in_group) + def extract_my_subset(self, group_id, universal_set): """Filters an iterable, returning only objects assigned to this agent. We have a list of objects and get a list of active group members from @@ -192,17 +237,26 @@ class PartitionCoordinator(object): the ones that hashed into *our* bucket. """ if not group_id: - return iterable + return universal_set if group_id not in self._groups: self.join_group(group_id) try: members = self._get_members(group_id) - LOG.debug('Members of group: %s', members) + LOG.debug('Members of group: %s, Me: %s', members, self._my_id) + if self._my_id not in members: + LOG.warning(_LW('Cannot extract tasks because agent failed to ' + 'join group properly. 
Rejoining group.')) + self.join_group(group_id) + members = self._get_members(group_id) + if self._my_id not in members: + raise MemberNotInGroupError(group_id, members, self._my_id) + LOG.debug('Members of group: %s, Me: %s', members, self._my_id) hr = HashRing(members) - filtered = [v for v in iterable - if hr.get_node(str(v)) == self._my_id] - LOG.debug('My subset: %s', filtered) - return filtered + LOG.debug('Universal set: %s', universal_set) + my_subset = [v for v in universal_set + if hr.get_node(str(v)) == self._my_id] + LOG.debug('My subset: %s', my_subset) + return my_subset except tooz.coordination.ToozError: LOG.exception(_LE('Error getting group membership info from ' 'coordination backend.')) diff --git a/aodh/evaluator/__init__.py b/aodh/evaluator/__init__.py index 1990aaa71..be4fcbc41 100644 --- a/aodh/evaluator/__init__.py +++ b/aodh/evaluator/__init__.py @@ -229,7 +229,7 @@ class AlarmEvaluationService(os_service.Service): self.storage_conn = None self._load_evaluators() self.partition_coordinator = coordination.PartitionCoordinator( - self.conf.coordination.backend_url) + self.conf) self.partition_coordinator.start() self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME) @@ -283,5 +283,8 @@ class AlarmEvaluationService(os_service.Service): # those alarms. all_alarms = self._storage_conn.get_alarms(enabled=True, exclude=dict(type='event')) - return self.partition_coordinator.extract_my_subset( - self.PARTITIONING_GROUP_NAME, all_alarms) + all_alarms = list(all_alarms) + all_alarm_ids = [a.alarm_id for a in all_alarms] + selected = self.partition_coordinator.extract_my_subset( + self.PARTITIONING_GROUP_NAME, all_alarm_ids) + return list(filter(lambda a: a.alarm_id in selected, all_alarms)) diff --git a/aodh/tests/unit/test_coordination.py b/aodh/tests/unit/test_coordination.py index 3b5d6e5cb..2751d257e 100644 --- a/aodh/tests/unit/test_coordination.py +++ b/aodh/tests/unit/test_coordination.py @@ -26,9 +26,10 @@ class MockToozCoordinator(object): def __init__(self, member_id, shared_storage): self._member_id = member_id self._groups = shared_storage + self.is_started = False def start(self): - pass + self.is_started = True def stop(self): pass @@ -154,8 +155,7 @@ class TestPartitioning(base.BaseTestCase): with mock.patch('tooz.coordination.get_coordinator', lambda _, member_id: coordinator_cls(member_id, shared_storage)): - pc = coordination.PartitionCoordinator( - self.CONF.coordination.backend_url, agent_id) + pc = coordination.PartitionCoordinator(self.CONF, agent_id) pc.start() return pc @@ -244,7 +244,7 @@ class TestPartitioning(base.BaseTestCase): def test_group_id_none(self): coord = self._get_new_started_coordinator({}, 'a') - self.assertTrue(coord._started) + self.assertTrue(coord._coordinator.is_started) with mock.patch.object(coord._coordinator, 'join_group') as mocked: coord.join_group(None) @@ -255,9 +255,8 @@ class TestPartitioning(base.BaseTestCase): def test_stop(self): coord = self._get_new_started_coordinator({}, 'a') - self.assertTrue(coord._started) + self.assertTrue(coord._coordinator.is_started) coord.join_group("123") coord.stop() self.assertIsEmpty(coord._groups) - self.assertFalse(coord._started) self.assertIsNone(coord._coordinator) diff --git a/aodh/tests/unit/test_evaluator.py b/aodh/tests/unit/test_evaluator.py index 3ea7d9877..c79270a7b 100644 --- a/aodh/tests/unit/test_evaluator.py +++ b/aodh/tests/unit/test_evaluator.py @@ -86,8 +86,8 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase): 
     @mock.patch('aodh.storage.get_connection_from_config')
     @mock.patch('aodh.coordination.PartitionCoordinator')
     def test_evaluation_cycle(self, m_pc, m_conn, m_em):
-        alarm = mock.Mock(type='threshold')
-        m_pc.return_value.extract_my_subset.return_value = [alarm]
+        alarm = mock.Mock(type='threshold', alarm_id="alarm_id1")
+        m_pc.return_value.extract_my_subset.return_value = ["alarm_id1"]
         m_pc.return_value.is_active.return_value = False
         m_conn.return_value.get_alarms.return_value = [alarm]
         m_em.return_value = self.evaluators
@@ -100,7 +100,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
 
         target = self.svc.partition_coordinator.extract_my_subset
         target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME,
-                                       [alarm])
+                                       ["alarm_id1"])
         self.threshold_eval.evaluate.assert_called_once_with(alarm)
 
     @mock.patch('stevedore.extension.ExtensionManager')
diff --git a/releasenotes/notes/partition-coordinator-improvement-ff1c257f69f120ac.yaml b/releasenotes/notes/partition-coordinator-improvement-ff1c257f69f120ac.yaml
new file mode 100644
index 000000000..d87e0f6c9
--- /dev/null
+++ b/releasenotes/notes/partition-coordinator-improvement-ff1c257f69f120ac.yaml
@@ -0,0 +1,6 @@
+---
+fixes:
+  - >
+    [`bug 1575530 <https://bugs.launchpad.net/aodh/+bug/1575530>`_]
+    A patch was added to fix and improve the partition coordinator, ensuring
+    that the input tasks are correctly distributed to the partition members.
diff --git a/requirements.txt b/requirements.txt
index d59b0b5ab..468bf9724 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,7 +29,7 @@ pytz>=2013.6
 requests>=2.5.2
 six>=1.9.0
 stevedore>=1.5.0 # Apache-2.0
-tooz>=0.16.0 # Apache-2.0
+tooz>=1.28.0 # Apache-2.0
 Werkzeug>=0.7 # BSD License
 WebOb>=1.2.3
 WSME>=0.8
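
The behavioural core of the evaluator change above is that `extract_my_subset()` is now given stable string alarm IDs rather than alarm objects, so every agent hashes identical keys onto the ring and computes the same partition. Below is a minimal, self-contained sketch of that consistent-hashing idea; `SimpleHashRing`, the standalone `extract_my_subset()` helper, and the member/alarm names are simplified illustrative stand-ins, not aodh's actual `HashRing` implementation.

# Illustration only (not aodh code): consistent hashing of string task IDs
# across group members. Each agent runs the same computation locally and
# keeps only the keys that map to its own member ID.
import hashlib


class SimpleHashRing(object):
    """Toy hash ring: each member owns `replicas` points on the ring."""

    def __init__(self, members, replicas=100):
        self._ring = {}
        for member in members:
            for i in range(replicas):
                self._ring[self._hash('%s-%d' % (member, i))] = member
        self._points = sorted(self._ring)

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(str(key).encode('utf-8')).hexdigest(), 16)

    def get_node(self, key):
        # The first ring point clockwise from the key's hash owns the key.
        h = self._hash(key)
        for point in self._points:
            if h <= point:
                return self._ring[point]
        return self._ring[self._points[0]]  # wrap around


def extract_my_subset(my_id, members, task_ids):
    ring = SimpleHashRing(members)
    return [t for t in task_ids if ring.get_node(str(t)) == my_id]


if __name__ == '__main__':
    members = ['agent-a', 'agent-b', 'agent-c']
    alarm_ids = ['alarm-%d' % i for i in range(10)]
    # Every agent derives the same, disjoint partition of the alarm IDs.
    subsets = dict((m, extract_my_subset(m, members, alarm_ids))
                   for m in members)
    assert sorted(sum(subsets.values(), [])) == sorted(alarm_ids)
    print(subsets)

Because the assignment depends only on the string value of each key, it comes out identical on every agent; hashing objects whose str() is not stable across processes (for example default reprs that embed memory addresses) would break that property, which is what passing alarm IDs avoids.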