add support to parse user metadata
leverage the same user metadata parsing the pollster use. it basically grabs all entries in target payload with specified namespace. this is needed for nova free polling Change-Id: I676a1c2f16cc4259072b51df62a1c5e342caeb6f
This commit is contained in:
parent
6f1482f088
commit
dcd69a9e6a
@ -14,7 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ceilometer.compute import util as compute_util
|
||||
from ceilometer import sample
|
||||
|
||||
|
||||
@ -72,8 +71,8 @@ def _get_metadata_from_object(conf, instance):
|
||||
metadata['root_gb'] = (int(metadata['disk_gb']) -
|
||||
int(metadata['ephemeral_gb']))
|
||||
|
||||
return compute_util.add_reserved_user_metadata(conf, instance.metadata,
|
||||
metadata)
|
||||
return sample.add_reserved_user_metadata(conf, instance.metadata,
|
||||
metadata)
|
||||
|
||||
|
||||
def make_sample_from_instance(conf, instance, name, type, unit, volume,
|
||||
|
@ -1,64 +0,0 @@
|
||||
#
|
||||
# Copyright 2014 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
|
||||
# Below config is for collecting metadata which user defined in nova or else,
|
||||
# and then storing it to Sample for future use according to user's requirement.
|
||||
# Such as using it as OpenTSDB tags for metrics.
|
||||
OPTS = [
|
||||
cfg.ListOpt('reserved_metadata_namespace',
|
||||
default=['metering.'],
|
||||
help='List of metadata prefixes reserved for metering use.'),
|
||||
cfg.IntOpt('reserved_metadata_length',
|
||||
default=256,
|
||||
help='Limit on length of reserved metadata values.'),
|
||||
cfg.ListOpt('reserved_metadata_keys',
|
||||
default=[],
|
||||
help='List of metadata keys reserved for metering use. And '
|
||||
'these keys are additional to the ones included in the '
|
||||
'namespace.'),
|
||||
]
|
||||
|
||||
|
||||
def add_reserved_user_metadata(conf, src_metadata, dest_metadata):
|
||||
limit = conf.reserved_metadata_length
|
||||
user_metadata = {}
|
||||
for prefix in conf.reserved_metadata_namespace:
|
||||
md = dict(
|
||||
(k[len(prefix):].replace('.', '_'),
|
||||
v[:limit] if isinstance(v, six.string_types) else v)
|
||||
for k, v in src_metadata.items()
|
||||
if (k.startswith(prefix) and
|
||||
k[len(prefix):].replace('.', '_') not in dest_metadata)
|
||||
)
|
||||
user_metadata.update(md)
|
||||
|
||||
for metadata_key in conf.reserved_metadata_keys:
|
||||
md = dict(
|
||||
(k.replace('.', '_'),
|
||||
v[:limit] if isinstance(v, six.string_types) else v)
|
||||
for k, v in src_metadata.items()
|
||||
if (k == metadata_key and
|
||||
k.replace('.', '_') not in dest_metadata)
|
||||
)
|
||||
user_metadata.update(md)
|
||||
|
||||
if user_metadata:
|
||||
dest_metadata['user_metadata'] = user_metadata
|
||||
|
||||
return dest_metadata
|
@ -88,6 +88,7 @@ metric:
|
||||
user_id: $.payload.user_id
|
||||
project_id: $.payload.tenant_id
|
||||
resource_id: $.payload.instance_id
|
||||
user_metadata: $.payload.metadata
|
||||
|
||||
- name: 'vcpus'
|
||||
event_type: 'compute.instance.*'
|
||||
@ -97,6 +98,7 @@ metric:
|
||||
user_id: $.payload.user_id
|
||||
project_id: $.payload.tenant_id
|
||||
resource_id: $.payload.instance_id
|
||||
user_metadata: $.payload.metadata
|
||||
|
||||
- name: 'compute.instance.booting.time'
|
||||
event_type: 'compute.instance.create.end'
|
||||
@ -107,6 +109,7 @@ metric:
|
||||
plugin: 'timedelta'
|
||||
project_id: $.payload.tenant_id
|
||||
resource_id: $.payload.instance_id
|
||||
user_metadata: $.payload.metadata
|
||||
|
||||
- name: 'disk.root.size'
|
||||
event_type: 'compute.instance.*'
|
||||
@ -116,6 +119,7 @@ metric:
|
||||
user_id: $.payload.user_id
|
||||
project_id: $.payload.tenant_id
|
||||
resource_id: $.payload.instance_id
|
||||
user_metadata: $.payload.metadata
|
||||
|
||||
- name: 'disk.ephemeral.size'
|
||||
event_type: 'compute.instance.*'
|
||||
@ -125,6 +129,7 @@ metric:
|
||||
user_id: $.payload.user_id
|
||||
project_id: $.payload.tenant_id
|
||||
resource_id: $.payload.instance_id
|
||||
user_metadata: $.payload.metadata
|
||||
|
||||
- name: 'bandwidth'
|
||||
event_type: 'l3.meter'
|
||||
|
@ -24,7 +24,7 @@ from stevedore import extension
|
||||
from ceilometer.agent import plugin_base
|
||||
from ceilometer import declarative
|
||||
from ceilometer.i18n import _LE, _LW
|
||||
from ceilometer import sample
|
||||
from ceilometer import sample as sample_util
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('meter_definitions_cfg_file',
|
||||
@ -44,7 +44,8 @@ class MeterDefinition(object):
|
||||
REQUIRED_FIELDS = ['name', 'type', 'event_type', 'unit', 'volume',
|
||||
'resource_id']
|
||||
|
||||
def __init__(self, definition_cfg, plugin_manager):
|
||||
def __init__(self, definition_cfg, conf, plugin_manager):
|
||||
self.conf = conf
|
||||
self.cfg = definition_cfg
|
||||
missing = [field for field in self.REQUIRED_FIELDS
|
||||
if not self.cfg.get(field)]
|
||||
@ -57,7 +58,7 @@ class MeterDefinition(object):
|
||||
self._event_type = [self._event_type]
|
||||
|
||||
if ('type' not in self.cfg.get('lookup', []) and
|
||||
self.cfg['type'] not in sample.TYPES):
|
||||
self.cfg['type'] not in sample_util.TYPES):
|
||||
raise declarative.MeterDefinitionException(
|
||||
_LE("Invalid type %s specified") % self.cfg['type'], self.cfg)
|
||||
|
||||
@ -67,6 +68,7 @@ class MeterDefinition(object):
|
||||
'project_id', "_context_tenant_id|_context_tenant", plugin_manager)
|
||||
self._attributes = {}
|
||||
self._metadata_attributes = {}
|
||||
self._user_meta = None
|
||||
|
||||
for name in self.SAMPLE_ATTRIBUTES:
|
||||
attr_cfg = self.cfg.get(name)
|
||||
@ -77,6 +79,10 @@ class MeterDefinition(object):
|
||||
for name in metadata:
|
||||
self._metadata_attributes[name] = declarative.Definition(
|
||||
name, metadata[name], plugin_manager)
|
||||
user_meta = self.cfg.get('user_metadata')
|
||||
if user_meta:
|
||||
self._user_meta = declarative.Definition(None, user_meta,
|
||||
plugin_manager)
|
||||
|
||||
# List of fields we expected when multiple meter are in the payload
|
||||
self.lookup = self.cfg.get('lookup')
|
||||
@ -102,6 +108,12 @@ class MeterDefinition(object):
|
||||
if value:
|
||||
sample['metadata'][name] = value
|
||||
|
||||
if self._user_meta:
|
||||
meta = self._user_meta.parse(message)
|
||||
if meta:
|
||||
sample_util.add_reserved_user_metadata(
|
||||
self.conf, meta, sample['metadata'])
|
||||
|
||||
# NOTE(sileht): We expect multiple samples in the payload
|
||||
# so put each attribute into a list
|
||||
if self.lookup:
|
||||
@ -178,7 +190,8 @@ class ProcessMeterNotifications(plugin_base.NotificationBase):
|
||||
% meter_cfg)
|
||||
continue
|
||||
try:
|
||||
md = MeterDefinition(meter_cfg, plugin_manager)
|
||||
md = MeterDefinition(meter_cfg, self.manager.conf,
|
||||
plugin_manager)
|
||||
except declarative.DefinitionException as e:
|
||||
errmsg = _LE("Error loading meter definition: %s")
|
||||
LOG.error(errmsg, six.text_type(e))
|
||||
@ -221,4 +234,4 @@ class ProcessMeterNotifications(plugin_base.NotificationBase):
|
||||
for d in self.definitions:
|
||||
if d.match_type(notification_body['event_type']):
|
||||
for s in d.to_samples(notification_body):
|
||||
yield sample.Sample.from_notification(**s)
|
||||
yield sample_util.Sample.from_notification(**s)
|
||||
|
@ -22,7 +22,6 @@ import ceilometer.api.app
|
||||
import ceilometer.api.controllers.v2.root
|
||||
import ceilometer.collector
|
||||
import ceilometer.compute.discovery
|
||||
import ceilometer.compute.util
|
||||
import ceilometer.compute.virt.inspector
|
||||
import ceilometer.compute.virt.libvirt.inspector
|
||||
import ceilometer.compute.virt.vmware.inspector
|
||||
@ -79,7 +78,6 @@ def list_opts():
|
||||
('DEFAULT',
|
||||
itertools.chain(ceilometer.agent.manager.OPTS,
|
||||
ceilometer.api.app.OPTS,
|
||||
ceilometer.compute.util.OPTS,
|
||||
ceilometer.compute.virt.inspector.OPTS,
|
||||
ceilometer.compute.virt.libvirt.inspector.OPTS,
|
||||
ceilometer.dispatcher.OPTS,
|
||||
|
@ -25,14 +25,55 @@ import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('sample_source',
|
||||
default='openstack',
|
||||
help='Source for samples emitted on this instance.'),
|
||||
cfg.ListOpt('reserved_metadata_namespace',
|
||||
default=['metering.'],
|
||||
help='List of metadata prefixes reserved for metering use.'),
|
||||
cfg.IntOpt('reserved_metadata_length',
|
||||
default=256,
|
||||
help='Limit on length of reserved metadata values.'),
|
||||
cfg.ListOpt('reserved_metadata_keys',
|
||||
default=[],
|
||||
help='List of metadata keys reserved for metering use. And '
|
||||
'these keys are additional to the ones included in the '
|
||||
'namespace.'),
|
||||
]
|
||||
|
||||
|
||||
def add_reserved_user_metadata(conf, src_metadata, dest_metadata):
|
||||
limit = conf.reserved_metadata_length
|
||||
user_metadata = {}
|
||||
for prefix in conf.reserved_metadata_namespace:
|
||||
md = dict(
|
||||
(k[len(prefix):].replace('.', '_'),
|
||||
v[:limit] if isinstance(v, six.string_types) else v)
|
||||
for k, v in src_metadata.items()
|
||||
if (k.startswith(prefix) and
|
||||
k[len(prefix):].replace('.', '_') not in dest_metadata)
|
||||
)
|
||||
user_metadata.update(md)
|
||||
|
||||
for metadata_key in conf.reserved_metadata_keys:
|
||||
md = dict(
|
||||
(k.replace('.', '_'),
|
||||
v[:limit] if isinstance(v, six.string_types) else v)
|
||||
for k, v in src_metadata.items()
|
||||
if (k == metadata_key and
|
||||
k.replace('.', '_') not in dest_metadata)
|
||||
)
|
||||
user_metadata.update(md)
|
||||
|
||||
if user_metadata:
|
||||
dest_metadata['user_metadata'] = user_metadata
|
||||
|
||||
return dest_metadata
|
||||
|
||||
|
||||
# Fields explanation:
|
||||
#
|
||||
# Source: the source of this sample
|
||||
|
@ -49,6 +49,28 @@ NOTIFICATION = {
|
||||
'publisher_id': "foo123"
|
||||
}
|
||||
|
||||
USER_META = {
|
||||
'event_type': u'test.create',
|
||||
'timestamp': u'2015-06-1909: 19: 35.786893',
|
||||
'payload': {u'user_id': u'e1d870e51c7340cb9d555b15cbfcaec2',
|
||||
u'resource_id': u'bea70e51c7340cb9d555b15cbfcaec23',
|
||||
u'timestamp': u'2015-06-19T09:19:35.785330',
|
||||
u'created_at': u'2015-06-19T09:25:35.785330',
|
||||
u'launched_at': u'2015-06-19T09:25:40.785330',
|
||||
u'message_signature': u'fake_signature1',
|
||||
u'resource_metadata': {u'foo': u'bar'},
|
||||
u'source': u'30be1fc9a03c4e94ab05c403a8a377f2: openstack',
|
||||
u'volume': 1.0,
|
||||
u'project_id': u'30be1fc9a03c4e94ab05c403a8a377f2',
|
||||
u'metadata': {u'metering.xyz': u'abc', u'ignore': u'this'},
|
||||
},
|
||||
u'_context_tenant': u'30be1fc9a03c4e94ab05c403a8a377f2',
|
||||
u'_context_request_id': u'req-da91b4bf-d2b5-43ae-8b66-c7752e72726d',
|
||||
u'_context_user': u'e1d870e51c7340cb9d555b15cbfcaec2',
|
||||
'message_id': u'939823de-c242-45a2-a399-083f4d6a8c3e',
|
||||
'publisher_id': "foo123"
|
||||
}
|
||||
|
||||
MIDDLEWARE_EVENT = {
|
||||
u'_context_request_id': u'req-a8bfa89b-d28b-4b95-9e4b-7d7875275650',
|
||||
u'_context_quota_class': None,
|
||||
@ -228,7 +250,7 @@ class TestMeterDefinition(test.BaseTestCase):
|
||||
volume="$.payload.volume",
|
||||
resource_id="$.payload.resource_id",
|
||||
project_id="$.payload.project_id")
|
||||
handler = notifications.MeterDefinition(cfg, mock.Mock())
|
||||
handler = notifications.MeterDefinition(cfg, mock.Mock(), mock.Mock())
|
||||
self.assertTrue(handler.match_type("test.create"))
|
||||
sample = list(handler.to_samples(NOTIFICATION))[0]
|
||||
self.assertEqual(1.0, sample["volume"])
|
||||
@ -240,7 +262,7 @@ class TestMeterDefinition(test.BaseTestCase):
|
||||
def test_config_required_missing_fields(self):
|
||||
cfg = dict()
|
||||
try:
|
||||
notifications.MeterDefinition(cfg, mock.Mock())
|
||||
notifications.MeterDefinition(cfg, mock.Mock(), mock.Mock())
|
||||
except declarative.DefinitionException as e:
|
||||
self.assertIn("Required fields ['name', 'type', 'event_type',"
|
||||
" 'unit', 'volume', 'resource_id']"
|
||||
@ -252,7 +274,7 @@ class TestMeterDefinition(test.BaseTestCase):
|
||||
unit="foo", volume="bar",
|
||||
resource_id="bea70e51c7340cb9d555b15cbfcaec23")
|
||||
try:
|
||||
notifications.MeterDefinition(cfg, mock.Mock())
|
||||
notifications.MeterDefinition(cfg, mock.Mock(), mock.Mock())
|
||||
except declarative.DefinitionException as e:
|
||||
self.assertIn("Invalid type foo specified",
|
||||
encodeutils.exception_to_unicode(e))
|
||||
@ -494,6 +516,41 @@ class TestMeterProcessing(test.BaseTestCase):
|
||||
'dict': NOTIFICATION['payload']['resource_metadata']}
|
||||
self.assertEqual(meta, s1['resource_metadata'])
|
||||
|
||||
def test_user_meta(self):
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="test1",
|
||||
event_type="test.*",
|
||||
type="delta",
|
||||
unit="B",
|
||||
volume="$.payload.volume",
|
||||
resource_id="$.payload.resource_id",
|
||||
project_id="$.payload.project_id",
|
||||
user_metadata="$.payload.metadata",)]})
|
||||
self._load_meter_def_file(cfg)
|
||||
c = list(self.handler.process_notification(USER_META))
|
||||
self.assertEqual(1, len(c))
|
||||
s1 = c[0].as_dict()
|
||||
meta = {'user_metadata': {'xyz': 'abc'}}
|
||||
self.assertEqual(meta, s1['resource_metadata'])
|
||||
|
||||
def test_user_meta_and_custom(self):
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="test1",
|
||||
event_type="test.*",
|
||||
type="delta",
|
||||
unit="B",
|
||||
volume="$.payload.volume",
|
||||
resource_id="$.payload.resource_id",
|
||||
project_id="$.payload.project_id",
|
||||
user_metadata="$.payload.metadata",
|
||||
metadata={'proj': '$.payload.project_id'})]})
|
||||
self._load_meter_def_file(cfg)
|
||||
c = list(self.handler.process_notification(USER_META))
|
||||
self.assertEqual(1, len(c))
|
||||
s1 = c[0].as_dict()
|
||||
meta = {'user_metadata': {'xyz': 'abc'}, 'proj': s1['project_id']}
|
||||
self.assertEqual(meta, s1['resource_metadata'])
|
||||
|
||||
def test_multi_match_event_meter(self):
|
||||
cfg = yaml.dump(
|
||||
{'metric': [dict(name="test1",
|
||||
|
Loading…
Reference in New Issue
Block a user