Switch partitioned alarm evaluation to a hash-based approach

Short version: make use of the new distributed workload partitioning
utilities in Ceilometer to simplify the alarm evaluation
partitioning. Code is intentionally non-consolidated to enable
easy deletion of 'singleton' and 'partitioned' services in the Kilo
cycle.

Longer version:
The assignment of alarms to individual partitioned alarm evaluators
now follows the same pattern as the division of resources between
scaled-out central agents.

The evaluators each join a tooz group and emit a periodic heartbeat
to tooz. Tooz provides distributed group membership information.

Thus the set of evaluators share minimal knowledge, but this is
sufficient to guide a hash-based approach to determining whether
an individual alarm UUID falls under the responsibility of an
individual evaluator.

The current RPC-fanout-based presence reporting and the master/slave
division of responsibilities can be dropped in the next cycle.
Also, the rebalancing logic that is triggered when a certain threshold
of alarm deletions is crossed will no longer be required.

DocImpact
Change-Id: Ica8dae569f9ff1c2f8fe58be6ae2def66be0da54
Implements: blueprint hash-based-alarm-partitioning
This commit is contained in:
Nejc Saje 2014-08-19 06:20:48 -04:00
parent 9a2f8618de
commit 3571a607f2
6 changed files with 203 additions and 6 deletions

View File

@ -32,7 +32,10 @@ OPTS = [
cfg.StrOpt('partition_rpc_topic',
default='alarm_partition_coordination',
help='The topic that ceilometer uses for alarm partition '
'coordination messages.'),
'coordination messages. DEPRECATED: RPC-based partitioned '
'alarm evaluation service will be removed in Kilo in '
'favour of the default alarm evaluation service using '
'tooz for partitioning.'),
]
cfg.CONF.register_opts(OPTS, group='alarm')

View File

@ -25,8 +25,9 @@ from oslo.utils import netutils
import six
from stevedore import extension
from ceilometer.alarm.partition import coordination
from ceilometer.alarm.partition import coordination as alarm_coordination
from ceilometer.alarm import rpc as rpc_alarm
from ceilometer import coordination as coordination
from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
@ -48,6 +49,8 @@ cfg.CONF.import_opt('notifier_rpc_topic', 'ceilometer.alarm.rpc',
group='alarm')
cfg.CONF.import_opt('partition_rpc_topic', 'ceilometer.alarm.rpc',
group='alarm')
cfg.CONF.import_opt('heartbeat', 'ceilometer.coordination',
group='coordination')
LOG = log.getLogger(__name__)
@ -109,6 +112,46 @@ class AlarmService(object):
pass
# Alarm evaluation service that uses a coordination.PartitionCoordinator to
# decide which of the enabled alarms this evaluator instance is responsible for.
@six.add_metaclass(abc.ABCMeta)
class AlarmEvaluationService(AlarmService, os_service.Service):
# Name of the coordination group every evaluator joins; the same name is
# passed to extract_my_subset() when selecting this instance's alarms.
PARTITIONING_GROUP_NAME = "alarm_evaluator"
# Load the evaluators and create the (not yet started) partition coordinator.
def __init__(self):
super(AlarmEvaluationService, self).__init__()
self._load_evaluators()
# No API client yet; presumably created lazily elsewhere -- not shown here.
self.api_client = None
self.partition_coordinator = coordination.PartitionCoordinator()
# Start the coordinator, join the evaluator group and register the timers
# on the service's thread group (self.tg).
def start(self):
super(AlarmEvaluationService, self).start()
self.partition_coordinator.start()
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME)
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
# Periodic evaluation is only scheduled when evaluators are present.
if self.evaluators:
interval = cfg.CONF.alarm.evaluation_interval
# When coordination is active the first evaluation is postponed by one
# full interval; otherwise no initial delay is requested.
self.tg.add_timer(
interval,
self._evaluate_assigned_alarms,
initial_delay=interval if delay_start else None)
# Heartbeats are only emitted while the coordinator reports itself active.
if self.partition_coordinator.is_active():
# Heartbeat at the configured rate, but never less often than a quarter
# of the evaluation interval.
# NOTE(review): if evaluation_interval is an int, `/ 4` truncates under
# Python 2 and could yield 0 for intervals below 4 -- confirm option type.
heartbeat_interval = min(cfg.CONF.coordination.heartbeat,
cfg.CONF.alarm.evaluation_interval / 4)
self.tg.add_timer(heartbeat_interval,
self.partition_coordinator.heartbeat)
# Add a dummy thread to have wait() working
# (604800 is presumably seconds, i.e. one week -- confirm add_timer units.)
self.tg.add_timer(604800, lambda: None)
# Return the enabled alarms that the coordinator assigns to this instance.
def _assigned_alarms(self):
# Fetch every enabled alarm, then let the coordinator pick our subset.
all_alarms = self._client.alarms.list(q=[{'field': 'enabled',
'value': True}])
return self.partition_coordinator.extract_my_subset(
self.PARTITIONING_GROUP_NAME, all_alarms)
class SingletonAlarmService(AlarmService, os_service.Service):
def __init__(self):
@ -142,7 +185,7 @@ class PartitionedAlarmService(AlarmService, os_service.Service):
self._load_evaluators()
self.api_client = None
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator = alarm_coordination.PartitionCoordinator()
def start(self):
super(PartitionedAlarmService, self).start()

View File

@ -24,8 +24,12 @@ from ceilometer import service
OPTS = [
cfg.StrOpt('evaluation_service', default='singleton',
help='Driver to use for alarm evaluation service.'),
cfg.StrOpt('evaluation_service', default='default',
help='Driver to use for alarm evaluation service. DEPRECATED: '
'"singleton" and "partitioned" alarm evaluator '
'services will be removed in Kilo in favour of the '
'default alarm evaluation service using tooz for '
'partitioning.'),
]
cfg.CONF.register_opts(OPTS, group='alarm')

View File

@ -0,0 +1,143 @@
#
# Copyright 2013 Red Hat, Inc
#
# Author: Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer.alarm.service.AlarmEvaluationService.
"""
import mock
from oslo.config import fixture as fixture_config
from stevedore import extension
from ceilometer.alarm import service
from ceilometer.tests import base as tests_base
# Unit tests for service.AlarmEvaluationService, run against a mocked thread
# group, a mocked partition coordinator and a mocked API client.
class TestAlarmEvaluationService(tests_base.BaseTestCase):
# Build a service instance wired to a single mocked 'threshold' evaluator.
def setUp(self):
super(TestAlarmEvaluationService, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.setup_messaging(self.CONF)
self.threshold_eval = mock.Mock()
# Extension manager exposing only the mocked 'threshold' evaluator.
self.evaluators = extension.ExtensionManager.make_test_instance(
[
extension.Extension(
'threshold',
None,
None,
self.threshold_eval),
]
)
self.api_client = mock.MagicMock()
self.svc = service.AlarmEvaluationService()
# Replace the thread group and coordinator so no real timers or
# coordination backend are involved.
self.svc.tg = mock.Mock()
self.svc.partition_coordinator = mock.MagicMock()
p_coord = self.svc.partition_coordinator
# By default the coordinator hands back every alarm it is given.
p_coord.extract_my_subset.side_effect = lambda _, x: x
self.svc.evaluators = self.evaluators
self.svc.supported_evaluators = ['threshold']
# Shared helper: start the service under the given configuration and check
# the coordinator calls and the exact sequence of add_timer() calls.
def _do_test_start(self, test_interval=120,
coordination_heartbeat=1.0,
coordination_active=False):
self.CONF.set_override('evaluation_interval',
test_interval,
group='alarm')
self.CONF.set_override('heartbeat',
coordination_heartbeat,
group='coordination')
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
p_coord_mock = self.svc.partition_coordinator
p_coord_mock.is_active.return_value = coordination_active
self.svc.start()
self.svc.partition_coordinator.start.assert_called_once_with()
self.svc.partition_coordinator.join_group.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME)
# The first evaluation is delayed by one interval only when coordination
# is active.
initial_delay = test_interval if coordination_active else None
# Expected timers: the evaluation timer, then the long-lived dummy timer.
expected = [
mock.call(test_interval,
self.svc._evaluate_assigned_alarms,
initial_delay=initial_delay),
mock.call(604800, mock.ANY),
]
if coordination_active:
# The heartbeat timer is expected between the two timers above.
hb_interval = min(coordination_heartbeat, test_interval / 4)
hb_call = mock.call(hb_interval,
self.svc.partition_coordinator.heartbeat)
expected.insert(1, hb_call)
actual = self.svc.tg.add_timer.call_args_list
self.assertEqual(expected, actual)
# Coordination inactive: no heartbeat timer and no initial delay expected.
def test_start_singleton(self):
self._do_test_start(coordination_active=False)
# Coordination active with the helper's default interval and heartbeat.
def test_start_coordinated(self):
self._do_test_start(coordination_active=True)
# Configured heartbeat (5) exceeds a quarter of the evaluation interval
# (10 / 4), so the quarter-interval value is the expected timer period.
def test_start_coordinated_high_hb_interval(self):
self._do_test_start(coordination_active=True, test_interval=10,
coordination_heartbeat=5)
# One 'threshold' alarm is listed; it must be offered to the coordinator
# and then evaluated exactly once.
def test_evaluation_cycle(self):
alarm = mock.Mock(type='threshold')
self.api_client.alarms.list.return_value = [alarm]
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
p_coord_mock = self.svc.partition_coordinator
p_coord_mock.extract_my_subset.return_value = [alarm]
self.svc._evaluate_assigned_alarms()
p_coord_mock.extract_my_subset.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME, [alarm])
self.threshold_eval.evaluate.assert_called_once_with(alarm)
# An alarm whose type has no matching evaluator must be skipped while the
# 'threshold' alarm is still evaluated.
def test_unknown_extension_skipped(self):
alarms = [
mock.Mock(type='not_existing_type'),
mock.Mock(type='threshold')
]
self.api_client.alarms.list.return_value = alarms
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
self.svc.start()
self.svc._evaluate_assigned_alarms()
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
# For each endpoint type, check that the API client is requested with the
# options taken from the service_credentials config group.
# NOTE(review): the name says "singleton" but the service under test is
# AlarmEvaluationService -- looks carried over from another test module.
def test_singleton_endpoint_types(self):
endpoint_types = ["internalURL", "publicURL"]
for endpoint_type in endpoint_types:
self.CONF.set_override('os_endpoint_type',
endpoint_type,
group='service_credentials')
with mock.patch('ceilometerclient.client.get_client') as client:
# Clear the client so the evaluation call has to request a new one.
self.svc.api_client = None
self.svc._evaluate_assigned_alarms()
conf = self.CONF.service_credentials
expected = [mock.call(2,
os_auth_url=conf.os_auth_url,
os_region_name=conf.os_region_name,
os_tenant_name=conf.os_tenant_name,
os_password=conf.os_password,
os_username=conf.os_username,
os_cacert=conf.os_cacert,
os_endpoint_type=conf.os_endpoint_type,
insecure=conf.insecure)]
actual = client.call_args_list
self.assertEqual(expected, actual)

View File

@ -130,7 +130,10 @@ class BinAlarmEvaluatorServiceTestCase(base.BaseTestCase):
os.remove(self.tempfile)
def test_default_config(self):
self._do_test(None, "SingletonAlarmService")
self._do_test(None, "AlarmEvaluationService")
def test_singleton_driver(self):
self._do_test('singleton', "SingletonAlarmService")
def test_backward_compat(self):
self._do_test("ceilometer.alarm.service.PartitionedAlarmService",

View File

@ -225,6 +225,7 @@ ceilometer.alarm.evaluator =
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
ceilometer.alarm.evaluator_service =
default = ceilometer.alarm.service:AlarmEvaluationService
singleton = ceilometer.alarm.service:SingletonAlarmService
partitioned = ceilometer.alarm.service:PartitionedAlarmService
# NOTE(sileht): for backward compatibility