Enhance test_alarms

Use specific alarm definition id in list alarms
instead of deleting all the alarm definitions after every single test.
Improved test_list_alarms_by_lifecycle_state and test_list_alarms_by_link

Change-Id: I3e7064b505922fff6cee4ee1854aeaa75fb08287
This commit is contained in:
kaiyan-sheng 2015-11-12 19:13:00 -07:00
parent cedcec7432
commit 72dddf620a
2 changed files with 388 additions and 428 deletions

View File

@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import time
@ -88,36 +87,14 @@ def create_alarm_definition(name=None,
return alarm_definition
def create_alarms_for_test_alarms(cls, num):
for num in xrange(num):
# create an alarm definition
expression = "avg(name-1) > 0"
name = data_utils.rand_name('name-1')
alarm_definition = create_alarm_definition(
name=name, expression=expression)
resp, response_body = cls.monasca_client.create_alarm_definitions(
alarm_definition)
cls.assertEqual(201, resp.status)
# create some metrics
for i in xrange(180):
metric = create_metric(name='name-1')
cls.monasca_client.create_metrics(metric)
time.sleep(1)
resp, response_body = cls.monasca_client.list_alarms()
elements = response_body['elements']
if len(elements) >= num:
break
def delete_alarm_definitions(self):
def delete_alarm_definitions(monasca_client):
# Delete alarm definitions
resp, response_body = self.monasca_client.list_alarm_definitions()
resp, response_body = monasca_client.list_alarm_definitions()
elements = response_body['elements']
if elements:
for element in elements:
alarm_def_id = element['id']
self.monasca_client.delete_alarm_definition(alarm_def_id)
monasca_client.delete_alarm_definition(alarm_def_id)
def create_alarm_definitions_with_num(cls, expression):
@ -146,67 +123,6 @@ def create_alarm_definition_for_test_alarm_definition():
return alarm_definition
def create_metrics_for_test_alarms_match_by(cls, num, sub_expressions, list):
# list=True when match_by is a set
# sub_expressions=True when expression of the alarm has multiple
# sub expressions
# create some metrics
for i in xrange(180):
if list:
metric1 = create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring', 'hostname': 'mini-mon',
'device': '/dev/sda1'})
metric2 = create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring', 'hostname': 'devstack',
'device': '/dev/sda1'})
metric3 = create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring', 'hostname': 'mini-mon',
'device': 'tmpfs'})
metric4 = create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring', 'hostname': 'devstack',
'device': 'tmpfs'})
cls.monasca_client.create_metrics(metric1)
cls.monasca_client.create_metrics(metric2)
cls.monasca_client.create_metrics(metric3)
cls.monasca_client.create_metrics(metric4)
time.sleep(1)
resp, response_body = cls.monasca_client.list_alarms()
elements = response_body['elements']
if len(elements) >= num:
break
else:
metric1 = create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring', 'hostname': 'mini-mon'})
metric2 = create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring', 'hostname': 'devstack'})
cls.monasca_client.create_metrics(metric1)
cls.monasca_client.create_metrics(metric2)
if sub_expressions:
metric3 = create_metric(
name='cpu.user_perc',
dimensions={'service': 'monitoring',
'hostname': 'mini-mon'})
metric4 = create_metric(
name='cpu.user_perc',
dimensions={'service': 'monitoring',
'hostname': 'devstack'})
cls.monasca_client.create_metrics(metric3)
cls.monasca_client.create_metrics(metric4)
else:
pass
time.sleep(1)
resp, response_body = cls.monasca_client.list_alarms()
elements = response_body['elements']
if len(elements) >= num:
break
def timestamp_to_iso(timestamp):
time_utc = datetime.datetime.utcfromtimestamp(timestamp / 1000.0)
time_iso_base = time_utc.strftime("%Y-%m-%dT%H:%M:%S")
@ -227,37 +143,3 @@ def timestamp_to_iso_millis(timestamp):
time_iso_new = time_iso_base + ':' + time_iso_second[0] + millisecond
time_iso_millisecond = time_iso_new + 'Z'
return time_iso_millisecond
def test_list_measurements_test_element(cls, element, test_key,
test_value):
cls.assertEqual(set(element),
set(['columns', 'dimensions', 'id', 'measurements',
'name']))
cls.assertEqual(set(element['columns']),
set(['timestamp', 'value', 'value_meta']))
cls.assertTrue(str(element['id']) is not None)
if test_key is not None and test_value is not None:
cls.assertEqual(str(element['dimensions'][test_key]), test_value)
def test_list_measurements_test_measurement(cls, measurement, test_metric,
test_vm_key, test_vm_value):
time_iso_millisecond = timestamp_to_iso_millis(test_metric['timestamp'])
cls.assertEqual(str(measurement[0]), time_iso_millisecond)
cls.assertEqual(measurement[1], test_metric['value'])
if test_vm_key is not None and test_vm_value is not None:
cls.assertEqual(str(measurement[2][test_vm_key]), test_vm_value)
def test_list_metrics_test_element(cls, element, test_key=None, test_value=None,
test_name=None):
cls.assertTrue(type(element['id']) is unicode)
cls.assertTrue(type(element['name']) is unicode)
cls.assertTrue(type(element['dimensions']) is dict)
cls.assertEqual(set(element), set(['dimensions', 'id', 'name']))
cls.assertTrue(str(element['id']) is not None)
if test_key is not None and test_value is not None:
cls.assertEqual(str(element['dimensions'][test_key]), test_value)
if test_name is not None:
cls.assertEqual(str(element['name']), test_name)

View File

@ -14,8 +14,10 @@
# TODO(RMH): Update documentation. Get alarms returns alarm_definition, not
# TODO(RMH): alarm_definition_id in response body
import time
from monasca_tempest_tests.tests.api import base
from monasca_tempest_tests.tests.api import constants
from monasca_tempest_tests.tests.api import helpers
from tempest.common.utils import data_utils
from tempest import test
@ -34,305 +36,246 @@ class TestAlarms(base.BaseMonascaTest):
@test.attr(type="gate")
def test_list_alarms(self):
helpers.create_alarms_for_test_alarms(self, num=1)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
resp, response_body = self.monasca_client.list_alarms()
self.assertEqual(200, resp.status)
self.assertTrue(set(['links', 'elements']) ==
set(response_body))
elements = response_body['elements']
if elements:
element = elements[0]
self.assertTrue(set(['id',
'links',
'alarm_definition',
'metrics',
'state',
'lifecycle_state',
'link',
'state_updated_timestamp',
'updated_timestamp',
'created_timestamp']) ==
set(element))
for metric in element['metrics']:
target_metric = helpers.create_metric()
self.assertEqual(target_metric['name'], metric['name'])
self.assertEqual(target_metric['dimensions'],
metric['dimensions'])
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms: at least one alarm is " \
"needed."
self.fail(error_msg)
for element in response_body['elements']:
self._verify_alarm_keys(element)
metric = element['metrics'][0]
if metric['name'] == expected_metric['name']:
self._verify_metric_in_alarm(metric, expected_metric)
return
self.fail("Failed test_list_alarms: cannot find the alarm just "
"created.")
@test.attr(type="gate")
def test_list_alarms_by_alarm_definition_id(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
if elements:
element = elements[0]
alarm_definition_id = element['alarm_definition']['id']
query_parms = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_parms)
self.assertEqual(200, resp.status)
first_element = response_body['elements'][0]
self.assertEqual(first_element, element)
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms_by_alarm_definition_id: " \
"at least one alarm is needed."
self.self.fail(error_msg)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
@test.attr(type="gate")
def test_list_alarms_by_metric_name(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
if elements:
element = elements[0]
metric_name = element['metrics'][0]['name']
query_parms = '?metric_name=' + str(metric_name)
resp, response_body = self.monasca_client.list_alarms(query_parms)
self.assertEqual(200, resp.status)
first_element = response_body['elements'][0]
self.assertEqual(first_element, element)
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms_by_metric_name: at least " \
"one alarm is needed."
self.fail(error_msg)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_parms = '?metric_name=' + expected_metric['name']
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_metric_dimensions(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
if elements:
query_parms = '?metric_dimensions=key-2:value-2,key-1:value-1'
resp, response_body = self.monasca_client.\
list_alarms(query_parms)
self.assertEqual(200, resp.status)
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms_by_metric_dimensions: at " \
"least one alarm is needed."
self.fail(error_msg)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
for key in expected_metric['dimensions']:
value = expected_metric['dimensions'][key]
query_parms = '?metric_dimensions=' + key + ':' + value
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_state(self):
helpers.create_alarms_for_test_alarms(self, num=3)
helpers.delete_alarm_definitions(self.monasca_client)
self._create_alarms_for_test_alarms(num=3)
resp, response_body = self.monasca_client.list_alarms()
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=3)
elements = response_body['elements']
number_of_alarms = len(elements)
if number_of_alarms < 3:
helpers.delete_alarm_definitions(self)
error_msg = ("Failed test_list_alarms_by_state: not the correct"
" number of alarms to test. 3 alarms are "
"needed. Current number of alarms = {}").\
format(number_of_alarms)
self.fail(error_msg)
else:
len0 = len(elements)
query_parms = '?state=UNDETERMINED'
resp, response_body1 = self.monasca_client.list_alarms(query_parms)
len1 = len(response_body1['elements'])
self.assertEqual(200, resp.status)
query_parms = '?state=OK'
resp, response_body2 = self.monasca_client.list_alarms(query_parms)
len2 = len(response_body2['elements'])
self.assertEqual(200, resp.status)
query_parms = '?state=ALARM'
resp, response_body3 = self.monasca_client.list_alarms(query_parms)
len3 = len(response_body3['elements'])
self.assertEqual(200, resp.status)
self.assertEqual(len0, len1 + len2 + len3)
helpers.delete_alarm_definitions(self)
len0 = len(elements)
query_parms = '?state=UNDETERMINED'
resp, response_body1 = self.monasca_client.list_alarms(query_parms)
len1 = len(response_body1['elements'])
self.assertEqual(200, resp.status)
query_parms = '?state=OK'
resp, response_body2 = self.monasca_client.list_alarms(query_parms)
len2 = len(response_body2['elements'])
self.assertEqual(200, resp.status)
query_parms = '?state=ALARM'
resp, response_body3 = self.monasca_client.list_alarms(query_parms)
len3 = len(response_body3['elements'])
self.assertEqual(200, resp.status)
self.assertEqual(len0, len1 + len2 + len3)
@test.attr(type="gate")
def test_list_alarms_by_lifecycle_state(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
if elements:
query_parms = '?lifecycle_state=None'
resp, response_body = self.monasca_client.list_alarms(query_parms)
self.assertEqual(200, resp.status)
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms_by_lifecycle_state: at " \
"least one alarm is needed."
self.fail(error_msg)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
alarm_id = response_body['elements'][0]['id']
self.monasca_client.patch_alarm(id=alarm_id, lifecycle_state="OPEN")
query_parms = '?alarm_definition_id=' + str(
alarm_definition_ids[0]) + '&lifecycle_state=OPEN'
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_link(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
if elements:
query_parms = '?link=None'
resp, response_body = self.monasca_client.list_alarms(query_parms)
self.assertEqual(200, resp.status)
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms_by_link: at least one " \
"alarm is needed."
self.fail(error_msg)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
alarm_id = response_body['elements'][0]['id']
self.monasca_client.patch_alarm(
id=alarm_id, link="http://somesite.com/this-alarm-info")
query_parms = '?link=http://somesite.com/this-alarm-info'
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_state_updated_start_time(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
if elements:
resp, response_body = self.monasca_client.list_alarms()
elements = response_body['elements']
element = elements[0]
state_updated_start_time = element['state_updated_timestamp']
query_parms = '?state_updated_timestamp=' + \
str(state_updated_start_time)
resp, response_body = self.monasca_client.list_alarms(query_parms)
self.assertEqual(200, resp.status)
first_element = response_body['elements'][0]
self.assertEqual(element, first_element)
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed " \
"test_list_alarms_by_state_updated_start_time: at " \
"least one alarm is needed."
self.fail(error_msg)
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
element = response_body['elements'][0]
state_updated_start_time = element['state_updated_timestamp']
query_parms = '?alarm_definition_id=' + str(alarm_definition_ids[0])\
+ '&state_updated_timestamp=' + \
str(state_updated_start_time)
resp, response_body = self.monasca_client.list_alarms(query_parms)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
first_element = response_body['elements'][0]
self.assertEqual(element, first_element)
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
self.assertEqual(alarm_definition_ids[0],
element['alarm_definition']['id'])
@test.attr(type="gate")
def test_list_alarms_by_offset_limit(self):
helpers.create_alarms_for_test_alarms(self, num=2)
helpers.delete_alarm_definitions(self.monasca_client)
self._create_alarms_for_test_alarms(num=2)
resp, response_body = self.monasca_client.list_alarms()
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=2)
elements = response_body['elements']
number_of_alarms = len(elements)
if number_of_alarms < 2:
helpers.delete_alarm_definitions(self)
error_msg = ("Failed test_list_alarms_by_offset_limit: 2 alarms "
"are needed. Current number of alarms = {}").\
format(number_of_alarms)
self.fail(error_msg)
else:
first_element = elements[0]
next_element = elements[1]
id_first_element = first_element['id']
query_parms = '?offset=' + str(id_first_element) + '&limit=1'
resp, response_body1 = self.monasca_client.list_alarms(query_parms)
elements = response_body1['elements']
self.assertEqual(1, len(elements))
self.assertEqual(elements[0]['id'], next_element['id'])
self.assertEqual(elements[0], next_element)
helpers.delete_alarm_definitions(self)
first_element = elements[0]
next_element = elements[1]
id_first_element = first_element['id']
query_parms = '?offset=' + str(id_first_element) + '&limit=1'
resp, response_body1 = self.monasca_client.list_alarms(query_parms)
elements = response_body1['elements']
self.assertEqual(1, len(elements))
self.assertEqual(elements[0]['id'], next_element['id'])
self.assertEqual(elements[0], next_element)
@test.attr(type="gate")
def test_get_alarm(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
elements = response_body['elements']
if elements:
element = elements[0]
id = element['id']
resp, response_body = self.monasca_client.get_alarm(id)
self.assertEqual(200, resp.status)
self.assertTrue(set(['id',
'links',
'alarm_definition',
'metrics',
'state',
'lifecycle_state',
'link',
'state_updated_timestamp',
'updated_timestamp',
'created_timestamp']) ==
set(response_body))
for metric in element['metrics']:
target_metric = helpers.create_metric()
self.assertEqual(target_metric['name'], metric['name'])
self.assertEqual(target_metric['dimensions'],
metric['dimensions'])
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_get_alarm: at least one alarm is needed."
self.fail(error_msg)
element = response_body['elements'][0]
alarm_id = element['id']
resp, response_body = self.monasca_client.get_alarm(alarm_id)
self.assertEqual(200, resp.status)
self._verify_alarm_keys(response_body)
metric = element['metrics'][0]
self._verify_metric_in_alarm(metric, expected_metric)
@test.attr(type="gate")
@test.attr(type=['negative'])
def test_get_alarm_with_invalid_id(self):
id = data_utils.rand_name()
alarm_id = data_utils.rand_name()
self.assertRaises(exceptions.NotFound, self.monasca_client.get_alarm,
id)
alarm_id)
@test.attr(type="gate")
def test_update_alarm(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
elements = response_body['elements']
if elements:
element = elements[0]
id = element['id']
updated_state = "ALARM"
updated_lifecycle_state = "OPEN"
updated_link = "http://somesite.com"
resp, response_body = self.monasca_client.update_alarm(
id=id, state=updated_state,
lifecycle_state=updated_lifecycle_state, link=updated_link)
self.assertEqual(200, resp.status)
self.assertTrue(set(['id',
'links',
'alarm_definition',
'metrics',
'state',
'lifecycle_state',
'link',
'state_updated_timestamp',
'updated_timestamp',
'created_timestamp']) ==
set(response_body))
# Validate fields updated
self.assertEqual(updated_state, response_body['state'])
self.assertEqual(updated_lifecycle_state, response_body[
'lifecycle_state'])
self.assertEqual(updated_link, response_body[
'link'])
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_list_alarms_by_offset_limit: at least " \
"one alarm is needed."
self.fail(error_msg)
element = response_body['elements'][0]
alarm_id = element['id']
updated_state = "ALARM"
updated_lifecycle_state = "OPEN"
updated_link = "http://somesite.com"
resp, response_body = self.monasca_client.update_alarm(
id=alarm_id, state=updated_state,
lifecycle_state=updated_lifecycle_state, link=updated_link)
self.assertEqual(200, resp.status)
self._verify_alarm_keys(response_body)
# Validate fields updated
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
element = response_body['elements'][0]
self.assertEqual(updated_state, element['state'])
self.assertEqual(updated_lifecycle_state, element['lifecycle_state'])
self.assertEqual(updated_link, element['link'])
@test.attr(type="gate")
def test_patch_alarm(self):
helpers.create_alarms_for_test_alarms(self, num=1)
resp, response_body = self.monasca_client.list_alarms()
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
elements = response_body['elements']
if elements:
id = elements[0]['id']
updated_state = "UNDETERMINED"
resp, response_body = self.monasca_client.patch_alarm(
id=id, state=updated_state)
self.assertEqual(200, resp.status)
self.assertTrue(set(['id',
'links',
'alarm_definition',
'metrics',
'state',
'lifecycle_state',
'link',
'state_updated_timestamp',
'updated_timestamp',
'created_timestamp']) ==
set(response_body))
# Validate the field patched
self.assertEqual(updated_state, response_body['state'])
helpers.delete_alarm_definitions(self)
else:
error_msg = "Failed test_patch_alarm: at least one alarm is " \
"needed."
self.fail(error_msg)
alarm_id = elements[0]['id']
patch_link = "http://somesite.com"
resp, response_body = self.monasca_client.patch_alarm(
id=alarm_id, link=patch_link)
self.assertEqual(200, resp.status)
self._verify_alarm_keys(response_body)
# Validate the field patched
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
self.assertEqual(patch_link, response_body['elements'][0]['link'])
@test.attr(type="gate")
def test_delete_alarm(self):
alarm_definition_ids, expected_metric \
= self._create_alarms_for_test_alarms(num=1)
query_param = '?alarm_definition_id=' + str(alarm_definition_ids[0])
resp, response_body = self.monasca_client.list_alarms(query_param)
self.assertEqual(200, resp.status)
elements = response_body['elements']
alarm_id = elements[0]['id']
resp, response_body = self.monasca_client.delete_alarm(alarm_id)
self.assertEqual(204, resp.status)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=0)
@test.attr(type="gate")
@test.attr(type=['negative'])
@ -348,97 +291,232 @@ class TestAlarms(base.BaseMonascaTest):
expression = "avg(cpu.idle_perc{service=monitoring}) < 20"
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression)
self.monasca_client.create_alarm_definitions(alarm_definition)
helpers.create_metrics_for_test_alarms_match_by(self, num=2,
sub_expressions=False,
list=False)
resp, response_body = self.monasca_client.list_alarms()
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
self._create_metrics_for_match_by(
num=1, alarm_definition_id=alarm_definition_id)
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=1)
elements = response_body['elements']
metrics = elements[0]['metrics']
self.assertEqual(len(metrics), 2)
self.assertNotEqual(metrics[0], metrics[1])
helpers.delete_alarm_definitions(self)
# Create an alarm definition with match_by
name = data_utils.rand_name('alarm_definition_1')
name = data_utils.rand_name('alarm_definition_2')
expression = "avg(cpu.idle_perc{service=monitoring}) < 20"
match_by = ['hostname']
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression,
match_by=match_by)
self.monasca_client.create_alarm_definitions(alarm_definition)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
# create some metrics
helpers.create_metrics_for_test_alarms_match_by(self, num=2,
sub_expressions=False,
list=False)
resp, response_body = self.monasca_client.list_alarms()
self._create_metrics_for_match_by(
num=2, alarm_definition_id=alarm_definition_id)
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=2)
elements = response_body['elements']
self.assertEqual(len(elements), 2)
self.assertEqual(len(elements[0]['metrics']), 1)
self.assertEqual(len(elements[1]['metrics']), 1)
self.assertNotEqual(elements[0]['metrics'], elements[1]['metrics'])
helpers.delete_alarm_definitions(self)
@test.attr(type="gate")
def test_create_alarms_with_sub_expressions_and_match_by(self):
helpers.delete_alarm_definitions(self)
# Create an alarm definition with sub-expressions and match_by
name = data_utils.rand_name('alarm_definition_1')
name = data_utils.rand_name('alarm_definition_3')
expression = "avg(cpu.idle_perc{service=monitoring}) < 10 or " \
"avg(cpu.user_perc{service=monitoring}) > 60"
match_by = ['hostname']
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression,
match_by=match_by)
self.monasca_client.create_alarm_definitions(alarm_definition)
helpers.create_metrics_for_test_alarms_match_by(self, num=2,
sub_expressions=True,
list=False)
resp, response_body = self.monasca_client.list_alarms()
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
self._create_metrics_for_match_by_sub_expressions(
num=2, alarm_definition_id=alarm_definition_id)
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=2)
elements = response_body['elements']
self.assertEqual(len(elements), 2)
self.assertEqual(len(elements[0]['metrics']), 2)
self.assertEqual(len(elements[1]['metrics']), 2)
hostname_1 = elements[0]['metrics'][0]['dimensions']['hostname']
hostname_2 = elements[0]['metrics'][1]['dimensions']['hostname']
hostname_3 = elements[1]['metrics'][0]['dimensions']['hostname']
hostname_4 = elements[1]['metrics'][1]['dimensions']['hostname']
self.assertEqual(hostname_1, hostname_2)
self.assertEqual(hostname_3, hostname_4)
self.assertNotEqual(hostname_1, hostname_3)
helpers.delete_alarm_definitions(self)
hostnames = []
for i in xrange(2):
self.assertEqual(len(elements[i]['metrics']), 2)
for i in xrange(2):
for j in xrange(2):
hostnames.append(elements[i]['metrics'][j]['dimensions'][
'hostname'])
self.assertEqual(hostnames[0], hostnames[1])
self.assertEqual(hostnames[2], hostnames[3])
self.assertNotEqual(hostnames[0], hostnames[2])
@test.attr(type="gate")
def test_create_alarms_with_match_by_list(self):
helpers.delete_alarm_definitions(self)
# Create an alarm definition with match_by as a list
name = data_utils.rand_name('alarm_definition_1')
name = data_utils.rand_name('alarm_definition')
expression = "avg(cpu.idle_perc{service=monitoring}) < 10"
match_by = ['hostname', 'device']
alarm_definition = helpers.create_alarm_definition(
name=name, description="description", expression=expression,
match_by=match_by)
self.monasca_client.create_alarm_definitions(alarm_definition)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_id = response_body['id']
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
# create some metrics
helpers.create_metrics_for_test_alarms_match_by(self, num=4,
sub_expressions=True,
list=True)
resp, response_body = self.monasca_client.list_alarms()
self._create_metrics_for_match_by_sub_expressions_list(
num=4, alarm_definition_id=alarm_definition_id)
resp, response_body = self.monasca_client.list_alarms(query_param)
self._verify_list_alarms_elements(resp, response_body,
expect_num_elements=4)
elements = response_body['elements']
self.assertEqual(len(elements), 4)
self.assertEqual(len(elements[0]['metrics']), 1)
self.assertEqual(len(elements[1]['metrics']), 1)
self.assertEqual(len(elements[2]['metrics']), 1)
self.assertEqual(len(elements[3]['metrics']), 1)
dimensions_1 = elements[0]['metrics'][0]['dimensions']
dimensions_2 = elements[1]['metrics'][0]['dimensions']
dimensions_3 = elements[2]['metrics'][0]['dimensions']
dimensions_4 = elements[3]['metrics'][0]['dimensions']
self.assertNotEqual(dimensions_1, dimensions_2)
self.assertNotEqual(dimensions_1, dimensions_3)
self.assertNotEqual(dimensions_1, dimensions_4)
self.assertNotEqual(dimensions_2, dimensions_3)
self.assertNotEqual(dimensions_2, dimensions_4)
self.assertNotEqual(dimensions_3, dimensions_4)
helpers.delete_alarm_definitions(self)
dimensions = []
for i in xrange(4):
self.assertEqual(len(elements[i]['metrics']), 1)
dimensions.append(elements[i]['metrics'][0]['dimensions'])
for i in xrange(4):
for j in xrange(4):
if i != j:
self.assertNotEqual(dimensions[i], dimensions[j])
def _verify_list_alarms_elements(self, resp, response_body,
expect_num_elements):
self.assertEqual(200, resp.status)
self.assertTrue(set(['links', 'elements']) ==
set(response_body))
error_msg = ("Failed: {} alarm is needed and current number "
"of alarm is {}").format(expect_num_elements,
len(response_body['elements']))
self.assertEqual(len(response_body['elements']),
expect_num_elements, error_msg)
def _create_alarms_for_test_alarms(self, num):
metric_name = data_utils.rand_name('name')
key = data_utils.rand_name('key')
value = data_utils.rand_name('value')
alarm_definition_ids = []
for i in xrange(num):
# create an alarm definition
expression = "avg(" + metric_name + ") > 0"
name = data_utils.rand_name('name-1')
alarm_definition = helpers.create_alarm_definition(
name=name, expression=expression)
resp, response_body = self.monasca_client.create_alarm_definitions(
alarm_definition)
alarm_definition_ids.append(response_body['id'])
expected_metric = helpers.create_metric(name=metric_name,
dimensions={key: value})
# create some metrics
for j in xrange(num):
for i in xrange(constants.MAX_RETRIES):
self.monasca_client.create_metrics(expected_metric)
time.sleep(constants.RETRY_WAIT_SECS)
query_param = '?alarm_definition_id=' + \
str(alarm_definition_ids[j])
resp, response_body = self.monasca_client.list_alarms(
query_param)
elements = response_body['elements']
if len(elements) >= 1:
break
return alarm_definition_ids, expected_metric
def _create_metrics_for_match_by(self, num, alarm_definition_id):
metric1 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'mini-mon'})
metric2 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'devstack'})
self.monasca_client.create_metrics(metric1)
self.monasca_client.create_metrics(metric2)
self._waiting_for_alarms(num, alarm_definition_id)
def _create_metrics_for_match_by_sub_expressions(self, num,
alarm_definition_id):
metric1 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'mini-mon'})
metric2 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'devstack'})
self.monasca_client.create_metrics(metric1)
self.monasca_client.create_metrics(metric2)
metric3 = helpers.create_metric(
name='cpu.user_perc',
dimensions={'service': 'monitoring',
'hostname': 'mini-mon'})
metric4 = helpers.create_metric(
name='cpu.user_perc',
dimensions={'service': 'monitoring',
'hostname': 'devstack'})
self.monasca_client.create_metrics(metric3)
self.monasca_client.create_metrics(metric4)
self._waiting_for_alarms(num, alarm_definition_id)
def _create_metrics_for_match_by_sub_expressions_list(self, num,
alarm_definition_id):
# create some metrics
metric1 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'mini-mon',
'device': '/dev/sda1'})
metric2 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'devstack',
'device': '/dev/sda1'})
metric3 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'mini-mon',
'device': 'tmpfs'})
metric4 = helpers.create_metric(
name='cpu.idle_perc',
dimensions={'service': 'monitoring',
'hostname': 'devstack',
'device': 'tmpfs'})
self.monasca_client.create_metrics(metric1)
self.monasca_client.create_metrics(metric2)
self.monasca_client.create_metrics(metric3)
self.monasca_client.create_metrics(metric4)
self._waiting_for_alarms(num, alarm_definition_id)
def _waiting_for_alarms(self, num, alarm_definition_id):
query_param = '?alarm_definition_id=' + str(alarm_definition_id)
for i in xrange(constants.MAX_RETRIES):
time.sleep(constants.RETRY_WAIT_SECS)
resp, response_body = self.monasca_client.\
list_alarms(query_param)
elements = response_body['elements']
if len(elements) >= num:
break
def _verify_alarm_keys(self, response_body):
self.assertTrue(set(['id',
'links',
'alarm_definition',
'metrics',
'state',
'lifecycle_state',
'link',
'state_updated_timestamp',
'updated_timestamp',
'created_timestamp']) ==
set(response_body))
def _verify_metric_in_alarm(self, metric, expected_metric):
self.assertEqual(metric['dimensions'], expected_metric['dimensions'])
self.assertEqual(metric['name'], expected_metric['name'])