From 3571a607f29fb4a7bc6494f2d6c56f632c80a103 Mon Sep 17 00:00:00 2001 From: Nejc Saje Date: Tue, 19 Aug 2014 06:20:48 -0400 Subject: [PATCH] Switch partitioned alarm evaluation to a hash-based approach Short version: make use of the new distributed workload partitioning utilities in Ceilometer to simplify the alarm evaluation partitioning. Code is intentionally non-consolidated to enable easy deletion of 'singleton' and 'partitioned' services in the Kilo cycle. Longer version: The assignment of alarms to individual partitioned alarm evaluators now follows the same pattern as the division of resources between scaled-out central agents. The evaluators each join a tooz group and emit a periodic heartbeat to tooz. Tooz provides distributed group membership information. Thus the set of evaluators share minimal knowledge, but this is sufficient to guide a hash-based approach to determining whether an individual alarm UUID falls under the responsibility of an individual evaluator. The current RPC-fanout-based presence reporting and the master/slave division of responsibilities can be dropped in the next cycle. Also the rebalancing logic when a certain threshold of alarm deletion is crossed will no longer be required. DocImpact Change-Id: Ica8dae569f9ff1c2f8fe58be6ae2def66be0da54 Implements: blueprint hash-based-alarm-partitioning --- ceilometer/alarm/rpc.py | 5 +- ceilometer/alarm/service.py | 47 +++++++- ceilometer/cmd/alarm.py | 8 +- ceilometer/tests/alarm/test_alarm_svc.py | 143 +++++++++++++++++++++++ ceilometer/tests/test_bin.py | 5 +- setup.cfg | 1 + 6 files changed, 203 insertions(+), 6 deletions(-) create mode 100644 ceilometer/tests/alarm/test_alarm_svc.py diff --git a/ceilometer/alarm/rpc.py b/ceilometer/alarm/rpc.py index 2f876b0c..e9419121 100644 --- a/ceilometer/alarm/rpc.py +++ b/ceilometer/alarm/rpc.py @@ -32,7 +32,10 @@ OPTS = [ cfg.StrOpt('partition_rpc_topic', default='alarm_partition_coordination', help='The topic that ceilometer uses for alarm partition ' - 'coordination messages.'), + 'coordination messages. DEPRECATED: RPC-based partitioned' + 'alarm evaluation service will be removed in Kilo in ' + 'favour of the default alarm evaluation service using ' + 'tooz for partitioning.'), ] cfg.CONF.register_opts(OPTS, group='alarm') diff --git a/ceilometer/alarm/service.py b/ceilometer/alarm/service.py index 614d03f8..c2687dfb 100644 --- a/ceilometer/alarm/service.py +++ b/ceilometer/alarm/service.py @@ -25,8 +25,9 @@ from oslo.utils import netutils import six from stevedore import extension -from ceilometer.alarm.partition import coordination +from ceilometer.alarm.partition import coordination as alarm_coordination from ceilometer.alarm import rpc as rpc_alarm +from ceilometer import coordination as coordination from ceilometer import messaging from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log @@ -48,6 +49,8 @@ cfg.CONF.import_opt('notifier_rpc_topic', 'ceilometer.alarm.rpc', group='alarm') cfg.CONF.import_opt('partition_rpc_topic', 'ceilometer.alarm.rpc', group='alarm') +cfg.CONF.import_opt('heartbeat', 'ceilometer.coordination', + group='coordination') LOG = log.getLogger(__name__) @@ -109,6 +112,46 @@ class AlarmService(object): pass +@six.add_metaclass(abc.ABCMeta) +class AlarmEvaluationService(AlarmService, os_service.Service): + + PARTITIONING_GROUP_NAME = "alarm_evaluator" + + def __init__(self): + super(AlarmEvaluationService, self).__init__() + self._load_evaluators() + self.api_client = None + self.partition_coordinator = coordination.PartitionCoordinator() + + def start(self): + super(AlarmEvaluationService, self).start() + self.partition_coordinator.start() + self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME) + + # allow time for coordination if necessary + delay_start = self.partition_coordinator.is_active() + + if self.evaluators: + interval = cfg.CONF.alarm.evaluation_interval + self.tg.add_timer( + interval, + self._evaluate_assigned_alarms, + initial_delay=interval if delay_start else None) + if self.partition_coordinator.is_active(): + heartbeat_interval = min(cfg.CONF.coordination.heartbeat, + cfg.CONF.alarm.evaluation_interval / 4) + self.tg.add_timer(heartbeat_interval, + self.partition_coordinator.heartbeat) + # Add a dummy thread to have wait() working + self.tg.add_timer(604800, lambda: None) + + def _assigned_alarms(self): + all_alarms = self._client.alarms.list(q=[{'field': 'enabled', + 'value': True}]) + return self.partition_coordinator.extract_my_subset( + self.PARTITIONING_GROUP_NAME, all_alarms) + + class SingletonAlarmService(AlarmService, os_service.Service): def __init__(self): @@ -142,7 +185,7 @@ class PartitionedAlarmService(AlarmService, os_service.Service): self._load_evaluators() self.api_client = None - self.partition_coordinator = coordination.PartitionCoordinator() + self.partition_coordinator = alarm_coordination.PartitionCoordinator() def start(self): super(PartitionedAlarmService, self).start() diff --git a/ceilometer/cmd/alarm.py b/ceilometer/cmd/alarm.py index 9ca337d0..0c0c9497 100644 --- a/ceilometer/cmd/alarm.py +++ b/ceilometer/cmd/alarm.py @@ -24,8 +24,12 @@ from ceilometer import service OPTS = [ - cfg.StrOpt('evaluation_service', default='singleton', - help='Driver to use for alarm evaluation service.'), + cfg.StrOpt('evaluation_service', default='default', + help='Driver to use for alarm evaluation service. DEPRECATED: ' + '"singleton" and "partitioned" alarm evaluator ' + 'services will be removed in Kilo in favour of the ' + 'default alarm evaluation service using tooz for ' + 'partitioning.'), ] cfg.CONF.register_opts(OPTS, group='alarm') diff --git a/ceilometer/tests/alarm/test_alarm_svc.py b/ceilometer/tests/alarm/test_alarm_svc.py new file mode 100644 index 00000000..acd7f069 --- /dev/null +++ b/ceilometer/tests/alarm/test_alarm_svc.py @@ -0,0 +1,143 @@ +# +# Copyright 2013 Red Hat, Inc +# +# Author: Eoghan Glynn +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer.alarm.service.SingletonAlarmService. +""" +import mock +from oslo.config import fixture as fixture_config +from stevedore import extension + +from ceilometer.alarm import service +from ceilometer.tests import base as tests_base + + +class TestAlarmEvaluationService(tests_base.BaseTestCase): + def setUp(self): + super(TestAlarmEvaluationService, self).setUp() + self.CONF = self.useFixture(fixture_config.Config()).conf + self.setup_messaging(self.CONF) + + self.threshold_eval = mock.Mock() + self.evaluators = extension.ExtensionManager.make_test_instance( + [ + extension.Extension( + 'threshold', + None, + None, + self.threshold_eval), + ] + ) + self.api_client = mock.MagicMock() + self.svc = service.AlarmEvaluationService() + self.svc.tg = mock.Mock() + self.svc.partition_coordinator = mock.MagicMock() + p_coord = self.svc.partition_coordinator + p_coord.extract_my_subset.side_effect = lambda _, x: x + self.svc.evaluators = self.evaluators + self.svc.supported_evaluators = ['threshold'] + + def _do_test_start(self, test_interval=120, + coordination_heartbeat=1.0, + coordination_active=False): + self.CONF.set_override('evaluation_interval', + test_interval, + group='alarm') + self.CONF.set_override('heartbeat', + coordination_heartbeat, + group='coordination') + with mock.patch('ceilometerclient.client.get_client', + return_value=self.api_client): + p_coord_mock = self.svc.partition_coordinator + p_coord_mock.is_active.return_value = coordination_active + + self.svc.start() + self.svc.partition_coordinator.start.assert_called_once_with() + self.svc.partition_coordinator.join_group.assert_called_once_with( + self.svc.PARTITIONING_GROUP_NAME) + + initial_delay = test_interval if coordination_active else None + expected = [ + mock.call(test_interval, + self.svc._evaluate_assigned_alarms, + initial_delay=initial_delay), + mock.call(604800, mock.ANY), + ] + if coordination_active: + hb_interval = min(coordination_heartbeat, test_interval / 4) + hb_call = mock.call(hb_interval, + self.svc.partition_coordinator.heartbeat) + expected.insert(1, hb_call) + actual = self.svc.tg.add_timer.call_args_list + self.assertEqual(expected, actual) + + def test_start_singleton(self): + self._do_test_start(coordination_active=False) + + def test_start_coordinated(self): + self._do_test_start(coordination_active=True) + + def test_start_coordinated_high_hb_interval(self): + self._do_test_start(coordination_active=True, test_interval=10, + coordination_heartbeat=5) + + def test_evaluation_cycle(self): + alarm = mock.Mock(type='threshold') + self.api_client.alarms.list.return_value = [alarm] + with mock.patch('ceilometerclient.client.get_client', + return_value=self.api_client): + p_coord_mock = self.svc.partition_coordinator + p_coord_mock.extract_my_subset.return_value = [alarm] + + self.svc._evaluate_assigned_alarms() + + p_coord_mock.extract_my_subset.assert_called_once_with( + self.svc.PARTITIONING_GROUP_NAME, [alarm]) + self.threshold_eval.evaluate.assert_called_once_with(alarm) + + def test_unknown_extension_skipped(self): + alarms = [ + mock.Mock(type='not_existing_type'), + mock.Mock(type='threshold') + ] + + self.api_client.alarms.list.return_value = alarms + with mock.patch('ceilometerclient.client.get_client', + return_value=self.api_client): + self.svc.start() + self.svc._evaluate_assigned_alarms() + self.threshold_eval.evaluate.assert_called_once_with(alarms[1]) + + def test_singleton_endpoint_types(self): + endpoint_types = ["internalURL", "publicURL"] + for endpoint_type in endpoint_types: + self.CONF.set_override('os_endpoint_type', + endpoint_type, + group='service_credentials') + with mock.patch('ceilometerclient.client.get_client') as client: + self.svc.api_client = None + self.svc._evaluate_assigned_alarms() + conf = self.CONF.service_credentials + expected = [mock.call(2, + os_auth_url=conf.os_auth_url, + os_region_name=conf.os_region_name, + os_tenant_name=conf.os_tenant_name, + os_password=conf.os_password, + os_username=conf.os_username, + os_cacert=conf.os_cacert, + os_endpoint_type=conf.os_endpoint_type, + insecure=conf.insecure)] + actual = client.call_args_list + self.assertEqual(expected, actual) diff --git a/ceilometer/tests/test_bin.py b/ceilometer/tests/test_bin.py index 584c8f3d..fe75cf6d 100644 --- a/ceilometer/tests/test_bin.py +++ b/ceilometer/tests/test_bin.py @@ -130,7 +130,10 @@ class BinAlarmEvaluatorServiceTestCase(base.BaseTestCase): os.remove(self.tempfile) def test_default_config(self): - self._do_test(None, "SingletonAlarmService") + self._do_test(None, "AlarmEvaluationService") + + def test_singleton_driver(self): + self._do_test('singleton', "SingletonAlarmService") def test_backward_compat(self): self._do_test("ceilometer.alarm.service.PartitionedAlarmService", diff --git a/setup.cfg b/setup.cfg index 0ac52674..c54bf494 100644 --- a/setup.cfg +++ b/setup.cfg @@ -225,6 +225,7 @@ ceilometer.alarm.evaluator = combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator ceilometer.alarm.evaluator_service = + default = ceilometer.alarm.service:AlarmEvaluationService singleton = ceilometer.alarm.service:SingletonAlarmService partitioned = ceilometer.alarm.service:PartitionedAlarmService # NOTE(sileht): for backward compatibility