Replace Ceilometer coordination layer with tooz partitioning system

This replaces the custom-made partitioning system built on a hash ring with
the partitioning support provided by tooz.
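
For context, the tooz API this change moves to, in a minimal sketch
(illustrative only: the backend URL, member id and resource names are made
up, and join_partitioned_group requires a tooz release that provides it):

from tooz import coordination

# One coordinator per agent; tooz handles heartbeats and reconnects.
coordinator = coordination.get_coordinator('redis://localhost:6379',
                                           b'agent-1')
coordinator.start(start_heart=True)

# Joining a partitioned group returns a partitioner wrapping a hash ring
# that tooz keeps in sync with group membership.
hashring = coordinator.join_partitioned_group('central-global')

# Keep only the resources that hash to this member.
resources = ['resource_1', 'resource_2', 'resource_3']
mine = list(filter(hashring.belongs_to_self, resources))

coordinator.stop()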

Change-Id: I2321c92315accc5e5972138e7673d3a665df891e
Julien Danjou 2017-03-13 14:08:44 +01:00
parent 7290b4ec8c
commit 27604abd46
8 changed files with 125 additions and 470 deletions

View File

@@ -32,9 +32,9 @@ from oslo_utils import timeutils
from six import moves
from six.moves.urllib import parse as urlparse
from stevedore import extension
from tooz import coordination
from ceilometer.agent import plugin_base
from ceilometer import coordination
from ceilometer import keystone_client
from ceilometer import messaging
from ceilometer import pipeline
@@ -102,14 +102,16 @@ class Resources(object):
source_discovery = (self.agent_manager.discover(self._discovery,
discovery_cache)
if self._discovery else [])
static_resources = []
if self._resources:
static_resources_group = self.agent_manager.construct_group_id(
utils.hash_of_set(self._resources))
p_coord = self.agent_manager.partition_coordinator
static_resources = p_coord.extract_my_subset(
static_resources_group, self._resources)
return static_resources + source_discovery
return list(filter(
self.agent_manager.hashrings[
static_resources_group].belongs_to_self, self._resources
)) + source_discovery
return source_discovery
@staticmethod
def key(source_name, pollster):
@@ -278,8 +280,8 @@ class AgentManager(cotyledon.Service):
if self.conf.coordination.backend_url:
# XXX uuid4().bytes ought to work, but it requires ascii for now
coordination_id = str(uuid.uuid4()).encode('ascii')
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf, coordination_id)
self.partition_coordinator = coordination.get_coordinator(
self.conf.coordination.backend_url, coordination_id)
else:
self.partition_coordinator = None
@@ -355,8 +357,9 @@ class AgentManager(cotyledon.Service):
])
groups.update(static_resource_groups)
for group in groups:
self.partition_coordinator.join_group(group)
self.hashrings = dict(
(group, self.partition_coordinator.join_partitioned_group(group))
for group in groups)
def create_polling_task(self):
"""Create an initially empty polling task."""
@@ -492,17 +495,17 @@ class AgentManager(cotyledon.Service):
continue
discovered = discoverer.discover(self, param)
if self.partition_coordinator:
partitioned = (
self.partition_coordinator.extract_my_subset(
self.construct_group_id(discoverer.group_id),
discovered)
)
else:
partitioned = discovered
resources.extend(partitioned)
discovered = list(filter(
self.hashrings[
self.construct_group_id(discoverer.group_id)
].belongs_to_self, discovered
))
resources.extend(discovered)
if discovery_cache is not None:
discovery_cache[url] = partitioned
discovery_cache[url] = discovered
except ka_exceptions.ClientException as e:
LOG.error('Skipping %(name)s, keystone issue: '
'%(exc)s', {'name': name, 'exc': e})

View File

@@ -1,141 +0,0 @@
#
# Copyright 2014-2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from oslo_config import cfg
from oslo_log import log
import tenacity
import tooz.coordination
from tooz import hashring
LOG = log.getLogger(__name__)
OPTS = [
cfg.StrOpt('backend_url',
help='The backend URL to use for distributed coordination. If '
'left empty, per-deployment central agent and per-host '
'compute agent won\'t do workload '
'partitioning and will only function correctly if a '
'single instance of that service is running.'),
cfg.FloatOpt('check_watchers',
default=10.0,
help='Number of seconds between checks to see if group '
'membership has changed'),
cfg.IntOpt('retry_backoff',
default=1,
help='Retry backoff factor when retrying to connect with '
'coordination backend'),
cfg.IntOpt('max_retry_interval',
default=30,
help='Maximum number of seconds between retry to join '
'partitioning group')
]
class PartitionCoordinator(object):
"""Workload partitioning coordinator.
This class uses the `tooz` library to manage group membership.
Coordination errors and reconnects are handled under the hood, so the
service using the partition coordinator need not care whether the
coordination backend is down. The `extract_my_subset` will simply return an
empty iterable in this case.
"""
def __init__(self, conf, my_id):
self.conf = conf
self._my_id = my_id
self._coordinator = tooz.coordination.get_coordinator(
conf.coordination.backend_url, my_id)
def start(self):
try:
self._coordinator.start(start_heart=True)
LOG.info('Coordination backend started successfully.')
except tooz.coordination.ToozError:
LOG.exception('Error connecting to coordination backend.')
def stop(self):
try:
self._coordinator.stop()
except tooz.coordination.ToozError:
LOG.exception('Error connecting to coordination backend.')
finally:
del self._coordinator
def watch_group(self, namespace, callback):
self._coordinator.watch_join_group(namespace, callback)
self._coordinator.watch_leave_group(namespace, callback)
def run_watchers(self):
self._coordinator.run_watchers()
def join_group(self, group_id):
@tenacity.retry(
wait=tenacity.wait_exponential(
multiplier=self.conf.coordination.retry_backoff,
max=self.conf.coordination.max_retry_interval),
retry=tenacity.retry_never)
def _inner():
try:
self._coordinator.join_group_create(group_id)
except tooz.coordination.MemberAlreadyExist:
pass
except tooz.coordination.ToozError:
LOG.exception('Error joining partitioning group %s,'
' re-trying', group_id)
raise tenacity.TryAgain
LOG.info('Joined partitioning group %s', group_id)
return _inner()
def _get_members(self, group_id):
while True:
get_members_req = self._coordinator.get_members(group_id)
try:
return get_members_req.get()
except tooz.coordination.GroupNotCreated:
self.join_group(group_id)
def extract_my_subset(self, group_id, iterable):
"""Filters an iterable, returning only objects assigned to this agent.
We have a list of objects and get a list of active group members from
`tooz`. We then hash all the objects into buckets and return only
the ones that hashed into *our* bucket.
"""
try:
members = self._get_members(group_id)
hr = hashring.HashRing(members, partitions=100)
iterable = list(iterable)
filtered = [v for v in iterable
if self._my_id in hr.get_nodes(self.encode_task(v))]
LOG.debug('The universal set: %s, my subset: %s',
[six.text_type(f) for f in iterable],
[six.text_type(f) for f in filtered])
return filtered
except tooz.coordination.ToozError:
LOG.exception('Error getting group membership info from '
'coordination backend.')
return []
@staticmethod
def encode_task(value):
"""encode to bytes"""
return six.text_type(value).encode('utf-8')
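
For reference, the bucketing that the removed extract_my_subset() performed by
hand maps onto tooz's hash ring roughly as follows (a sketch only; the member
ids are invented, partitions=100 matches the code above):

import six
from tooz import hashring

members = [b'agent-1', b'agent-2', b'agent-3']
my_id = b'agent-1'
hr = hashring.HashRing(members, partitions=100)

def encode_task(value):
    # Same encoding as PartitionCoordinator.encode_task above.
    return six.text_type(value).encode('utf-8')

resources = ['resource_%d' % i for i in range(10)]
mine = [r for r in resources if my_id in hr.get_nodes(encode_task(r))]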

View File

@@ -1,4 +1,5 @@
#
# Copyright 2017 Red Hat, Inc.
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -25,9 +26,10 @@ from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
import six
from stevedore import extension
from tooz import coordination
from ceilometer import coordination
from ceilometer.event import endpoint as event_endpoint
from ceilometer.i18n import _
from ceilometer import messaging
@@ -112,9 +114,14 @@ class NotificationService(cotyledon.Service):
super(NotificationService, self).__init__(worker_id)
self.startup_delay = worker_id
self.conf = conf
# XXX uuid4().bytes ought to work, but it requires ascii for now
self.coordination_id = (coordination_id or
str(uuid.uuid4()).encode('ascii'))
if self.conf.notification.workload_partitioning:
# XXX uuid4().bytes ought to work, but it requires ascii for now
coordination_id = (coordination_id or
str(uuid.uuid4()).encode('ascii'))
self.partition_coordinator = coordination.get_coordinator(
self.conf.coordination.backend_url, coordination_id)
else:
self.partition_coordinator = None
@classmethod
def _get_notifications_manager(cls, pm):
@@ -168,7 +175,6 @@ class NotificationService(cotyledon.Service):
super(NotificationService, self).run()
self.shutdown = False
self.periodic = None
self.partition_coordinator = None
self.coord_lock = threading.Lock()
self.listeners = []
@@ -184,9 +190,6 @@ class NotificationService(cotyledon.Service):
self.transport = messaging.get_transport(self.conf)
if self.conf.notification.workload_partitioning:
self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf, self.coordination_id)
self.partition_coordinator.start()
else:
# FIXME(sileht): endpoint uses the notification_topics option
@@ -204,9 +207,8 @@ class NotificationService(cotyledon.Service):
if self.conf.notification.workload_partitioning:
# join group after all manager set up is configured
self.partition_coordinator.join_group(self.group_id)
self.partition_coordinator.watch_group(self.group_id,
self._refresh_agent)
self.hashring = self.partition_coordinator.join_partitioned_group(
self.NOTIFICATION_NAMESPACE)
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
run_immediately=True)
@@ -219,7 +221,6 @@ class NotificationService(cotyledon.Service):
self.periodic.add(run_watchers)
utils.spawn_thread(self.periodic.start)
# configure pipelines after all coordination is configured.
with self.coord_lock:
self._configure_pipeline_listener()
@@ -275,14 +276,13 @@ class NotificationService(cotyledon.Service):
ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport(self.conf)
partitioned = six.moves.range(
self.conf.notification.pipeline_processing_queues
)
if self.partition_coordinator:
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id,
range(self.conf.notification.pipeline_processing_queues))
else:
partitioned = range(
self.conf.notification.pipeline_processing_queues
)
partitioned = list(filter(
self.hashring.belongs_to_self, partitioned))
endpoints = []
targets = []
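
A sketch of how the partitioned queue indices end up as listener topics (the
fake ring and the pipeline name are made up; the topic format follows the
expectations in the tests below):

class FakeRing(object):
    # Stands in for the tooz partitioner: pretend we own the even queues.
    @staticmethod
    def belongs_to_self(obj):
        return int(obj) % 2 == 0

hashring = FakeRing()
pipeline_processing_queues = 10
partitioned = list(filter(hashring.belongs_to_self,
                          range(pipeline_processing_queues)))
topics = ['ceilometer-pipe-test_pipeline:test_sink-%s' % i
          for i in partitioned]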

View File

@@ -26,7 +26,6 @@ import ceilometer.compute.virt.inspector
import ceilometer.compute.virt.libvirt.utils
import ceilometer.compute.virt.vmware.inspector
import ceilometer.compute.virt.xenapi.inspector
import ceilometer.coordination
import ceilometer.dispatcher
import ceilometer.dispatcher.file
import ceilometer.dispatcher.gnocchi_opts
@@ -93,7 +92,20 @@ def list_opts():
ceilometer.api.controllers.v2.root.API_OPTS)),
('collector', ceilometer.collector.OPTS),
('compute', ceilometer.compute.discovery.OPTS),
('coordination', ceilometer.coordination.OPTS),
('coordination', [
cfg.StrOpt(
'backend_url',
help='The backend URL to use for distributed coordination. If '
'left empty, per-deployment central agent and per-host '
'compute agent won\'t do workload '
'partitioning and will only function correctly if a '
'single instance of that service is running.'),
cfg.FloatOpt(
'check_watchers',
default=10.0,
help='Number of seconds between checks to see if group '
'membership has changed'),
]),
('database', ceilometer.storage.OPTS),
('dispatcher_file', ceilometer.dispatcher.file.OPTS),
('dispatcher_http', ceilometer.dispatcher.http.http_dispatcher_opts),
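
For operators, the options registered above map to ceilometer.conf settings
along these lines (the URL is only an example; any tooz-supported backend
works, and leaving backend_url empty disables workload partitioning):

[coordination]
backend_url = redis://localhost:6379
check_watchers = 10.0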

View File

@@ -297,12 +297,14 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
@mock.patch("ceilometer.utils.kill_listeners", mock.MagicMock())
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
def test_notification_threads(self, m_listener):
self.CONF.set_override('batch_size', 1, group='notification')
self.srv.run()
m_listener.assert_called_with(override_pool_size=None)
m_listener.reset_mock()
self.srv.terminate()
self.CONF.set_override('batch_size', 2, group='notification')
self.srv.run()
m_listener.assert_called_with(override_pool_size=1)
@@ -324,22 +326,32 @@ class TestRealNotificationHA(BaseRealNotification):
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_retain_common_targets_on_refresh(self, mock_listener):
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 2]):
self.srv.run()
self.addCleanup(self.srv.terminate)
maybe = {"maybe": 0}
def _once_over_five(item):
maybe["maybe"] += 1
return maybe["maybe"] % 5 == 0
hashring = mock.MagicMock()
hashring.belongs_to_self = _once_over_five
self.srv.partition_coordinator = pc = mock.MagicMock()
pc.join_partitioned_group.return_value = hashring
self.srv.run()
self.addCleanup(self.srv.terminate)
listened_before = [target.topic for target in
mock_listener.call_args[0][1]]
self.assertEqual(4, len(listened_before))
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 3]):
self.srv._refresh_agent(None)
self.srv._refresh_agent(None)
listened_after = [target.topic for target in
mock_listener.call_args[0][1]]
self.assertEqual(4, len(listened_after))
common = set(listened_before) & set(listened_after)
for topic in common:
self.assertTrue(topic.endswith('1'))
self.assertEqual(
{'ceilometer-pipe-test_pipeline:test_sink-4',
'ceilometer-pipe-event:test_event:test_sink-4',
'ceilometer-pipe-event:test_event:test_sink-9',
'ceilometer-pipe-test_pipeline:test_sink-9'},
common)
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_notify_to_relevant_endpoint(self, mock_listener):
@@ -463,14 +475,31 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
self.srv = notification.NotificationService(0, self.CONF, 'harry')
self.srv2 = notification.NotificationService(0, self.CONF, 'lloyd')
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'._get_members', return_value=['harry', 'lloyd']):
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv2.run()
self.addCleanup(self.srv2.terminate)
maybe = {"srv": 0, "srv2": -1}
def _sometimes_srv(item):
maybe["srv"] += 1
return (maybe["srv"] % 2) == 0
self.srv = notification.NotificationService(0, self.CONF)
self.srv.partition_coordinator = pc = mock.MagicMock()
hashring = mock.MagicMock()
hashring.belongs_to_self = _sometimes_srv
pc.join_partitioned_group.return_value = hashring
self.srv.run()
self.addCleanup(self.srv.terminate)
def _sometimes_srv2(item):
maybe["srv2"] += 1
return (maybe["srv2"] % 2) == 0
self.srv2 = notification.NotificationService(0, self.CONF)
self.srv2.partition_coordinator = pc = mock.MagicMock()
hashring = mock.MagicMock()
hashring.belongs_to_self = _sometimes_srv2
pc.join_partitioned_group.return_value = hashring
self.srv2.run()
self.addCleanup(self.srv2.terminate)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
@@ -483,7 +512,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
self.expected_samples = 4
with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
start = time.time()
while time.time() - start < 60:
while time.time() - start < 10:
if (len(self.publisher.samples + self.publisher2.samples) >=
self.expected_samples):
break

View File

@@ -32,7 +32,6 @@ from ceilometer import pipeline
from ceilometer import sample
from ceilometer import service
from ceilometer.tests import base
from ceilometer import utils
class TestSample(sample.Sample):
@@ -248,16 +247,20 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def setUp(self):
super(BaseAgentManagerTestCase, self).setUp()
self.CONF = service.prepare_service([], [])
self.CONF.set_override("backend_url", "zake://", "coordination")
self.CONF.set_override(
'cfg_file',
self.path_get('etc/ceilometer/polling.yaml'), group='polling'
)
self.mgr = self.create_manager()
self.mgr.extensions = self.create_extension_list()
self.mgr.partition_coordinator = mock.MagicMock()
fake_subset = lambda _, x: x
p_coord = self.mgr.partition_coordinator
p_coord.extract_my_subset.side_effect = fake_subset
self.hashring = mock.MagicMock()
self.hashring.belongs_to_self = mock.MagicMock()
self.hashring.belongs_to_self.return_value = True
self.mgr.hashrings = mock.MagicMock()
self.mgr.hashrings.__getitem__.return_value = self.hashring
self.mgr.tg = mock.MagicMock()
self.polling_cfg = {
'sources': [{
@@ -291,27 +294,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_polling')
def test_start(self, setup_polling):
self.mgr.setup_polling_tasks = mock.MagicMock()
mpc = self.mgr.partition_coordinator
self.mgr.run()
setup_polling.assert_called_once_with(self.CONF)
mpc.start.assert_called_once_with()
self.assertEqual(2, mpc.join_group.call_count)
self.mgr.setup_polling_tasks.assert_called_once_with()
self.mgr.terminate()
mpc.stop.assert_called_once_with()
def test_join_partitioning_groups(self):
self.mgr.discoveries = self.create_discoveries()
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
static_group_ids = [utils.hash_of_set(p['resources'])
for p in self.polling_cfg['sources']
if p['resources']]
expected = [mock.call(self.mgr.construct_group_id(g))
for g in ['another_group', 'global'] + static_group_ids]
self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
for c in expected:
self.assertIn(c, p_coord.join_group.call_args_list)
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
@@ -578,7 +564,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def test_discovery_partitioning(self):
self.mgr.discoveries = self.create_discoveries()
p_coord = self.mgr.partition_coordinator
self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
@@ -586,17 +571,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discoveries
if hasattr(d.obj, 'resources')]
self.assertEqual(len(expected),
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
self.mgr.hashrings.__getitem__.assert_called_with(
'central-compute-another_group')
self.hashring.belongs_to_self.assert_not_called()
def test_static_resources_partitioning(self):
p_coord = self.mgr.partition_coordinator
static_resources = ['static_1', 'static_2']
static_resources2 = ['static_3', 'static_4']
self.polling_cfg['sources'][0]['resources'] = static_resources
@@ -616,17 +595,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
# Only two groups need to be created, one for each polling,
# even though counter test is used twice
expected = [mock.call(self.mgr.construct_group_id(
utils.hash_of_set(resources)),
resources)
for resources in [static_resources,
static_resources2]]
self.assertEqual(len(expected),
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
self.hashring.belongs_to_self.assert_has_calls([
mock.call('static_1'),
mock.call('static_2'),
mock.call('static_3'),
mock.call('static_4'),
], any_order=True)
@mock.patch('ceilometer.agent.manager.LOG')
def test_polling_and_notify_with_resources(self, LOG):

View File

@@ -1,228 +0,0 @@
#
# Copyright 2014-2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mock
import tooz.coordination
from tooz import hashring
from ceilometer import coordination
from ceilometer import service
from ceilometer.tests import base
class MockToozCoordinator(tooz.coordination.CoordinationDriver):
def __init__(self, member_id, shared_storage):
super(MockToozCoordinator, self).__init__(member_id)
self._member_id = member_id
self._shared_storage = shared_storage
def create_group(self, group_id):
if group_id in self._shared_storage:
return MockAsyncError(
tooz.coordination.GroupAlreadyExist(group_id))
self._shared_storage[group_id] = {}
return MockAsyncResult(None)
def join_group(self, group_id, capabilities=b''):
if group_id not in self._shared_storage:
return MockAsyncError(
tooz.coordination.GroupNotCreated(group_id))
if self._member_id in self._shared_storage[group_id]:
return MockAsyncError(
tooz.coordination.MemberAlreadyExist(group_id,
self._member_id))
self._shared_storage[group_id][self._member_id] = {
"capabilities": capabilities,
}
return MockAsyncResult(None)
def get_members(self, group_id):
if group_id not in self._shared_storage:
return MockAsyncError(
tooz.coordination.GroupNotCreated(group_id))
return MockAsyncResult(self._shared_storage[group_id])
class MockToozCoordExceptionOnJoinRaiser(MockToozCoordinator):
def __init__(self, member_id, shared_storage, retry_count=None):
super(MockToozCoordExceptionOnJoinRaiser,
self).__init__(member_id, shared_storage)
self.tooz_error_count = retry_count
self.count = 0
def join_group_create(self, group_id, capabilities=b''):
if self.count == self.tooz_error_count:
return super(
MockToozCoordExceptionOnJoinRaiser, self).join_group_create(
group_id, capabilities)
else:
self.count += 1
raise tooz.coordination.ToozError('error')
class MockAsyncResult(tooz.coordination.CoordAsyncResult):
def __init__(self, result):
self.result = result
def get(self, timeout=0):
return self.result
@staticmethod
def done():
return True
class MockAsyncError(tooz.coordination.CoordAsyncResult):
def __init__(self, error):
self.error = error
def get(self, timeout=0):
raise self.error
@staticmethod
def done():
return True
class MockLoggingHandler(logging.Handler):
"""Mock logging handler to check for expected logs."""
def __init__(self, *args, **kwargs):
self.reset()
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.messages[record.levelname.lower()].append(record.getMessage())
def reset(self):
self.messages = {'debug': [],
'info': [],
'warning': [],
'error': [],
'critical': []}
class TestPartitioning(base.BaseTestCase):
def setUp(self):
super(TestPartitioning, self).setUp()
self.CONF = service.prepare_service([], [])
self.str_handler = MockLoggingHandler()
coordination.LOG.logger.addHandler(self.str_handler)
self.shared_storage = {}
def _get_new_started_coordinator(self, shared_storage, agent_id=None,
coordinator_cls=None, retry_count=None):
coordinator_cls = coordinator_cls or MockToozCoordinator
self.CONF.set_override('backend_url', 'xxx://yyy',
group='coordination')
with mock.patch('tooz.coordination.get_coordinator',
lambda _, member_id:
coordinator_cls(member_id, shared_storage,
retry_count) if retry_count else
coordinator_cls(member_id, shared_storage)):
pc = coordination.PartitionCoordinator(self.CONF, agent_id)
pc.start()
self.addCleanup(pc.stop)
return pc
def _usage_simulation(self, *agents_kwargs):
partition_coordinators = []
for kwargs in agents_kwargs:
partition_coordinator = self._get_new_started_coordinator(
self.shared_storage, kwargs['agent_id'], kwargs.get(
'coordinator_cls'))
partition_coordinator.join_group(kwargs['group_id'])
partition_coordinators.append(partition_coordinator)
for i, kwargs in enumerate(agents_kwargs):
all_resources = kwargs.get('all_resources', [])
expected_resources = kwargs.get('expected_resources', [])
actual_resources = partition_coordinators[i].extract_my_subset(
kwargs['group_id'], all_resources)
self.assertEqual(expected_resources, actual_resources)
def test_single_group(self):
agents = [dict(agent_id='agent1', group_id='group'),
dict(agent_id='agent2', group_id='group')]
self._usage_simulation(*agents)
self.assertEqual(['group'], sorted(self.shared_storage.keys()))
self.assertEqual(['agent1', 'agent2'],
sorted(self.shared_storage['group'].keys()))
def test_multiple_groups(self):
agents = [dict(agent_id='agent1', group_id='group1'),
dict(agent_id='agent2', group_id='group2')]
self._usage_simulation(*agents)
self.assertEqual(['group1', 'group2'],
sorted(self.shared_storage.keys()))
def test_partitioning(self):
all_resources = ['resource_%s' % i for i in range(1000)]
agents = ['agent_%s' % i for i in range(10)]
expected_resources = [list() for _ in range(len(agents))]
hr = hashring.HashRing(agents, partitions=100)
for r in all_resources:
encode = coordination.PartitionCoordinator.encode_task
key = agents.index(list(hr.get_nodes(encode(r)))[0])
expected_resources[key].append(r)
agents_kwargs = []
for i, agent in enumerate(agents):
agents_kwargs.append(dict(agent_id=agent,
group_id='group',
all_resources=all_resources,
expected_resources=expected_resources[i]))
self._usage_simulation(*agents_kwargs)
def test_coordination_backend_connection_fail_on_join(self):
coord = self._get_new_started_coordinator(
{'group': {}}, 'agent1', MockToozCoordExceptionOnJoinRaiser,
retry_count=2)
with mock.patch('tooz.coordination.get_coordinator',
return_value=MockToozCoordExceptionOnJoinRaiser):
coord.join_group(group_id='group')
expected_errors = ['Error joining partitioning group group,'
' re-trying',
'Error joining partitioning group group,'
' re-trying']
self.assertEqual(expected_errors, self.str_handler.messages['error'])
def test_partitioning_with_unicode(self):
all_resources = [u'\u0634\u0628\u06a9\u0647',
u'\u0627\u0647\u0644',
u'\u0645\u062d\u0628\u0627\u0646']
agents = ['agent_%s' % i for i in range(2)]
expected_resources = [list() for _ in range(len(agents))]
hr = hashring.HashRing(agents, partitions=100)
for r in all_resources:
encode = coordination.PartitionCoordinator.encode_task
key = agents.index(list(hr.get_nodes(encode(r)))[0])
expected_resources[key].append(r)
agents_kwargs = []
for i, agent in enumerate(agents):
agents_kwargs.append(dict(agent_id=agent,
group_id='group',
all_resources=all_resources,
expected_resources=expected_resources[i]))
self._usage_simulation(*agents_kwargs)

View File

@@ -0,0 +1,6 @@
---
upgrade:
- |
Ceilometer now relies on the workload partitioning mechanism provided by the
tooz library. As a result, the options `coordination.retry_backoff` and
`coordination.max_retry_interval` no longer exist.
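
For example, entries like the following can be dropped from the
`[coordination]` section of ceilometer.conf (the values shown are the old
defaults)::

retry_backoff = 1
max_retry_interval = 30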