Merge "raise coordination error if not registered"
This commit is contained in:
@@ -19,7 +19,7 @@ from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import tooz.coordination
|
||||
|
||||
from ceilometer.i18n import _LE, _LI
|
||||
from ceilometer.i18n import _LE, _LI, _LW
|
||||
from ceilometer import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@@ -154,7 +154,7 @@ class PartitionCoordinator(object):
|
||||
except tooz.coordination.GroupNotCreated:
|
||||
self.join_group(group_id)
|
||||
|
||||
def extract_my_subset(self, group_id, iterable):
|
||||
def extract_my_subset(self, group_id, iterable, attempt=0):
|
||||
"""Filters an iterable, returning only objects assigned to this agent.
|
||||
|
||||
We have a list of objects and get a list of active group members from
|
||||
@@ -168,11 +168,20 @@ class PartitionCoordinator(object):
|
||||
try:
|
||||
members = self._get_members(group_id)
|
||||
LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
|
||||
if self._my_id not in members:
|
||||
raise tooz.coordination.MemberNotJoined(group_id, self._my_id)
|
||||
hr = utils.HashRing(members)
|
||||
filtered = [v for v in iterable
|
||||
if hr.get_node(str(v)) == self._my_id]
|
||||
LOG.debug('My subset: %s', [str(f) for f in filtered])
|
||||
return filtered
|
||||
except tooz.coordination.MemberNotJoined:
|
||||
if attempt >= 5:
|
||||
raise
|
||||
LOG.warning(_LW('Cannot extract tasks because agent failed to '
|
||||
'join group properly. Rejoining group.'))
|
||||
self.join_group(group_id)
|
||||
return self.extract_my_subset(group_id, iterable, attempt + 1)
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.exception(_LE('Error getting group membership info from '
|
||||
'coordination backend.'))
|
||||
|
||||
Reference in New Issue
Block a user