Adding alarm history to telemetry

Alarm history appears as a subresource of alarms.
Alarm history supports list only, with optional query filtering.

Change-Id: Iaad5b6c1451c48a450b86e280860903b4324d2f9
This commit is contained in:
Steve Lewis
2014-08-19 19:24:11 -07:00
parent cb59cfe7cc
commit 2a72304692
2 changed files with 126 additions and 0 deletions

View File

@@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
from openstack.telemetry import telemetry_service
class AlarmChange(resource.Resource):
resource_key = 'alarm_change'
base_path = '/v2/alarms/%(alarm_id)s/history'
service = telemetry_service.TelemetryService()
# Supported Operations
allow_list = True
# Properties
alarm_id = resource.prop('alarm_id')
detail = resource.prop('detail')
event_id = resource.prop('event_id')
on_behalf_of = resource.prop('on_behalf_of')
project_id = resource.prop('project_id')
triggered_at = resource.prop('timestamp')
type = resource.prop('type')
user_id = resource.prop('user_id')
@property
def id(self):
try:
val = self.event_id
except AttributeError:
val = None
return val
@classmethod
def list(cls, session, path_args=None, **params):
url = cls.base_path % path_args
resp = session.get(url, service=cls.service, params=params)
changes = []
for item in resp.body:
changes.append(cls.existing(**item))
return changes

View File

@@ -0,0 +1,75 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.telemetry.v2 import alarm_change
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'alarm_id': 0,
'detail': '1',
'event_id': IDENTIFIER,
'on_behalf_of': '3',
'project_id': '4',
'timestamp': '5',
'type': '6',
'user_id': '7',
}
class TestAlarmChange(testtools.TestCase):
def test_basic(self):
sot = alarm_change.AlarmChange()
self.assertEqual('alarm_change', sot.resource_key)
self.assertIsNone(sot.resources_key)
self.assertEqual('/v2/alarms/%(alarm_id)s/history', sot.base_path)
self.assertEqual('metering', sot.service.service_type)
self.assertFalse(sot.allow_create)
self.assertFalse(sot.allow_retrieve)
self.assertFalse(sot.allow_update)
self.assertFalse(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = alarm_change.AlarmChange(EXAMPLE)
self.assertEqual(IDENTIFIER, sot.id)
self.assertEqual(EXAMPLE['alarm_id'], sot.alarm_id)
self.assertEqual(EXAMPLE['detail'], sot.detail)
self.assertEqual(IDENTIFIER, sot.event_id)
self.assertEqual(EXAMPLE['on_behalf_of'], sot.on_behalf_of)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['timestamp'], sot.triggered_at)
self.assertEqual(EXAMPLE['type'], sot.type)
self.assertEqual(EXAMPLE['user_id'], sot.user_id)
def test_list(self):
sess = mock.Mock()
resp = mock.Mock()
resp.body = [EXAMPLE, EXAMPLE]
sess.get = mock.Mock(return_value=resp)
path_args = {'alarm_id': IDENTIFIER}
found = alarm_change.AlarmChange.list(sess, path_args=path_args)
self.assertEqual(2, len(found))
first = found[0]
self.assertEqual(IDENTIFIER, first.id)
self.assertEqual(EXAMPLE['alarm_id'], first.alarm_id)
self.assertEqual(EXAMPLE['detail'], first.detail)
self.assertEqual(IDENTIFIER, first.event_id)
self.assertEqual(EXAMPLE['on_behalf_of'], first.on_behalf_of)
self.assertEqual(EXAMPLE['project_id'], first.project_id)
self.assertEqual(EXAMPLE['timestamp'], first.triggered_at)
self.assertEqual(EXAMPLE['type'], first.type)
self.assertEqual(EXAMPLE['user_id'], first.user_id)