# # Copyright 2013 Red Hat, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Tests for aodh.evaluator.AlarmEvaluationService. """ import fixtures import time from unittest import mock from observabilityclient import prometheus_client from oslo_config import fixture as fixture_config from stevedore import extension from aodh import evaluator from aodh import service from aodh.evaluator import prometheus from aodh.tests import base as tests_base class TestAlarmEvaluationService(tests_base.BaseTestCase): def setUp(self): super(TestAlarmEvaluationService, self).setUp() conf = service.prepare_service(argv=[], config_files=[]) self.CONF = self.useFixture(fixture_config.Config(conf)).conf self.CONF.set_override('workers', 1, 'evaluator') self.setup_messaging(self.CONF) self.threshold_eval = mock.MagicMock() self._fake_conn = mock.Mock() self._fake_conn.get_alarms.return_value = [] self._fake_pc = mock.Mock() self._fake_em = extension.ExtensionManager.make_test_instance( [ extension.Extension( 'gnocchi_aggregation_by_metrics_threshold', None, None, self.threshold_eval), ] ) self.useFixture(fixtures.MockPatch( 'stevedore.extension.ExtensionManager', return_value=self._fake_em )) self.useFixture(fixtures.MockPatch( 'aodh.coordination.PartitionCoordinator', return_value=self._fake_pc )) self.useFixture(fixtures.MockPatch( 'aodh.storage.get_connection_from_config', return_value=self._fake_conn )) def _do_test_start(self, test_interval=120, coordination_heartbeat_interval=1.0, coordination_active=False): self.CONF.set_override('evaluation_interval', test_interval, group='evaluator') self.CONF.set_override('heartbeat_interval', coordination_heartbeat_interval, group='coordination') self._fake_pc.is_active.return_value = coordination_active svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) svc.terminate() svc.partition_coordinator.start.assert_called_once_with() svc.partition_coordinator.join_group.assert_called_once_with( svc.PARTITIONING_GROUP_NAME) def test_start_singleton(self): self._do_test_start(coordination_active=False) def test_start_coordinated(self): self._do_test_start(coordination_active=True) def test_start_coordinated_high_hb_interval(self): self._do_test_start(coordination_active=True, test_interval=10, coordination_heartbeat_interval=5) def test_evaluation_cycle(self): alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', alarm_id="alarm_id1") self._fake_pc.extract_my_subset.return_value = ["alarm_id1"] self._fake_pc.is_active.side_effect = [False, False, True, True] self._fake_conn.get_alarms.return_value = [alarm] self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) target = svc.partition_coordinator.extract_my_subset target.assert_called_once_with(svc.PARTITIONING_GROUP_NAME, ["alarm_id1"]) self.threshold_eval.evaluate.assert_called_once_with(alarm) def test_evaluation_cycle_with_bad_alarm(self): alarms = [ mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', name='bad', alarm_id='a'), mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', name='good', alarm_id='b'), ] self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None] self._fake_pc.is_active.side_effect = [False, False, True, True, True] self._fake_pc.extract_my_subset.return_value = ['a', 'b'] self._fake_conn.get_alarms.return_value = alarms svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])], self.threshold_eval.evaluate.call_args_list) def test_unknown_extension_skipped(self): alarms = [ mock.Mock(type='not_existing_type', alarm_id='a'), mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', alarm_id='b') ] self._fake_pc.is_active.return_value = False self._fake_pc.extract_my_subset.return_value = ['a', 'b'] self._fake_conn.get_alarms.return_value = alarms svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) self.threshold_eval.evaluate.assert_called_once_with(alarms[1]) def test_check_alarm_query_constraints(self): self._fake_conn.get_alarms.return_value = [] self._fake_pc.extract_my_subset.return_value = [] self._fake_pc.is_active.return_value = False svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) child = {'enabled': True, 'type': {'ne': 'event'}} self.assertDictContains(svc.storage_conn.get_alarms.call_args[1], child) def test_evaluation_cycle_no_coordination(self): alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', alarm_id="alarm_id1") self._fake_pc.is_active.return_value = False self._fake_conn.get_alarms.return_value = [alarm] self._fake_conn.conditional_update.return_value = True svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) target = svc.partition_coordinator.extract_my_subset self.assertEqual(0, target.call_count) self.threshold_eval.evaluate.assert_called_once_with(alarm) def test_evaluation_cycle_no_coordination_alarm_modified(self): alarm = mock.Mock(type='gnocchi_aggregation_by_metrics_threshold', alarm_id="alarm_id1") self._fake_pc.is_active.return_value = False self._fake_conn.get_alarms.return_value = [alarm] self._fake_conn.conditional_update.return_value = False svc = evaluator.AlarmEvaluationService(0, self.CONF) self.addCleanup(svc.terminate) time.sleep(1) target = svc.partition_coordinator.extract_my_subset self.assertEqual(0, target.call_count) self.assertEqual(0, self.threshold_eval.evaluate.call_count) class TestPrometheusEvaluator(tests_base.BaseTestCase): def setUp(self): super(TestPrometheusEvaluator, self).setUp() conf = service.prepare_service(argv=[], config_files=[]) self.CONF = self.useFixture(fixture_config.Config(conf)).conf def test_rule_evaluation(self): metric_list = [ prometheus_client.PrometheusMetric({'metric': 'mtr', 'value': (0, 10)}), prometheus_client.PrometheusMetric({'metric': 'mtr', 'value': (1, 15)}), prometheus_client.PrometheusMetric({'metric': 'mtr', 'value': (2, 20)}), prometheus_client.PrometheusMetric({'metric': 'mtr', 'value': (3, 25)}), prometheus_client.PrometheusMetric({'metric': 'mtr', 'value': (4, 30)}), prometheus_client.PrometheusMetric({'metric': 'mtr', 'value': (5, 15)}), ] with mock.patch.object(prometheus.PrometheusEvaluator, '_set_obsclient', return_value=None): # mock Prometheus client ev = prometheus.PrometheusEvaluator(self.CONF) ev._get_metric_data = mock.Mock(return_value=metric_list) # test transfer to alarm state state, trend, stats, outside, reason = ev.evaluate_rule( {'query': 'mtr', 'threshold': 9, 'comparison_operator': 'gt'}) self.assertEqual('alarm', state) self.assertEqual(6, outside) # test transfer to ok state state, trend, stats, outside, reason = ev.evaluate_rule( {'query': 'mtr', 'threshold': 31, 'comparison_operator': 'gt'}) self.assertEqual('ok', state) self.assertEqual(0, outside) # test trending to alarm state state, trend, stats, outside, reason = ev.evaluate_rule( {'query': 'mtr', 'threshold': 14, 'comparison_operator': 'gt'}) self.assertEqual('alarm', trend) self.assertEqual(5, outside) # test trending to ok state state, trend, stats, outside, reason = ev.evaluate_rule( {'query': 'mtr', 'threshold': 20, 'comparison_operator': 'gt'}) self.assertEqual('ok', trend) self.assertEqual(2, outside)