tempest: migrate api tests from tempest tree

This patch migrate all tests for alarming function in tempest tree
to this aodh tree.

This patch also fixes conflicts in parameter name.

Implements: blueprint tempest-plugin
Closes-Bug: 1549424
Change-Id: Ib548d2cbb005365ad4f97e56876e5bc2ad1c0aae
This commit is contained in:
Ryota MIBU 2015-12-09 15:50:34 +09:00
parent d9463371a3
commit 10bd3e6402
5 changed files with 207 additions and 18 deletions

View File

@ -12,8 +12,8 @@
from tempest.common.utils import data_utils
from tempest import config
from tempest.lib import exceptions as lib_exc
import tempest.test
from tempest_lib import exceptions as lib_exc
from aodh.tests.tempest.service import client
@ -29,7 +29,7 @@ class BaseAlarmingTest(tempest.test.BaseTestCase):
@classmethod
def skip_checks(cls):
super(BaseAlarmingTest, cls).skip_checks()
if not CONF.service_available.aodh:
if not CONF.service_available.aodh_plugin:
raise cls.skipException("Aodh support is required")
@classmethod

View File

@ -0,0 +1,111 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
from aodh.tests.tempest.api import base
class TelemetryAlarmingAPITest(base.BaseAlarmingTest):
@classmethod
def resource_setup(cls):
super(TelemetryAlarmingAPITest, cls).resource_setup()
cls.rule = {'meter_name': 'cpu_util',
'comparison_operator': 'gt',
'threshold': 80.0,
'period': 70}
for i in range(2):
cls.create_alarm(threshold_rule=cls.rule)
@test.idempotent_id('1c918e06-210b-41eb-bd45-14676dd77cd7')
def test_alarm_list(self):
# List alarms
alarm_list = self.alarming_client.list_alarms()
# Verify created alarm in the list
fetched_ids = [a['alarm_id'] for a in alarm_list]
missing_alarms = [a for a in self.alarm_ids if a not in fetched_ids]
self.assertEqual(0, len(missing_alarms),
"Failed to find the following created alarm(s)"
" in a fetched list: %s" %
', '.join(str(a) for a in missing_alarms))
@test.idempotent_id('1297b095-39c1-4e74-8a1f-4ae998cedd68')
def test_create_update_get_delete_alarm(self):
# Create an alarm
alarm_name = data_utils.rand_name('telemetry_alarm')
body = self.alarming_client.create_alarm(
name=alarm_name, type='threshold', threshold_rule=self.rule)
self.assertEqual(alarm_name, body['name'])
alarm_id = body['alarm_id']
self.assertDictContainsSubset(self.rule, body['threshold_rule'])
# Update alarm with new rule and new name
new_rule = {'meter_name': 'cpu',
'comparison_operator': 'eq',
'threshold': 70.0,
'period': 60}
alarm_name_updated = data_utils.rand_name('telemetry-alarm-update')
body = self.alarming_client.update_alarm(
alarm_id,
threshold_rule=new_rule,
name=alarm_name_updated,
type='threshold')
self.assertEqual(alarm_name_updated, body['name'])
self.assertDictContainsSubset(new_rule, body['threshold_rule'])
# Get and verify details of an alarm after update
body = self.alarming_client.show_alarm(alarm_id)
self.assertEqual(alarm_name_updated, body['name'])
self.assertDictContainsSubset(new_rule, body['threshold_rule'])
# Get history for the alarm and verify the same
body = self.alarming_client.show_alarm_history(alarm_id)
self.assertEqual("rule change", body[0]['type'])
self.assertIn(alarm_name_updated, body[0]['detail'])
self.assertEqual("creation", body[1]['type'])
self.assertIn(alarm_name, body[1]['detail'])
# Delete alarm and verify if deleted
self.alarming_client.delete_alarm(alarm_id)
self.assertRaises(lib_exc.NotFound,
self.alarming_client.show_alarm, alarm_id)
@test.idempotent_id('aca49486-70bb-4016-87e0-f6131374f742')
def test_set_get_alarm_state(self):
alarm_states = ['ok', 'alarm', 'insufficient data']
alarm = self.create_alarm(threshold_rule=self.rule)
# Set alarm state and verify
new_state =\
[elem for elem in alarm_states if elem != alarm['state']][0]
state = self.alarming_client.alarm_set_state(alarm['alarm_id'],
new_state)
self.assertEqual(new_state, state.data)
# Get alarm state and verify
state = self.alarming_client.show_alarm_state(alarm['alarm_id'])
self.assertEqual(new_state, state.data)
@test.idempotent_id('08d7e45a-1344-4e5c-ba6f-f6cbb77f55ba')
def test_create_delete_alarm_with_combination_rule(self):
rule = {"alarm_ids": self.alarm_ids,
"operator": "or"}
# Verifies alarm create
alarm_name = data_utils.rand_name('combination_alarm')
body = self.alarming_client.create_alarm(name=alarm_name,
combination_rule=rule,
type='combination')
self.assertEqual(alarm_name, body['name'])
alarm_id = body['alarm_id']
self.assertDictContainsSubset(rule, body['combination_rule'])
# Verify alarm delete
self.alarming_client.delete_alarm(alarm_id)
self.assertRaises(lib_exc.NotFound,
self.alarming_client.show_alarm, alarm_id)

View File

@ -0,0 +1,72 @@
# Copyright 2015 GlobalLogic. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from tempest.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
from aodh.tests.tempest.api import base
class TelemetryAlarmingNegativeTest(base.BaseAlarmingTest):
"""Negative tests for show_alarm, update_alarm, show_alarm_history tests
** show non-existent alarm
** show the deleted alarm
** delete deleted alarm
** update deleted alarm
"""
@test.attr(type=['negative'])
@test.idempotent_id('668743d5-08ad-4480-b2b8-15da34f81e7e')
def test_get_non_existent_alarm(self):
# get the non-existent alarm
non_existent_id = str(uuid.uuid4())
self.assertRaises(lib_exc.NotFound, self.alarming_client.show_alarm,
non_existent_id)
@test.attr(type=['negative'])
@test.idempotent_id('ef45000d-0a72-4781-866d-4cb7bf2582ae')
def test_get_update_show_history_delete_deleted_alarm(self):
# get, update and delete the deleted alarm
alarm_name = data_utils.rand_name('telemetry_alarm')
rule = {'meter_name': 'cpu',
'comparison_operator': 'eq',
'threshold': 100.0,
'period': 90}
body = self.alarming_client.create_alarm(
name=alarm_name,
type='threshold',
threshold_rule=rule)
alarm_id = body['alarm_id']
self.alarming_client.delete_alarm(alarm_id)
# get the deleted alarm
self.assertRaises(lib_exc.NotFound, self.alarming_client.show_alarm,
alarm_id)
# update the deleted alarm
updated_alarm_name = data_utils.rand_name('telemetry_alarm_updated')
updated_rule = {'meter_name': 'cpu_new',
'comparison_operator': 'eq',
'threshold': 70,
'period': 50}
self.assertRaises(lib_exc.NotFound, self.alarming_client.update_alarm,
alarm_id, threshold_rule=updated_rule,
name=updated_alarm_name,
type='threshold')
# delete the deleted alarm
self.assertRaises(lib_exc.NotFound, self.alarming_client.delete_alarm,
alarm_id)

View File

@ -20,12 +20,12 @@ service_available_group = cfg.OptGroup(name="service_available",
title="Available OpenStack Services")
ServiceAvailableGroup = [
cfg.BoolOpt("aodh",
cfg.BoolOpt("aodh_plugin",
default=True,
help="Whether or not Aodh is expected to be available"),
]
alarming_group = cfg.OptGroup(name='alarming',
alarming_group = cfg.OptGroup(name='alarming_plugin',
title='Alarming Service Options')
AlarmingGroup = [

View File

@ -104,18 +104,24 @@ class AlarmingClient(service_client.ServiceClient):
class Manager(manager.Manager):
def __init__(self, credentials=None, service=None):
super(Manager, self).__init__(credentials, service)
self._set_alarming_client()
default_params = {
'disable_ssl_certificate_validation':
CONF.identity.disable_ssl_certificate_validation,
'ca_certs': CONF.identity.ca_certificates_file,
'trace_requests': CONF.debug.trace_requests
}
def _set_alarming_client(self):
if CONF.service_available.aodh:
self.alarming_client = AlarmingClient(
self.auth_provider,
CONF.alarming.catalog_type,
CONF.identity.region,
endpoint_type=CONF.alarming.endpoint_type,
disable_ssl_certificate_validation=(
CONF.identity.disable_ssl_certificate_validation),
ca_certs=CONF.identity.ca_certificates_file,
trace_requests=CONF.debug.trace_requests)
alarming_params = {
'service': CONF.alarming_plugin.catalog_type,
'region': CONF.identity.region,
'endpoint_type': CONF.alarming_plugin.endpoint_type,
}
alarming_params.update(default_params)
def __init__(self, credentials=None, service=None):
super(Manager, self).__init__(credentials)
self.set_alarming_client()
def set_alarming_client(self):
self.alarming_client = AlarmingClient(self.auth_provider,
**self.alarming_params)