Add the Alarm resource to telemetry

Alarm supports full CRUD with query string filters on list.
Alarm state can be retrieved and modified through available methods.
Alarm History sub-resource implementation to be implemented separately.

Change-Id: Iaa8751c33939ecb40a60b08fd184c60888c4c148
This commit is contained in:
Steve Lewis
2014-08-13 17:21:16 -07:00
parent f39e056efc
commit b34ddef81f
2 changed files with 179 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
from openstack.telemetry import telemetry_service
from openstack import utils
class Alarm(resource.Resource):
base_path = '/v2/alarms'
service = telemetry_service.TelemetryService()
# Supported Operations
allow_create = True
allow_retrieve = True
allow_update = True
allow_delete = True
allow_list = True
# Properties
alarm_actions = resource.prop('alarm_actions')
alarm_id = resource.prop('alarm_id')
combination_rule = resource.prop('combination_rule')
description = resource.prop('description')
enabled = resource.prop('enabled', type=bool)
insufficient_data_actions = resource.prop('insufficient_data_actions')
name = resource.prop('name')
ok_actions = resource.prop('ok_actions')
project_id = resource.prop('project_id')
repeat_actions = resource.prop('repeat_actions', type=bool)
state = resource.prop('state')
state_changed_at = resource.prop('state_timestamp')
threshold_rule = resource.prop('threshold_rule')
time_constraints = resource.prop('time_constraints')
type = resource.prop('type')
updated_at = resource.prop('timestamp')
user_id = resource.prop('user_id')
@property
def id(self):
try:
val = self.alarm_id
except AttributeError:
val = None
return val
def __repr__(self):
return "alarm: %s" % self._attrs
def change_state(self, session, next_state):
"""Set the state of an alarm.
The next_state may be one of: 'ok' 'insufficient data' 'alarm'
"""
url = utils.urljoin(self.base_path, self.id, 'state')
resp = session.put(url, service=self.service, json=next_state).body
return resp
def check_state(self, session):
"""Retrieve the current state of an alarm from the service.
The properties of the alarm are not modified.
"""
url = utils.urljoin(self.base_path, self.id, 'state')
resp = session.get(url, service=self.service).body
current_state = resp.replace('\"', '')
return current_state

View File

@@ -0,0 +1,103 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.telemetry.v2 import alarm
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'alarm_actions': ['1'],
'alarm_id': IDENTIFIER,
'combination_rule': {'alarm_ids': ['2', 'b'], 'operator': 'or', },
'description': '3',
'enabled': True,
'insufficient_data_actions': ['4'],
'name': '5',
'ok_actions': ['6'],
'project_id': '7',
'repeat_actions': False,
'state': 'insufficient data',
'state_timestamp': '8',
'timestamp': '9',
'threshold_rule': {'meter_name': 'a',
'evaluation_periods:': '1',
'period': '60',
'statistic': 'avg',
'threshold': '92.6',
'comparison_operator': 'gt',
'exclude_outliers': True, },
'time_constraints': [{'name': 'a', 'duration': 'b', 'start': 'c', }],
'type': '10',
'user_id': '11',
}
class TestAlarm(testtools.TestCase):
def setUp(self):
super(TestAlarm, self).setUp()
self.resp = mock.Mock()
self.resp.body = ''
self.sess = mock.Mock()
self.sess.put = mock.MagicMock()
self.sess.put.return_value = self.resp
def test_basic(self):
sot = alarm.Alarm()
self.assertIsNone(sot.resource_key)
self.assertIsNone(sot.resources_key)
self.assertEqual('/v2/alarms', sot.base_path)
self.assertEqual('metering', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_retrieve)
self.assertTrue(sot.allow_update)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = alarm.Alarm(EXAMPLE)
self.assertEqual(IDENTIFIER, sot.id)
self.assertEqual(EXAMPLE['alarm_actions'], sot.alarm_actions)
self.assertEqual(IDENTIFIER, sot.alarm_id)
self.assertEqual(EXAMPLE['combination_rule'], sot.combination_rule)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertTrue(sot.enabled)
self.assertEqual(EXAMPLE['insufficient_data_actions'],
sot.insufficient_data_actions)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['ok_actions'], sot.ok_actions)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertFalse(sot.repeat_actions)
self.assertEqual(EXAMPLE['state'], sot.state)
self.assertEqual(EXAMPLE['state_timestamp'], sot.state_changed_at)
self.assertEqual(EXAMPLE['timestamp'], sot.updated_at)
self.assertEqual(EXAMPLE['threshold_rule'], sot.threshold_rule)
self.assertEqual(EXAMPLE['time_constraints'], sot.time_constraints)
self.assertEqual(EXAMPLE['type'], sot.type)
self.assertEqual(EXAMPLE['user_id'], sot.user_id)
def test_check_status(self):
sot = alarm.Alarm(EXAMPLE)
sot.check_state(self.sess)
url = 'v2/alarms/IDENTIFIER/state'
self.sess.get.assert_called_with(url, service=sot.service)
def test_change_status(self):
sot = alarm.Alarm(EXAMPLE)
self.assertEqual(self.resp.body, sot.change_state(self.sess, 'alarm'))
url = 'v2/alarms/IDENTIFIER/state'
self.sess.put.assert_called_with(url, service=sot.service,
json='alarm')