Fix and improve the partition coordinator

* Fix the partition coordinator to distribute tasks properly.

* Improve the partition coordination mechanism in retry logic, exception
  handling, and log messages, etc. Refer to the Ceilometer's changes:

- Icf60381e30f3baf986cf9e008e133287765d9827
- I6a48cf38b24a00a0db94d3dea0c6746b52526026
- Ic0b6b62dace88e4e1ce7932024350bb211efb9ef
- I8100160a3aa83a190c4110e6e8be9b26aef8fd1c
- I2aed2241ded798464089b3eec5e1394422a45844

Closes-Bug: #1575530
Change-Id: I5729ae3080898e8a6d92889f8c520174dc371113
This commit is contained in:
liusheng 2016-04-27 10:33:30 +08:00
parent c93768cc68
commit dd06bf9277
6 changed files with 96 additions and 34 deletions

View File

@ -19,10 +19,11 @@ import uuid
from oslo_config import cfg
from oslo_log import log
import retrying
import six
import tooz.coordination
from aodh.i18n import _LE, _LI
from aodh.i18n import _LE, _LI, _LW
LOG = log.getLogger(__name__)
@ -40,11 +41,40 @@ OPTS = [
cfg.FloatOpt('check_watchers',
default=10.0,
help='Number of seconds between checks to see if group '
'membership has changed')
'membership has changed'),
cfg.IntOpt('retry_backoff',
default=1,
help='Retry backoff factor when retrying to connect with'
' coordination backend'),
cfg.IntOpt('max_retry_interval',
default=30,
help='Maximum number of seconds between retry to join '
'partitioning group')
]
class ErrorJoiningPartitioningGroup(Exception):
def __init__(self):
super(ErrorJoiningPartitioningGroup, self).__init__(_LE(
'Error occurred when joining partitioning group'))
class MemberNotInGroupError(Exception):
def __init__(self, group_id, members, my_id):
super(MemberNotInGroupError, self).__init__(_LE(
'Group ID: %{group_id}s, Members: %{members}s, Me: %{me}s: '
'Current agent is not part of group and cannot take tasks') %
{'group_id': group_id, 'members': members, 'me': my_id})
def retry_on_error_joining_partition(exception):
return isinstance(exception, ErrorJoiningPartitioningGroup)
def retry_on_member_not_in_group(exception):
return isinstance(exception, MemberNotInGroupError)
class HashRing(object):
def __init__(self, nodes, replicas=100):
@ -89,12 +119,12 @@ class PartitionCoordinator(object):
empty iterable in this case.
"""
def __init__(self, backend_url, my_id=None):
self.backend_url = backend_url
def __init__(self, conf, my_id=None):
self.conf = conf
self.backend_url = self.conf.coordination.backend_url
self._coordinator = None
self._groups = set()
self._my_id = my_id or str(uuid.uuid4())
self._started = False
def start(self):
if self.backend_url:
@ -102,10 +132,8 @@ class PartitionCoordinator(object):
self._coordinator = tooz.coordination.get_coordinator(
self.backend_url, self._my_id)
self._coordinator.start()
self._started = True
LOG.info(_LI('Coordination backend started successfully.'))
except tooz.coordination.ToozError:
self._started = False
LOG.exception(_LE('Error connecting to coordination backend.'))
def stop(self):
@ -121,14 +149,13 @@ class PartitionCoordinator(object):
LOG.exception(_LE('Error connecting to coordination backend.'))
finally:
self._coordinator = None
self._started = False
def is_active(self):
return self._coordinator is not None
def heartbeat(self):
if self._coordinator:
if not self._started:
if not self._coordinator.is_started:
# re-connect
self.start()
try:
@ -147,14 +174,23 @@ class PartitionCoordinator(object):
self._coordinator.run_watchers()
def join_group(self, group_id):
if not self._coordinator or not self._started or not group_id:
if (not self._coordinator or not self._coordinator.is_started
or not group_id):
return
while True:
retry_backoff = self.conf.coordination.retry_backoff * 1000
max_retry_interval = self.conf.coordination.max_retry_interval * 1000
@retrying.retry(
wait_exponential_multiplier=retry_backoff,
wait_exponential_max=max_retry_interval,
retry_on_exception=retry_on_error_joining_partition,
wrap_exception=True)
def _inner():
try:
join_req = self._coordinator.join_group(group_id)
join_req.get()
LOG.info(_LI('Joined partitioning group %s'), group_id)
break
except tooz.coordination.MemberAlreadyExist:
return
except tooz.coordination.GroupNotCreated:
@ -163,7 +199,14 @@ class PartitionCoordinator(object):
create_grp_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
self._groups.add(group_id)
raise ErrorJoiningPartitioningGroup()
except tooz.coordination.ToozError:
LOG.exception(_LE('Error joining partitioning group %s,'
' re-trying'), group_id)
raise ErrorJoiningPartitioningGroup()
self._groups.add(group_id)
return _inner()
def leave_group(self, group_id):
if group_id not in self._groups:
@ -184,7 +227,9 @@ class PartitionCoordinator(object):
except tooz.coordination.GroupNotCreated:
self.join_group(group_id)
def extract_my_subset(self, group_id, iterable):
@retrying.retry(stop_max_attempt_number=5, wait_random_max=2000,
retry_on_exception=retry_on_member_not_in_group)
def extract_my_subset(self, group_id, universal_set):
"""Filters an iterable, returning only objects assigned to this agent.
We have a list of objects and get a list of active group members from
@ -192,17 +237,26 @@ class PartitionCoordinator(object):
the ones that hashed into *our* bucket.
"""
if not group_id:
return iterable
return universal_set
if group_id not in self._groups:
self.join_group(group_id)
try:
members = self._get_members(group_id)
LOG.debug('Members of group: %s', members)
LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
if self._my_id not in members:
LOG.warning(_LW('Cannot extract tasks because agent failed to '
'join group properly. Rejoining group.'))
self.join_group(group_id)
members = self._get_members(group_id)
if self._my_id not in members:
raise MemberNotInGroupError(group_id, members, self._my_id)
LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
hr = HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]
LOG.debug('My subset: %s', filtered)
return filtered
LOG.debug('Universal set: %s', universal_set)
my_subset = [v for v in universal_set
if hr.get_node(str(v)) == self._my_id]
LOG.debug('My subset: %s', my_subset)
return my_subset
except tooz.coordination.ToozError:
LOG.exception(_LE('Error getting group membership info from '
'coordination backend.'))

View File

@ -229,7 +229,7 @@ class AlarmEvaluationService(os_service.Service):
self.storage_conn = None
self._load_evaluators()
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf.coordination.backend_url)
self.conf)
self.partition_coordinator.start()
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME)
@ -283,5 +283,8 @@ class AlarmEvaluationService(os_service.Service):
# those alarms.
all_alarms = self._storage_conn.get_alarms(enabled=True,
exclude=dict(type='event'))
return self.partition_coordinator.extract_my_subset(
self.PARTITIONING_GROUP_NAME, all_alarms)
all_alarms = list(all_alarms)
all_alarm_ids = [a.alarm_id for a in all_alarms]
selected = self.partition_coordinator.extract_my_subset(
self.PARTITIONING_GROUP_NAME, all_alarm_ids)
return list(filter(lambda a: a.alarm_id in selected, all_alarms))

View File

@ -26,9 +26,10 @@ class MockToozCoordinator(object):
def __init__(self, member_id, shared_storage):
self._member_id = member_id
self._groups = shared_storage
self.is_started = False
def start(self):
pass
self.is_started = True
def stop(self):
pass
@ -154,8 +155,7 @@ class TestPartitioning(base.BaseTestCase):
with mock.patch('tooz.coordination.get_coordinator',
lambda _, member_id:
coordinator_cls(member_id, shared_storage)):
pc = coordination.PartitionCoordinator(
self.CONF.coordination.backend_url, agent_id)
pc = coordination.PartitionCoordinator(self.CONF, agent_id)
pc.start()
return pc
@ -244,7 +244,7 @@ class TestPartitioning(base.BaseTestCase):
def test_group_id_none(self):
coord = self._get_new_started_coordinator({}, 'a')
self.assertTrue(coord._started)
self.assertTrue(coord._coordinator.is_started)
with mock.patch.object(coord._coordinator, 'join_group') as mocked:
coord.join_group(None)
@ -255,9 +255,8 @@ class TestPartitioning(base.BaseTestCase):
def test_stop(self):
coord = self._get_new_started_coordinator({}, 'a')
self.assertTrue(coord._started)
self.assertTrue(coord._coordinator.is_started)
coord.join_group("123")
coord.stop()
self.assertIsEmpty(coord._groups)
self.assertFalse(coord._started)
self.assertIsNone(coord._coordinator)

View File

@ -86,8 +86,8 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
@mock.patch('aodh.storage.get_connection_from_config')
@mock.patch('aodh.coordination.PartitionCoordinator')
def test_evaluation_cycle(self, m_pc, m_conn, m_em):
alarm = mock.Mock(type='threshold')
m_pc.return_value.extract_my_subset.return_value = [alarm]
alarm = mock.Mock(type='threshold', alarm_id="alarm_id1")
m_pc.return_value.extract_my_subset.return_value = ["alarm_id1"]
m_pc.return_value.is_active.return_value = False
m_conn.return_value.get_alarms.return_value = [alarm]
m_em.return_value = self.evaluators
@ -100,7 +100,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
target = self.svc.partition_coordinator.extract_my_subset
target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME,
[alarm])
["alarm_id1"])
self.threshold_eval.evaluate.assert_called_once_with(alarm)
@mock.patch('stevedore.extension.ExtensionManager')

View File

@ -0,0 +1,6 @@
---
fixes:
- >
[`bug 1575530 <https://bugs.launchpad.net/aodh/+bug/1575530>`_]
Patch was added to fix and improve the partition coordinator, make sure
the input tasks can be correctly distributed to partition members.

View File

@ -29,7 +29,7 @@ pytz>=2013.6
requests>=2.5.2
six>=1.9.0
stevedore>=1.5.0 # Apache-2.0
tooz>=0.16.0 # Apache-2.0
tooz>=1.28.0 # Apache-2.0
Werkzeug>=0.7 # BSD License
WebOb>=1.2.3
WSME>=0.8