support multiple-meter payloads
some payloads (ie. CADF) may contain multiple meters per payload. we need to support ability to loop through meters and process them individually. this patch adds support for gathering a list of meters with consistent schema. a multi attribute is added to denote which fields are jsonpath and could return multiple values. currently, this supports looping across multiple meters of the same resource only. Implements: blueprint declarative-notifications Change-Id: I94ff536e2563d4c740a8514f814316474dc58e87
This commit is contained in:
parent
5edd2b4825
commit
ab3812a0a2
@ -67,6 +67,18 @@ metric:
|
||||
project_id: payload.tenant_id
|
||||
resource_id: payload.snapshot_id
|
||||
|
||||
# Swift
|
||||
- name: payload.measurements.[*].metric.[*].name
|
||||
event_type: 'objectstore.http.request'
|
||||
type: 'delta'
|
||||
unit: payload.measurements.[*].metric.[*].unit
|
||||
volume: payload.measurements.[*].result
|
||||
resource_id: payload.target.id
|
||||
user_id: payload.initiator.id
|
||||
project_id: payload.initiator.project_id
|
||||
multi: ['name', 'unit', 'volume']
|
||||
|
||||
|
||||
# NOTE: non-metric meters are generally events/existence meters
|
||||
# These are expected to be DEPRECATED in future releases
|
||||
#
|
||||
|
@ -12,6 +12,7 @@
|
||||
# under the License.
|
||||
|
||||
import fnmatch
|
||||
import itertools
|
||||
import os
|
||||
import pkg_resources
|
||||
import six
|
||||
@ -68,7 +69,7 @@ class MeterDefinition(object):
|
||||
if fnmatch.fnmatch(meter_name, t):
|
||||
return True
|
||||
|
||||
def parse_fields(self, field, message):
|
||||
def parse_fields(self, field, message, all_values=False):
|
||||
fval = self.cfg.get(field)
|
||||
if not fval:
|
||||
return
|
||||
@ -84,7 +85,9 @@ class MeterDefinition(object):
|
||||
values = [match.value for match in parts.find(message)
|
||||
if match.value is not None]
|
||||
if values:
|
||||
return values[0]
|
||||
if not all_values:
|
||||
return values[0]
|
||||
return values
|
||||
|
||||
def _validate_type(self):
|
||||
if self.cfg['type'] not in sample.TYPES:
|
||||
@ -145,6 +148,10 @@ def load_definitions(config_def):
|
||||
for event_def in reversed(config_def['metric'])]
|
||||
|
||||
|
||||
class InvalidPayload(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ProcessMeterNotifications(plugin_base.NotificationBase):
|
||||
|
||||
event_types = []
|
||||
@ -182,21 +189,57 @@ class ProcessMeterNotifications(plugin_base.NotificationBase):
|
||||
for topic in conf.notification_topics)
|
||||
return targets
|
||||
|
||||
@staticmethod
|
||||
def _normalise_as_list(value, d, body, length):
|
||||
values = d.parse_fields(value, body, True)
|
||||
if not values:
|
||||
if value in d.cfg.get('multi'):
|
||||
LOG.warning('Could not find %s values', value)
|
||||
raise InvalidPayload
|
||||
values = [d.cfg[value]]
|
||||
elif value in d.cfg.get('multi') and length != len(values):
|
||||
LOG.warning('Not all fetched meters contain "%s" field', value)
|
||||
raise InvalidPayload
|
||||
return values
|
||||
|
||||
def process_notification(self, notification_body):
|
||||
for d in self.definitions:
|
||||
if d.match_type(notification_body['event_type']):
|
||||
userid = self.get_user_id(d, notification_body)
|
||||
projectid = self.get_project_id(d, notification_body)
|
||||
resourceid = d.parse_fields('resource_id', notification_body)
|
||||
yield sample.Sample.from_notification(
|
||||
name=d.cfg['name'],
|
||||
type=d.cfg['type'],
|
||||
unit=d.cfg['unit'],
|
||||
volume=d.parse_fields('volume', notification_body),
|
||||
resource_id=resourceid,
|
||||
user_id=userid,
|
||||
project_id=projectid,
|
||||
message=notification_body)
|
||||
if d.cfg.get('multi'):
|
||||
meters = d.parse_fields('name', notification_body, True)
|
||||
if not meters: # skip if no meters in payload
|
||||
break
|
||||
try:
|
||||
volumes = self._normalise_as_list(
|
||||
'volume', d, notification_body, len(meters))
|
||||
units = self._normalise_as_list(
|
||||
'unit', d, notification_body, len(meters))
|
||||
types = self._normalise_as_list(
|
||||
'type', d, notification_body, len(meters))
|
||||
except InvalidPayload:
|
||||
break
|
||||
for m, v, u, t in zip(meters, volumes,
|
||||
itertools.cycle(units),
|
||||
itertools.cycle(types)):
|
||||
yield sample.Sample.from_notification(
|
||||
name=m, type=t, unit=u, volume=v,
|
||||
resource_id=resourceid,
|
||||
user_id=userid,
|
||||
project_id=projectid,
|
||||
message=notification_body)
|
||||
else:
|
||||
yield sample.Sample.from_notification(
|
||||
name=d.cfg['name'],
|
||||
type=d.cfg['type'],
|
||||
unit=d.cfg['unit'],
|
||||
volume=d.parse_fields('volume', notification_body),
|
||||
resource_id=resourceid,
|
||||
user_id=userid,
|
||||
project_id=projectid,
|
||||
message=notification_body)
|
||||
|
||||
@staticmethod
|
||||
def get_user_id(d, notification_body):
|
||||
|
@ -60,22 +60,3 @@ class SwiftWsgiMiddleware(_Base, plugin_base.NonMetricNotificationBase):
|
||||
user_id=message['payload']['initiator']['id'],
|
||||
project_id=message['payload']['initiator']['project_id'],
|
||||
message=message)
|
||||
|
||||
|
||||
class SwiftWsgiMiddlewareMeters(_Base):
|
||||
|
||||
@property
|
||||
def event_types(self):
|
||||
return ['objectstore.http.request']
|
||||
|
||||
def process_notification(self, message):
|
||||
for meter in message['payload'].get('measurements', []):
|
||||
yield sample.Sample.from_notification(
|
||||
name=meter['metric']['name'],
|
||||
type=sample.TYPE_DELTA,
|
||||
unit=meter['metric']['unit'],
|
||||
volume=meter['result'],
|
||||
resource_id=message['payload']['target']['id'],
|
||||
user_id=message['payload']['initiator']['id'],
|
||||
project_id=message['payload']['initiator']['project_id'],
|
||||
message=message)
|
||||
|
@ -12,6 +12,7 @@
|
||||
# under the License.
|
||||
"""Tests for ceilometer.meter.notifications
|
||||
"""
|
||||
import copy
|
||||
import mock
|
||||
import six
|
||||
import yaml
|
||||
@ -42,6 +43,76 @@ NOTIFICATION = {
|
||||
'publisher_id': "foo123"
|
||||
}
|
||||
|
||||
MIDDLEWARE_EVENT = {
|
||||
u'_context_request_id': u'req-a8bfa89b-d28b-4b95-9e4b-7d7875275650',
|
||||
u'_context_quota_class': None,
|
||||
u'event_type': u'objectstore.http.request',
|
||||
u'_context_service_catalog': [],
|
||||
u'_context_auth_token': None,
|
||||
u'_context_user_id': None,
|
||||
u'priority': u'INFO',
|
||||
u'_context_is_admin': True,
|
||||
u'_context_user': None,
|
||||
u'publisher_id': u'ceilometermiddleware',
|
||||
u'message_id': u'6eccedba-120e-4db8-9735-2ad5f061e5ee',
|
||||
u'_context_remote_address': None,
|
||||
u'_context_roles': [],
|
||||
u'timestamp': u'2013-07-29 06:51:34.474815',
|
||||
u'_context_timestamp': u'2013-07-29T06:51:34.348091',
|
||||
u'_unique_id': u'0ee26117077648e18d88ac76e28a72e2',
|
||||
u'_context_project_name': None,
|
||||
u'_context_read_deleted': u'no',
|
||||
u'_context_tenant': None,
|
||||
u'_context_instance_lock_checked': False,
|
||||
u'_context_project_id': None,
|
||||
u'_context_user_name': None,
|
||||
u'payload': {
|
||||
'typeURI': 'http: //schemas.dmtf.org/cloud/audit/1.0/event',
|
||||
'eventTime': '2015-01-30T16: 38: 43.233621',
|
||||
'target': {
|
||||
'action': 'get',
|
||||
'typeURI': 'service/storage/object',
|
||||
'id': 'account',
|
||||
'metadata': {
|
||||
'path': '/1.0/CUSTOM_account/container/obj',
|
||||
'version': '1.0',
|
||||
'container': 'container',
|
||||
'object': 'obj'
|
||||
}
|
||||
},
|
||||
'observer': {
|
||||
'id': 'target'
|
||||
},
|
||||
'eventType': 'activity',
|
||||
'measurements': [
|
||||
{
|
||||
'metric': {
|
||||
'metricId': 'openstack: uuid',
|
||||
'name': 'storage.objects.outgoing.bytes',
|
||||
'unit': 'B'
|
||||
},
|
||||
'result': 28
|
||||
},
|
||||
{
|
||||
'metric': {
|
||||
'metricId': 'openstack: uuid2',
|
||||
'name': 'storage.objects.incoming.bytes',
|
||||
'unit': 'B'
|
||||
},
|
||||
'result': 1
|
||||
}
|
||||
],
|
||||
'initiator': {
|
||||
'typeURI': 'service/security/account/user',
|
||||
'project_id': None,
|
||||
'id': 'openstack: 288f6260-bf37-4737-a178-5038c84ba244'
|
||||
},
|
||||
'action': 'read',
|
||||
'outcome': 'success',
|
||||
'id': 'openstack: 69972bb6-14dd-46e4-bdaf-3148014363dc'
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class TestMeterDefinition(test.BaseTestCase):
|
||||
|
||||
@ -171,3 +242,104 @@ class TestMeterProcessing(test.BaseTestCase):
|
||||
self.__setup_meter_def_file(cfg))
|
||||
c = list(self.handler.process_notification(NOTIFICATION))
|
||||
self.assertEqual(2, len(c))
|
||||
|
||||
def test_multi_meter_payload(self):
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="payload.measurements.[*].metric.[*].name",
|
||||
event_type="objectstore.http.request",
|
||||
type="delta",
|
||||
unit="payload.measurements.[*].metric.[*].unit",
|
||||
volume="payload.measurements.[*].result",
|
||||
resource_id="payload.target_id",
|
||||
project_id="payload.initiator.project_id",
|
||||
multi="name")]})
|
||||
self.handler.definitions = notifications.load_definitions(
|
||||
self.__setup_meter_def_file(cfg))
|
||||
c = list(self.handler.process_notification(MIDDLEWARE_EVENT))
|
||||
self.assertEqual(2, len(c))
|
||||
s1 = c[0].as_dict()
|
||||
self.assertEqual('storage.objects.outgoing.bytes', s1['name'])
|
||||
self.assertEqual(28, s1['volume'])
|
||||
self.assertEqual('B', s1['unit'])
|
||||
s2 = c[1].as_dict()
|
||||
self.assertEqual('storage.objects.incoming.bytes', s2['name'])
|
||||
self.assertEqual(1, s2['volume'])
|
||||
self.assertEqual('B', s2['unit'])
|
||||
|
||||
def test_multi_meter_payload_single(self):
|
||||
event = copy.deepcopy(MIDDLEWARE_EVENT)
|
||||
del event['payload']['measurements'][1]
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="payload.measurements.[*].metric.[*].name",
|
||||
event_type="objectstore.http.request",
|
||||
type="delta",
|
||||
unit="payload.measurements.[*].metric.[*].unit",
|
||||
volume="payload.measurements.[*].result",
|
||||
resource_id="payload.target_id",
|
||||
project_id="payload.initiator.project_id",
|
||||
multi="name")]})
|
||||
self.handler.definitions = notifications.load_definitions(
|
||||
self.__setup_meter_def_file(cfg))
|
||||
c = list(self.handler.process_notification(event))
|
||||
self.assertEqual(1, len(c))
|
||||
s1 = c[0].as_dict()
|
||||
self.assertEqual('storage.objects.outgoing.bytes', s1['name'])
|
||||
self.assertEqual(28, s1['volume'])
|
||||
self.assertEqual('B', s1['unit'])
|
||||
|
||||
def test_multi_meter_payload_none(self):
|
||||
event = copy.deepcopy(MIDDLEWARE_EVENT)
|
||||
del event['payload']['measurements']
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="payload.measurements.[*].metric.[*].name",
|
||||
event_type="objectstore.http.request",
|
||||
type="delta",
|
||||
unit="payload.measurements.[*].metric.[*].unit",
|
||||
volume="payload.measurements.[*].result",
|
||||
resource_id="payload.target_id",
|
||||
project_id="payload.initiator.project_id",
|
||||
multi="name")]})
|
||||
self.handler.definitions = notifications.load_definitions(
|
||||
self.__setup_meter_def_file(cfg))
|
||||
c = list(self.handler.process_notification(event))
|
||||
self.assertEqual(0, len(c))
|
||||
|
||||
@mock.patch('ceilometer.meter.notifications.LOG')
|
||||
def test_multi_meter_payload_invalid_missing(self, LOG):
|
||||
event = copy.deepcopy(MIDDLEWARE_EVENT)
|
||||
del event['payload']['measurements'][0]['result']
|
||||
del event['payload']['measurements'][1]['result']
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="payload.measurements.[*].metric.[*].name",
|
||||
event_type="objectstore.http.request",
|
||||
type="delta",
|
||||
unit="payload.measurements.[*].metric.[*].unit",
|
||||
volume="payload.measurements.[*].result",
|
||||
resource_id="payload.target_id",
|
||||
project_id="payload.initiator.project_id",
|
||||
multi=["name", "unit", "volume"])]})
|
||||
self.handler.definitions = notifications.load_definitions(
|
||||
self.__setup_meter_def_file(cfg))
|
||||
c = list(self.handler.process_notification(event))
|
||||
self.assertEqual(0, len(c))
|
||||
LOG.warning.assert_called_with('Could not find %s values', 'volume')
|
||||
|
||||
@mock.patch('ceilometer.meter.notifications.LOG')
|
||||
def test_multi_meter_payload_invalid_short(self, LOG):
|
||||
event = copy.deepcopy(MIDDLEWARE_EVENT)
|
||||
del event['payload']['measurements'][0]['result']
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="payload.measurements.[*].metric.[*].name",
|
||||
event_type="objectstore.http.request",
|
||||
type="delta",
|
||||
unit="payload.measurements.[*].metric.[*].unit",
|
||||
volume="payload.measurements.[*].result",
|
||||
resource_id="payload.target_id",
|
||||
project_id="payload.initiator.project_id",
|
||||
multi=["name", "unit", "volume"])]})
|
||||
self.handler.definitions = notifications.load_definitions(
|
||||
self.__setup_meter_def_file(cfg))
|
||||
c = list(self.handler.process_notification(event))
|
||||
self.assertEqual(0, len(c))
|
||||
LOG.warning.assert_called_with('Not all fetched meters contain "%s" '
|
||||
'field', 'volume')
|
||||
|
@ -11,7 +11,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for swift notification events."""
|
||||
import copy
|
||||
import mock
|
||||
|
||||
from ceilometer.objectstore import notifications
|
||||
@ -99,24 +98,3 @@ class TestMiddlewareNotifications(test.BaseTestCase):
|
||||
self.assertEqual(target['id'], samples[0].resource_id)
|
||||
self.assertEqual(initiator['id'], samples[0].user_id)
|
||||
self.assertEqual(initiator['project_id'], samples[0].project_id)
|
||||
|
||||
def test_middleware_event_meters(self):
|
||||
v = notifications.SwiftWsgiMiddlewareMeters(mock.Mock())
|
||||
samples = list(v.process_notification(MIDDLEWARE_EVENT))
|
||||
self.assertEqual(2, len(samples))
|
||||
target = MIDDLEWARE_EVENT['payload']['target']
|
||||
initiator = MIDDLEWARE_EVENT['payload']['initiator']
|
||||
for i in range(2):
|
||||
measure = MIDDLEWARE_EVENT['payload']['measurements'][i]
|
||||
self.assertEqual(measure['metric']['name'], samples[i].name)
|
||||
self.assertEqual(measure['metric']['unit'], samples[i].unit)
|
||||
self.assertEqual(measure['result'], samples[i].volume)
|
||||
self.assertEqual(target['id'], samples[i].resource_id)
|
||||
self.assertEqual(initiator['id'], samples[i].user_id)
|
||||
self.assertEqual(initiator['project_id'], samples[i].project_id)
|
||||
|
||||
def test_middleware_without_measurements(self):
|
||||
v = notifications.SwiftWsgiMiddlewareMeters(mock.Mock())
|
||||
event = copy.copy(MIDDLEWARE_EVENT)
|
||||
event['payload'].pop('measurements')
|
||||
self.assertEqual([], list(v.process_notification(event)))
|
||||
|
@ -70,7 +70,6 @@ ceilometer.notification =
|
||||
network.services.vpn.ikepolicy = ceilometer.network.notifications:IKEPolicy
|
||||
network.services.vpn.connections = ceilometer.network.notifications:IPSecSiteConnection
|
||||
objectstore.request = ceilometer.objectstore.notifications:SwiftWsgiMiddleware
|
||||
objectstore.request.meters = ceilometer.objectstore.notifications:SwiftWsgiMiddlewareMeters
|
||||
_sample = ceilometer.telemetry.notifications:TelemetryApiPost
|
||||
trove.instance.exists = ceilometer.database.notifications:InstanceExists
|
||||
dns.domain.exists = ceilometer.dns.notifications:DomainExists
|
||||
|
Loading…
Reference in New Issue
Block a user