Merge "Replace Ceilometer coordination layer by tooz partition system"
This commit is contained in:
commit
2726419a11
@ -32,9 +32,9 @@ from oslo_utils import timeutils
|
||||
from six import moves
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from stevedore import extension
|
||||
from tooz import coordination
|
||||
|
||||
from ceilometer.agent import plugin_base
|
||||
from ceilometer import coordination
|
||||
from ceilometer import keystone_client
|
||||
from ceilometer import messaging
|
||||
from ceilometer import pipeline
|
||||
@ -102,14 +102,16 @@ class Resources(object):
|
||||
source_discovery = (self.agent_manager.discover(self._discovery,
|
||||
discovery_cache)
|
||||
if self._discovery else [])
|
||||
static_resources = []
|
||||
|
||||
if self._resources:
|
||||
static_resources_group = self.agent_manager.construct_group_id(
|
||||
utils.hash_of_set(self._resources))
|
||||
p_coord = self.agent_manager.partition_coordinator
|
||||
static_resources = p_coord.extract_my_subset(
|
||||
static_resources_group, self._resources)
|
||||
return static_resources + source_discovery
|
||||
return list(filter(
|
||||
self.agent_manager.hashrings[
|
||||
static_resources_group].belongs_to_self, self._resources
|
||||
)) + source_discovery
|
||||
|
||||
return source_discovery
|
||||
|
||||
@staticmethod
|
||||
def key(source_name, pollster):
|
||||
@ -278,8 +280,8 @@ class AgentManager(cotyledon.Service):
|
||||
if self.conf.coordination.backend_url:
|
||||
# XXX uuid4().bytes ought to work, but it requires ascii for now
|
||||
coordination_id = str(uuid.uuid4()).encode('ascii')
|
||||
self.partition_coordinator = coordination.PartitionCoordinator(
|
||||
self.conf, coordination_id)
|
||||
self.partition_coordinator = coordination.get_coordinator(
|
||||
self.conf.coordination.backend_url, coordination_id)
|
||||
else:
|
||||
self.partition_coordinator = None
|
||||
|
||||
@ -355,8 +357,9 @@ class AgentManager(cotyledon.Service):
|
||||
])
|
||||
groups.update(static_resource_groups)
|
||||
|
||||
for group in groups:
|
||||
self.partition_coordinator.join_group(group)
|
||||
self.hashrings = dict(
|
||||
(group, self.partition_coordinator.join_partitioned_group(group))
|
||||
for group in groups)
|
||||
|
||||
def create_polling_task(self):
|
||||
"""Create an initially empty polling task."""
|
||||
@ -492,17 +495,17 @@ class AgentManager(cotyledon.Service):
|
||||
continue
|
||||
|
||||
discovered = discoverer.discover(self, param)
|
||||
|
||||
if self.partition_coordinator:
|
||||
partitioned = (
|
||||
self.partition_coordinator.extract_my_subset(
|
||||
self.construct_group_id(discoverer.group_id),
|
||||
discovered)
|
||||
)
|
||||
else:
|
||||
partitioned = discovered
|
||||
resources.extend(partitioned)
|
||||
discovered = list(filter(
|
||||
self.hashrings[
|
||||
self.construct_group_id(discoverer.group_id)
|
||||
].belongs_to_self, discovered
|
||||
))
|
||||
|
||||
resources.extend(discovered)
|
||||
if discovery_cache is not None:
|
||||
discovery_cache[url] = partitioned
|
||||
discovery_cache[url] = discovered
|
||||
except ka_exceptions.ClientException as e:
|
||||
LOG.error('Skipping %(name)s, keystone issue: '
|
||||
'%(exc)s', {'name': name, 'exc': e})
|
||||
|
@ -1,141 +0,0 @@
|
||||
#
|
||||
# Copyright 2014-2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import tenacity
|
||||
import tooz.coordination
|
||||
from tooz import hashring
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('backend_url',
|
||||
help='The backend URL to use for distributed coordination. If '
|
||||
'left empty, per-deployment central agent and per-host '
|
||||
'compute agent won\'t do workload '
|
||||
'partitioning and will only function correctly if a '
|
||||
'single instance of that service is running.'),
|
||||
cfg.FloatOpt('check_watchers',
|
||||
default=10.0,
|
||||
help='Number of seconds between checks to see if group '
|
||||
'membership has changed'),
|
||||
cfg.IntOpt('retry_backoff',
|
||||
default=1,
|
||||
help='Retry backoff factor when retrying to connect with '
|
||||
'coordination backend'),
|
||||
cfg.IntOpt('max_retry_interval',
|
||||
default=30,
|
||||
help='Maximum number of seconds between retry to join '
|
||||
'partitioning group')
|
||||
]
|
||||
|
||||
|
||||
class PartitionCoordinator(object):
|
||||
"""Workload partitioning coordinator.
|
||||
|
||||
This class uses the `tooz` library to manage group membership.
|
||||
|
||||
Coordination errors and reconnects are handled under the hood, so the
|
||||
service using the partition coordinator need not care whether the
|
||||
coordination backend is down. The `extract_my_subset` will simply return an
|
||||
empty iterable in this case.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, my_id):
|
||||
self.conf = conf
|
||||
self._my_id = my_id
|
||||
self._coordinator = tooz.coordination.get_coordinator(
|
||||
conf.coordination.backend_url, my_id)
|
||||
|
||||
def start(self):
|
||||
try:
|
||||
self._coordinator.start(start_heart=True)
|
||||
LOG.info('Coordination backend started successfully.')
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.exception('Error connecting to coordination backend.')
|
||||
|
||||
def stop(self):
|
||||
try:
|
||||
self._coordinator.stop()
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.exception('Error connecting to coordination backend.')
|
||||
finally:
|
||||
del self._coordinator
|
||||
|
||||
def watch_group(self, namespace, callback):
|
||||
self._coordinator.watch_join_group(namespace, callback)
|
||||
self._coordinator.watch_leave_group(namespace, callback)
|
||||
|
||||
def run_watchers(self):
|
||||
self._coordinator.run_watchers()
|
||||
|
||||
def join_group(self, group_id):
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(
|
||||
multiplier=self.conf.coordination.retry_backoff,
|
||||
max=self.conf.coordination.max_retry_interval),
|
||||
retry=tenacity.retry_never)
|
||||
def _inner():
|
||||
try:
|
||||
self._coordinator.join_group_create(group_id)
|
||||
except tooz.coordination.MemberAlreadyExist:
|
||||
pass
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.exception('Error joining partitioning group %s,'
|
||||
' re-trying', group_id)
|
||||
raise tenacity.TryAgain
|
||||
LOG.info('Joined partitioning group %s', group_id)
|
||||
|
||||
return _inner()
|
||||
|
||||
def _get_members(self, group_id):
|
||||
while True:
|
||||
get_members_req = self._coordinator.get_members(group_id)
|
||||
try:
|
||||
return get_members_req.get()
|
||||
except tooz.coordination.GroupNotCreated:
|
||||
self.join_group(group_id)
|
||||
|
||||
def extract_my_subset(self, group_id, iterable):
|
||||
"""Filters an iterable, returning only objects assigned to this agent.
|
||||
|
||||
We have a list of objects and get a list of active group members from
|
||||
`tooz`. We then hash all the objects into buckets and return only
|
||||
the ones that hashed into *our* bucket.
|
||||
"""
|
||||
try:
|
||||
members = self._get_members(group_id)
|
||||
hr = hashring.HashRing(members, partitions=100)
|
||||
iterable = list(iterable)
|
||||
filtered = [v for v in iterable
|
||||
if self._my_id in hr.get_nodes(self.encode_task(v))]
|
||||
LOG.debug('The universal set: %s, my subset: %s',
|
||||
[six.text_type(f) for f in iterable],
|
||||
[six.text_type(f) for f in filtered])
|
||||
return filtered
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.exception('Error getting group membership info from '
|
||||
'coordination backend.')
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def encode_task(value):
|
||||
"""encode to bytes"""
|
||||
return six.text_type(value).encode('utf-8')
|
@ -1,4 +1,5 @@
|
||||
#
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
# Copyright 2012-2013 eNovance <licensing@enovance.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -25,9 +26,10 @@ from futurist import periodics
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
import six
|
||||
from stevedore import extension
|
||||
from tooz import coordination
|
||||
|
||||
from ceilometer import coordination
|
||||
from ceilometer.event import endpoint as event_endpoint
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import messaging
|
||||
@ -112,9 +114,14 @@ class NotificationService(cotyledon.Service):
|
||||
super(NotificationService, self).__init__(worker_id)
|
||||
self.startup_delay = worker_id
|
||||
self.conf = conf
|
||||
# XXX uuid4().bytes ought to work, but it requires ascii for now
|
||||
self.coordination_id = (coordination_id or
|
||||
str(uuid.uuid4()).encode('ascii'))
|
||||
if self.conf.notification.workload_partitioning:
|
||||
# XXX uuid4().bytes ought to work, but it requires ascii for now
|
||||
coordination_id = (coordination_id or
|
||||
str(uuid.uuid4()).encode('ascii'))
|
||||
self.partition_coordinator = coordination.get_coordinator(
|
||||
self.conf.coordination.backend_url, coordination_id)
|
||||
else:
|
||||
self.partition_coordinator = None
|
||||
|
||||
@classmethod
|
||||
def _get_notifications_manager(cls, pm):
|
||||
@ -168,7 +175,6 @@ class NotificationService(cotyledon.Service):
|
||||
super(NotificationService, self).run()
|
||||
self.shutdown = False
|
||||
self.periodic = None
|
||||
self.partition_coordinator = None
|
||||
self.coord_lock = threading.Lock()
|
||||
|
||||
self.listeners = []
|
||||
@ -184,9 +190,6 @@ class NotificationService(cotyledon.Service):
|
||||
self.transport = messaging.get_transport(self.conf)
|
||||
|
||||
if self.conf.notification.workload_partitioning:
|
||||
self.group_id = self.NOTIFICATION_NAMESPACE
|
||||
self.partition_coordinator = coordination.PartitionCoordinator(
|
||||
self.conf, self.coordination_id)
|
||||
self.partition_coordinator.start()
|
||||
else:
|
||||
# FIXME(sileht): endpoint uses the notification_topics option
|
||||
@ -204,9 +207,8 @@ class NotificationService(cotyledon.Service):
|
||||
|
||||
if self.conf.notification.workload_partitioning:
|
||||
# join group after all manager set up is configured
|
||||
self.partition_coordinator.join_group(self.group_id)
|
||||
self.partition_coordinator.watch_group(self.group_id,
|
||||
self._refresh_agent)
|
||||
self.hashring = self.partition_coordinator.join_partitioned_group(
|
||||
self.NOTIFICATION_NAMESPACE)
|
||||
|
||||
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
|
||||
run_immediately=True)
|
||||
@ -219,7 +221,6 @@ class NotificationService(cotyledon.Service):
|
||||
self.periodic.add(run_watchers)
|
||||
|
||||
utils.spawn_thread(self.periodic.start)
|
||||
|
||||
# configure pipelines after all coordination is configured.
|
||||
with self.coord_lock:
|
||||
self._configure_pipeline_listener()
|
||||
@ -277,14 +278,13 @@ class NotificationService(cotyledon.Service):
|
||||
ev_pipes = self.event_pipeline_manager.pipelines
|
||||
pipelines = self.pipeline_manager.pipelines + ev_pipes
|
||||
transport = messaging.get_transport(self.conf)
|
||||
partitioned = six.moves.range(
|
||||
self.conf.notification.pipeline_processing_queues
|
||||
)
|
||||
|
||||
if self.partition_coordinator:
|
||||
partitioned = self.partition_coordinator.extract_my_subset(
|
||||
self.group_id,
|
||||
range(self.conf.notification.pipeline_processing_queues))
|
||||
else:
|
||||
partitioned = range(
|
||||
self.conf.notification.pipeline_processing_queues
|
||||
)
|
||||
partitioned = list(filter(
|
||||
self.hashring.belongs_to_self, partitioned))
|
||||
|
||||
endpoints = []
|
||||
targets = []
|
||||
|
@ -26,7 +26,6 @@ import ceilometer.compute.virt.inspector
|
||||
import ceilometer.compute.virt.libvirt.utils
|
||||
import ceilometer.compute.virt.vmware.inspector
|
||||
import ceilometer.compute.virt.xenapi.inspector
|
||||
import ceilometer.coordination
|
||||
import ceilometer.dispatcher
|
||||
import ceilometer.dispatcher.file
|
||||
import ceilometer.dispatcher.gnocchi_opts
|
||||
@ -98,7 +97,20 @@ def list_opts():
|
||||
ceilometer.api.controllers.v2.root.API_OPTS)),
|
||||
('collector', ceilometer.collector.OPTS),
|
||||
('compute', ceilometer.compute.discovery.OPTS),
|
||||
('coordination', ceilometer.coordination.OPTS),
|
||||
('coordination', [
|
||||
cfg.StrOpt(
|
||||
'backend_url',
|
||||
help='The backend URL to use for distributed coordination. If '
|
||||
'left empty, per-deployment central agent and per-host '
|
||||
'compute agent won\'t do workload '
|
||||
'partitioning and will only function correctly if a '
|
||||
'single instance of that service is running.'),
|
||||
cfg.FloatOpt(
|
||||
'check_watchers',
|
||||
default=10.0,
|
||||
help='Number of seconds between checks to see if group '
|
||||
'membership has changed'),
|
||||
]),
|
||||
('database', ceilometer.storage.OPTS),
|
||||
('dispatcher_file', ceilometer.dispatcher.file.OPTS),
|
||||
('dispatcher_http', ceilometer.dispatcher.http.http_dispatcher_opts),
|
||||
|
@ -297,6 +297,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||
fake_publisher_cls.return_value = self.publisher
|
||||
self._check_notification_service()
|
||||
|
||||
@mock.patch("ceilometer.utils.kill_listeners", mock.MagicMock())
|
||||
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
|
||||
def test_notification_threads(self, m_listener):
|
||||
self.CONF.set_override('batch_size', 1, group='notification')
|
||||
@ -304,6 +305,7 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||
m_listener.assert_called_with(
|
||||
override_pool_size=self.CONF.max_parallel_requests)
|
||||
m_listener.reset_mock()
|
||||
self.srv.terminate()
|
||||
self.CONF.set_override('batch_size', 2, group='notification')
|
||||
self.srv.run()
|
||||
m_listener.assert_called_with(override_pool_size=1)
|
||||
@ -325,22 +327,32 @@ class TestRealNotificationHA(BaseRealNotification):
|
||||
|
||||
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
||||
def test_retain_common_targets_on_refresh(self, mock_listener):
|
||||
with mock.patch('ceilometer.coordination.PartitionCoordinator'
|
||||
'.extract_my_subset', return_value=[1, 2]):
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
maybe = {"maybe": 0}
|
||||
|
||||
def _once_over_five(item):
|
||||
maybe["maybe"] += 1
|
||||
return maybe["maybe"] % 5 == 0
|
||||
|
||||
hashring = mock.MagicMock()
|
||||
hashring.belongs_to_self = _once_over_five
|
||||
self.srv.partition_coordinator = pc = mock.MagicMock()
|
||||
pc.join_partitioned_group.return_value = hashring
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
listened_before = [target.topic for target in
|
||||
mock_listener.call_args[0][1]]
|
||||
self.assertEqual(4, len(listened_before))
|
||||
with mock.patch('ceilometer.coordination.PartitionCoordinator'
|
||||
'.extract_my_subset', return_value=[1, 3]):
|
||||
self.srv._refresh_agent(None)
|
||||
self.srv._refresh_agent(None)
|
||||
listened_after = [target.topic for target in
|
||||
mock_listener.call_args[0][1]]
|
||||
self.assertEqual(4, len(listened_after))
|
||||
common = set(listened_before) & set(listened_after)
|
||||
for topic in common:
|
||||
self.assertTrue(topic.endswith('1'))
|
||||
self.assertEqual(
|
||||
{'ceilometer-pipe-test_pipeline:test_sink-4',
|
||||
'ceilometer-pipe-event:test_event:test_sink-4',
|
||||
'ceilometer-pipe-event:test_event:test_sink-9',
|
||||
'ceilometer-pipe-test_pipeline:test_sink-9'},
|
||||
common)
|
||||
|
||||
@mock.patch('oslo_messaging.get_batch_notification_listener')
|
||||
def test_notify_to_relevant_endpoint(self, mock_listener):
|
||||
@ -464,14 +476,31 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
||||
def _check_notifications(self, fake_publisher_cls):
|
||||
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
|
||||
|
||||
self.srv = notification.NotificationService(0, self.CONF, 'harry')
|
||||
self.srv2 = notification.NotificationService(0, self.CONF, 'lloyd')
|
||||
with mock.patch('ceilometer.coordination.PartitionCoordinator'
|
||||
'._get_members', return_value=['harry', 'lloyd']):
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
self.srv2.run()
|
||||
self.addCleanup(self.srv2.terminate)
|
||||
maybe = {"srv": 0, "srv2": -1}
|
||||
|
||||
def _sometimes_srv(item):
|
||||
maybe["srv"] += 1
|
||||
return (maybe["srv"] % 2) == 0
|
||||
|
||||
self.srv = notification.NotificationService(0, self.CONF)
|
||||
self.srv.partition_coordinator = pc = mock.MagicMock()
|
||||
hashring = mock.MagicMock()
|
||||
hashring.belongs_to_self = _sometimes_srv
|
||||
pc.join_partitioned_group.return_value = hashring
|
||||
self.srv.run()
|
||||
self.addCleanup(self.srv.terminate)
|
||||
|
||||
def _sometimes_srv2(item):
|
||||
maybe["srv2"] += 1
|
||||
return (maybe["srv2"] % 2) == 0
|
||||
|
||||
self.srv2 = notification.NotificationService(0, self.CONF)
|
||||
self.srv2.partition_coordinator = pc = mock.MagicMock()
|
||||
hashring = mock.MagicMock()
|
||||
hashring.belongs_to_self = _sometimes_srv2
|
||||
pc.join_partitioned_group.return_value = hashring
|
||||
self.srv2.run()
|
||||
self.addCleanup(self.srv2.terminate)
|
||||
|
||||
notifier = messaging.get_notifier(self.transport,
|
||||
"compute.vagrant-precise")
|
||||
@ -484,7 +513,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
||||
self.expected_samples = 4
|
||||
with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
|
||||
start = time.time()
|
||||
while time.time() - start < 60:
|
||||
while time.time() - start < 10:
|
||||
if (len(self.publisher.samples + self.publisher2.samples) >=
|
||||
self.expected_samples):
|
||||
break
|
||||
|
@ -32,7 +32,6 @@ from ceilometer import pipeline
|
||||
from ceilometer import sample
|
||||
from ceilometer import service
|
||||
from ceilometer.tests import base
|
||||
from ceilometer import utils
|
||||
|
||||
|
||||
class TestSample(sample.Sample):
|
||||
@ -248,16 +247,20 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(BaseAgentManagerTestCase, self).setUp()
|
||||
self.CONF = service.prepare_service([], [])
|
||||
self.CONF.set_override("backend_url", "zake://", "coordination")
|
||||
self.CONF.set_override(
|
||||
'cfg_file',
|
||||
self.path_get('etc/ceilometer/polling.yaml'), group='polling'
|
||||
)
|
||||
self.mgr = self.create_manager()
|
||||
self.mgr.extensions = self.create_extension_list()
|
||||
self.mgr.partition_coordinator = mock.MagicMock()
|
||||
fake_subset = lambda _, x: x
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
p_coord.extract_my_subset.side_effect = fake_subset
|
||||
|
||||
self.hashring = mock.MagicMock()
|
||||
self.hashring.belongs_to_self = mock.MagicMock()
|
||||
self.hashring.belongs_to_self.return_value = True
|
||||
|
||||
self.mgr.hashrings = mock.MagicMock()
|
||||
self.mgr.hashrings.__getitem__.return_value = self.hashring
|
||||
self.mgr.tg = mock.MagicMock()
|
||||
self.polling_cfg = {
|
||||
'sources': [{
|
||||
@ -291,27 +294,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
@mock.patch('ceilometer.pipeline.setup_polling')
|
||||
def test_start(self, setup_polling):
|
||||
self.mgr.setup_polling_tasks = mock.MagicMock()
|
||||
mpc = self.mgr.partition_coordinator
|
||||
self.mgr.run()
|
||||
setup_polling.assert_called_once_with(self.CONF)
|
||||
mpc.start.assert_called_once_with()
|
||||
self.assertEqual(2, mpc.join_group.call_count)
|
||||
self.mgr.setup_polling_tasks.assert_called_once_with()
|
||||
self.mgr.terminate()
|
||||
mpc.stop.assert_called_once_with()
|
||||
|
||||
def test_join_partitioning_groups(self):
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.mgr.join_partitioning_groups()
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
static_group_ids = [utils.hash_of_set(p['resources'])
|
||||
for p in self.polling_cfg['sources']
|
||||
if p['resources']]
|
||||
expected = [mock.call(self.mgr.construct_group_id(g))
|
||||
for g in ['another_group', 'global'] + static_group_ids]
|
||||
self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
|
||||
for c in expected:
|
||||
self.assertIn(c, p_coord.join_group.call_args_list)
|
||||
|
||||
def test_setup_polling_tasks(self):
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
@ -578,7 +564,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
|
||||
def test_discovery_partitioning(self):
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
self.polling_cfg['sources'][0]['discovery'] = [
|
||||
'testdiscovery', 'testdiscoveryanother',
|
||||
'testdiscoverynonexistent', 'testdiscoveryexception']
|
||||
@ -586,17 +571,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
|
||||
d.obj.resources)
|
||||
for d in self.mgr.discoveries
|
||||
if hasattr(d.obj, 'resources')]
|
||||
self.assertEqual(len(expected),
|
||||
len(p_coord.extract_my_subset.call_args_list))
|
||||
for c in expected:
|
||||
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
|
||||
self.mgr.hashrings.__getitem__.assert_called_with(
|
||||
'central-compute-another_group')
|
||||
self.hashring.belongs_to_self.assert_not_called()
|
||||
|
||||
def test_static_resources_partitioning(self):
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
static_resources = ['static_1', 'static_2']
|
||||
static_resources2 = ['static_3', 'static_4']
|
||||
self.polling_cfg['sources'][0]['resources'] = static_resources
|
||||
@ -616,17 +595,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
# Only two groups need to be created, one for each polling,
|
||||
# even though counter test is used twice
|
||||
expected = [mock.call(self.mgr.construct_group_id(
|
||||
utils.hash_of_set(resources)),
|
||||
resources)
|
||||
for resources in [static_resources,
|
||||
static_resources2]]
|
||||
self.assertEqual(len(expected),
|
||||
len(p_coord.extract_my_subset.call_args_list))
|
||||
for c in expected:
|
||||
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
|
||||
self.hashring.belongs_to_self.assert_has_calls([
|
||||
mock.call('static_1'),
|
||||
mock.call('static_2'),
|
||||
mock.call('static_3'),
|
||||
mock.call('static_4'),
|
||||
], any_order=True)
|
||||
|
||||
@mock.patch('ceilometer.agent.manager.LOG')
|
||||
def test_polling_and_notify_with_resources(self, LOG):
|
||||
|
@ -1,228 +0,0 @@
|
||||
#
|
||||
# Copyright 2014-2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
import mock
|
||||
import tooz.coordination
|
||||
from tooz import hashring
|
||||
|
||||
from ceilometer import coordination
|
||||
from ceilometer import service
|
||||
from ceilometer.tests import base
|
||||
|
||||
|
||||
class MockToozCoordinator(tooz.coordination.CoordinationDriver):
|
||||
def __init__(self, member_id, shared_storage):
|
||||
super(MockToozCoordinator, self).__init__(member_id)
|
||||
self._member_id = member_id
|
||||
self._shared_storage = shared_storage
|
||||
|
||||
def create_group(self, group_id):
|
||||
if group_id in self._shared_storage:
|
||||
return MockAsyncError(
|
||||
tooz.coordination.GroupAlreadyExist(group_id))
|
||||
self._shared_storage[group_id] = {}
|
||||
return MockAsyncResult(None)
|
||||
|
||||
def join_group(self, group_id, capabilities=b''):
|
||||
if group_id not in self._shared_storage:
|
||||
return MockAsyncError(
|
||||
tooz.coordination.GroupNotCreated(group_id))
|
||||
if self._member_id in self._shared_storage[group_id]:
|
||||
return MockAsyncError(
|
||||
tooz.coordination.MemberAlreadyExist(group_id,
|
||||
self._member_id))
|
||||
self._shared_storage[group_id][self._member_id] = {
|
||||
"capabilities": capabilities,
|
||||
}
|
||||
return MockAsyncResult(None)
|
||||
|
||||
def get_members(self, group_id):
|
||||
if group_id not in self._shared_storage:
|
||||
return MockAsyncError(
|
||||
tooz.coordination.GroupNotCreated(group_id))
|
||||
return MockAsyncResult(self._shared_storage[group_id])
|
||||
|
||||
|
||||
class MockToozCoordExceptionOnJoinRaiser(MockToozCoordinator):
|
||||
def __init__(self, member_id, shared_storage, retry_count=None):
|
||||
super(MockToozCoordExceptionOnJoinRaiser,
|
||||
self).__init__(member_id, shared_storage)
|
||||
self.tooz_error_count = retry_count
|
||||
self.count = 0
|
||||
|
||||
def join_group_create(self, group_id, capabilities=b''):
|
||||
if self.count == self.tooz_error_count:
|
||||
return super(
|
||||
MockToozCoordExceptionOnJoinRaiser, self).join_group_create(
|
||||
group_id, capabilities)
|
||||
else:
|
||||
self.count += 1
|
||||
raise tooz.coordination.ToozError('error')
|
||||
|
||||
|
||||
class MockAsyncResult(tooz.coordination.CoordAsyncResult):
|
||||
def __init__(self, result):
|
||||
self.result = result
|
||||
|
||||
def get(self, timeout=0):
|
||||
return self.result
|
||||
|
||||
@staticmethod
|
||||
def done():
|
||||
return True
|
||||
|
||||
|
||||
class MockAsyncError(tooz.coordination.CoordAsyncResult):
|
||||
def __init__(self, error):
|
||||
self.error = error
|
||||
|
||||
def get(self, timeout=0):
|
||||
raise self.error
|
||||
|
||||
@staticmethod
|
||||
def done():
|
||||
return True
|
||||
|
||||
|
||||
class MockLoggingHandler(logging.Handler):
|
||||
"""Mock logging handler to check for expected logs."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.reset()
|
||||
logging.Handler.__init__(self, *args, **kwargs)
|
||||
|
||||
def emit(self, record):
|
||||
self.messages[record.levelname.lower()].append(record.getMessage())
|
||||
|
||||
def reset(self):
|
||||
self.messages = {'debug': [],
|
||||
'info': [],
|
||||
'warning': [],
|
||||
'error': [],
|
||||
'critical': []}
|
||||
|
||||
|
||||
class TestPartitioning(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPartitioning, self).setUp()
|
||||
self.CONF = service.prepare_service([], [])
|
||||
self.str_handler = MockLoggingHandler()
|
||||
coordination.LOG.logger.addHandler(self.str_handler)
|
||||
self.shared_storage = {}
|
||||
|
||||
def _get_new_started_coordinator(self, shared_storage, agent_id=None,
|
||||
coordinator_cls=None, retry_count=None):
|
||||
coordinator_cls = coordinator_cls or MockToozCoordinator
|
||||
self.CONF.set_override('backend_url', 'xxx://yyy',
|
||||
group='coordination')
|
||||
with mock.patch('tooz.coordination.get_coordinator',
|
||||
lambda _, member_id:
|
||||
coordinator_cls(member_id, shared_storage,
|
||||
retry_count) if retry_count else
|
||||
coordinator_cls(member_id, shared_storage)):
|
||||
pc = coordination.PartitionCoordinator(self.CONF, agent_id)
|
||||
pc.start()
|
||||
self.addCleanup(pc.stop)
|
||||
return pc
|
||||
|
||||
def _usage_simulation(self, *agents_kwargs):
|
||||
partition_coordinators = []
|
||||
for kwargs in agents_kwargs:
|
||||
partition_coordinator = self._get_new_started_coordinator(
|
||||
self.shared_storage, kwargs['agent_id'], kwargs.get(
|
||||
'coordinator_cls'))
|
||||
partition_coordinator.join_group(kwargs['group_id'])
|
||||
partition_coordinators.append(partition_coordinator)
|
||||
|
||||
for i, kwargs in enumerate(agents_kwargs):
|
||||
all_resources = kwargs.get('all_resources', [])
|
||||
expected_resources = kwargs.get('expected_resources', [])
|
||||
actual_resources = partition_coordinators[i].extract_my_subset(
|
||||
kwargs['group_id'], all_resources)
|
||||
self.assertEqual(expected_resources, actual_resources)
|
||||
|
||||
def test_single_group(self):
|
||||
agents = [dict(agent_id='agent1', group_id='group'),
|
||||
dict(agent_id='agent2', group_id='group')]
|
||||
self._usage_simulation(*agents)
|
||||
|
||||
self.assertEqual(['group'], sorted(self.shared_storage.keys()))
|
||||
self.assertEqual(['agent1', 'agent2'],
|
||||
sorted(self.shared_storage['group'].keys()))
|
||||
|
||||
def test_multiple_groups(self):
|
||||
agents = [dict(agent_id='agent1', group_id='group1'),
|
||||
dict(agent_id='agent2', group_id='group2')]
|
||||
self._usage_simulation(*agents)
|
||||
|
||||
self.assertEqual(['group1', 'group2'],
|
||||
sorted(self.shared_storage.keys()))
|
||||
|
||||
def test_partitioning(self):
|
||||
all_resources = ['resource_%s' % i for i in range(1000)]
|
||||
agents = ['agent_%s' % i for i in range(10)]
|
||||
|
||||
expected_resources = [list() for _ in range(len(agents))]
|
||||
hr = hashring.HashRing(agents, partitions=100)
|
||||
for r in all_resources:
|
||||
encode = coordination.PartitionCoordinator.encode_task
|
||||
key = agents.index(list(hr.get_nodes(encode(r)))[0])
|
||||
expected_resources[key].append(r)
|
||||
|
||||
agents_kwargs = []
|
||||
for i, agent in enumerate(agents):
|
||||
agents_kwargs.append(dict(agent_id=agent,
|
||||
group_id='group',
|
||||
all_resources=all_resources,
|
||||
expected_resources=expected_resources[i]))
|
||||
self._usage_simulation(*agents_kwargs)
|
||||
|
||||
def test_coordination_backend_connection_fail_on_join(self):
|
||||
coord = self._get_new_started_coordinator(
|
||||
{'group': {}}, 'agent1', MockToozCoordExceptionOnJoinRaiser,
|
||||
retry_count=2)
|
||||
with mock.patch('tooz.coordination.get_coordinator',
|
||||
return_value=MockToozCoordExceptionOnJoinRaiser):
|
||||
coord.join_group(group_id='group')
|
||||
|
||||
expected_errors = ['Error joining partitioning group group,'
|
||||
' re-trying',
|
||||
'Error joining partitioning group group,'
|
||||
' re-trying']
|
||||
self.assertEqual(expected_errors, self.str_handler.messages['error'])
|
||||
|
||||
def test_partitioning_with_unicode(self):
|
||||
all_resources = [u'\u0634\u0628\u06a9\u0647',
|
||||
u'\u0627\u0647\u0644',
|
||||
u'\u0645\u062d\u0628\u0627\u0646']
|
||||
agents = ['agent_%s' % i for i in range(2)]
|
||||
|
||||
expected_resources = [list() for _ in range(len(agents))]
|
||||
hr = hashring.HashRing(agents, partitions=100)
|
||||
for r in all_resources:
|
||||
encode = coordination.PartitionCoordinator.encode_task
|
||||
key = agents.index(list(hr.get_nodes(encode(r)))[0])
|
||||
expected_resources[key].append(r)
|
||||
|
||||
agents_kwargs = []
|
||||
for i, agent in enumerate(agents):
|
||||
agents_kwargs.append(dict(agent_id=agent,
|
||||
group_id='group',
|
||||
all_resources=all_resources,
|
||||
expected_resources=expected_resources[i]))
|
||||
self._usage_simulation(*agents_kwargs)
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
Ceilometer now leverages the latest distribution mechanism provided by the
|
||||
tooz library. Therefore the options `coordination.retry_backoff` and
|
||||
`coordination.max_retry_interval` do not exist anymore.
|
Loading…
x
Reference in New Issue
Block a user