dongwenjuan 501c0644d3 Rewrite Aodh datasource
Implements: blueprint rewrite-aodh-datasource

Depends-On: I4b1140c20c68821eaf0849c9ee551ff9b1c27deb
Change-Id: Iaaa72dd5214f6d8168f4ba4560a6cec67d01d2a9
Signed-off-by: dongwenjuan <dong.wenjuan@zte.com.cn>
2017-11-08 13:37:48 +08:00

87 lines
3.3 KiB
Python

# Copyright 2016 Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_log import log as logging
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.aodh import AODH_DATASOURCE
from vitrage_tempest_tests.tests.api.alarms.base import BaseAlarmsTest
from vitrage_tempest_tests.tests.common import aodh_utils
from vitrage_tempest_tests.tests.common import nova_utils
from vitrage_tempest_tests.tests.common.tempest_clients import TempestClients
from vitrage_tempest_tests.tests import utils
LOG = logging.getLogger(__name__)
class TestAlarms(BaseAlarmsTest):
"""Alarms test class for Vitrage API tests."""
@classmethod
def setUpClass(cls):
super(TestAlarms, cls).setUpClass()
@utils.tempest_logger
def test_compare_cli_vs_api_alarms(self):
"""Wrapper that returns a test graph."""
try:
instances = nova_utils.create_instances(num_instances=1)
self.assertNotEqual(len(instances), 0,
'The instances list is empty')
aodh_utils.create_aodh_alarm(
resource_id=instances[0].id,
name='tempest_aodh_test')
api_alarms = TempestClients.vitrage().alarm.list(vitrage_id=None)
cli_alarms = utils.run_vitrage_command(
'vitrage alarm list', self.conf)
self._compare_alarms_lists(
api_alarms, cli_alarms, AODH_DATASOURCE,
utils.uni2str(instances[0].id))
except Exception as e:
self._handle_exception(e)
raise
finally:
aodh_utils.delete_all_ceilometer_alarms()
nova_utils.delete_all_instances()
def _compare_alarms_lists(self, api_alarms, cli_alarms,
resource_type, resource_id):
"""Validate alarm existence """
self.assertNotEqual(len(api_alarms), 0,
'The alarms list taken from api is empty')
self.assertIsNotNone(cli_alarms,
'The alarms list taken from cli is empty')
LOG.info("The alarms list taken from cli is : " +
str(cli_alarms))
LOG.info("The alarms list taken by api is : " +
str(json.dumps(api_alarms)))
cli_items = cli_alarms.splitlines()
api_by_type = self._filter_list_by_pairs_parameters(
api_alarms, [VProps.VITRAGE_TYPE], [resource_type])
cli_by_type = cli_alarms.count(' ' + resource_type + ' ')
api_by_id = self._filter_list_by_pairs_parameters(
api_alarms, ['resource_id'], [resource_id])
cli_by_id = cli_alarms.count(resource_id)
self.assertEqual(len(cli_items), len(api_alarms) + 4)
self.assertEqual(cli_by_type, len(api_by_type))
self.assertEqual(cli_by_id, len(api_by_id))