Remove monasca

This removes the monasca integration. Monasca was recently
declared as inactive project. In addition the monasca related
unit tests were very verbose and they were outputing a lot
of errors. Lately one of the unittests even started failing
randomly.

Change-Id: I468079b2a790245bd682c80d0116fb0da60d0e7c
This commit is contained in:
Jaromir Wysoglad 2023-11-08 04:30:36 -05:00
parent 46461a5ef4
commit d2e247cf38
18 changed files with 5 additions and 1728 deletions

View File

@ -23,7 +23,7 @@ DEFAULT_GROUP = "service_credentials"
# List of group that can set auth_section to use a different
# credentials section
OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar', 'monasca']
OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar']
def get_session(conf, requests_session=None, group=None, timeout=None):

View File

@ -1,112 +0,0 @@
# Copyright 2015 Hewlett-Packard Company
# (c) Copyright 2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monascaclient import client
from monascaclient import exc
from oslo_log import log
import tenacity
from ceilometer.i18n import _
from ceilometer import keystone_client
LOG = log.getLogger(__name__)
class MonascaException(Exception):
def __init__(self, message=''):
msg = 'An exception is raised from Monasca: ' + message
super(MonascaException, self).__init__(msg)
class MonascaServiceException(Exception):
def __init__(self, message=''):
msg = 'Monasca service is unavailable: ' + message
super(MonascaServiceException, self).__init__(msg)
class MonascaInvalidParametersException(Exception):
code = 400
def __init__(self, message=''):
msg = 'Request cannot be handled by Monasca: ' + message
super(MonascaInvalidParametersException, self).__init__(msg)
class Client(object):
"""A client which gets information via python-monascaclient."""
def __init__(self, conf, parsed_url):
self.conf = conf
self._retry_interval = conf.monasca.client_retry_interval
self._max_retries = conf.monasca.client_max_retries or 1
self._enable_api_pagination = conf.monasca.enable_api_pagination
# NOTE(zqfan): There are many concurrency requests while using
# Ceilosca, to save system resource, we don't retry too many times.
if self._max_retries < 0 or self._max_retries > 10:
LOG.warning('Reduce max retries from %s to 10',
self._max_retries)
self._max_retries = 10
monasca_auth_group = conf.monasca.auth_section
session = keystone_client.get_session(conf, group=monasca_auth_group)
self._endpoint = parsed_url.netloc + parsed_url.path
LOG.info(_("monasca_client: using %s as Monasca endpoint") %
self._endpoint)
self._get_client(session)
def _get_client(self, session):
self._mon_client = client.Client(self.conf.monasca.clientapi_version,
endpoint=self._endpoint,
session=session)
def call_func(self, func, **kwargs):
"""General method for calling any Monasca API function."""
@tenacity.retry(
wait=tenacity.wait_fixed(self._retry_interval),
stop=tenacity.stop_after_attempt(self._max_retries),
retry=(tenacity.retry_if_exception_type(MonascaServiceException) |
tenacity.retry_if_exception_type(MonascaException)))
def _inner():
try:
return func(**kwargs)
except (exc.http.InternalServerError,
exc.http.ServiceUnavailable,
exc.http.BadGateway,
exc.connection.ConnectionError) as e:
LOG.exception(e)
msg = '%s: %s' % (e.__class__.__name__, e)
raise MonascaServiceException(msg)
except exc.http.HttpError as e:
LOG.exception(e)
msg = '%s: %s' % (e.__class__.__name__, e)
status_code = e.http_status
if not isinstance(status_code, int):
status_code = 500
if 400 <= status_code < 500:
raise MonascaInvalidParametersException(msg)
else:
raise MonascaException(msg)
except Exception as e:
LOG.exception(e)
msg = '%s: %s' % (e.__class__.__name__, e)
raise MonascaException(msg)
return _inner()
def metrics_create(self, **kwargs):
return self.call_func(self._mon_client.metrics.create,
**kwargs)

View File

@ -1,92 +0,0 @@
#
# (c) Copyright 2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" All monasca ceilometer config opts"""
from oslo_config import cfg
OPTS = [
# from monasca_client
cfg.StrOpt('clientapi_version',
default='2_0',
help='Version of Monasca client to use while publishing.'),
cfg.BoolOpt('enable_api_pagination',
default=False,
help='Enable paging through monasca api resultset.'),
# from monasca_data_filter
cfg.StrOpt('monasca_mappings',
default='/etc/ceilometer/monasca_field_definitions.yaml',
help='Monasca static and dynamic field mappings'),
# from multi region opts
cfg.StrOpt('control_plane',
default='None',
help='The name of control plane'),
cfg.StrOpt('cluster',
default='None',
help='The name of cluster'),
cfg.StrOpt('cloud_name',
default='None',
help='The name of cloud'),
# from publisher monasca
cfg.BoolOpt('batch_mode',
default=True,
help='Indicates whether samples are'
' published in a batch.'),
cfg.IntOpt('batch_count',
default=1000,
help='Maximum number of samples in a batch.'),
cfg.IntOpt('batch_timeout',
default=15,
help='Maximum time interval(seconds) after which '
'samples are published in a batch.'),
cfg.IntOpt('batch_polling_interval',
default=5,
help='Frequency of checking if batch criteria is met.'),
cfg.BoolOpt('retry_on_failure',
default=False,
help='Indicates whether publisher retries publishing '
'sample in case of failure. Only a few error cases '
'are queued for a retry.'),
# NOTE: the retry interval is hard coded for the periodicals decorator
cfg.IntOpt('batch_max_retries',
default=3,
help='Maximum number of retry attempts on a publishing '
'failure to Monasca API.'),
cfg.BoolOpt('archive_on_failure',
default=False,
help='When turned on, archives metrics in file system when '
'publish to Monasca fails or metric publish maxes out '
'retry attempts.'),
cfg.StrOpt('archive_path',
default='mon_pub_failures.txt',
help='File of metrics that failed to publish to '
'Monasca. These include metrics that failed to '
'publish on first attempt and failed metrics that'
' maxed out their retries.'),
# For use with the monasca_client
cfg.IntOpt('client_max_retries',
default=3,
help='Maximum number of retry attempts of connecting to '
'Monasca API.'),
cfg.IntOpt('client_retry_interval',
default=60,
help='Frequency of attempting a retry connecting to Monasca '
'API.'),
]

View File

@ -27,7 +27,6 @@ import ceilometer.ipmi.platform.intel_node_manager
import ceilometer.ipmi.pollsters
import ceilometer.keystone_client
import ceilometer.meter.notifications
import ceilometer.monasca_opts
import ceilometer.neutron_client
import ceilometer.notification
import ceilometer.nova_client
@ -91,7 +90,6 @@ def list_opts():
itertools.chain(ceilometer.ipmi.platform.intel_node_manager.OPTS,
ceilometer.ipmi.pollsters.OPTS)),
('meter', ceilometer.meter.notifications.OPTS),
('monasca', ceilometer.monasca_opts.OPTS),
('notification',
itertools.chain(ceilometer.notification.OPTS,
ceilometer.notification.EXCHANGES_OPTS)),

View File

@ -1,249 +0,0 @@
#
# Copyright 2015 Hewlett Packard
# (c) Copyright 2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from futurist import periodics
import os
import threading
import time
from oslo_log import log
import ceilometer
from ceilometer import monasca_client as mon_client
from ceilometer import publisher
from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
from monascaclient import exc
import traceback
# Have to use constants rather than conf to satisfy @periodicals
BATCH_POLLING_INTERVAL = 5
BATCH_RETRY_INTERVAL = 60
LOG = log.getLogger(__name__)
class MonascaPublisher(publisher.ConfigPublisherBase):
"""Publisher to publish samples to monasca using monasca-client.
Example URL to place in pipeline.yaml:
- monasca://http://192.168.10.4:8070/v2.0
"""
def __init__(self, conf, parsed_url):
super(MonascaPublisher, self).__init__(conf, parsed_url)
# list to hold metrics to be published in batch (behaves like queue)
self.metric_queue = []
self.time_of_last_batch_run = time.time()
self.mon_client = mon_client.Client(self.conf, parsed_url)
self.mon_filter = MonascaDataFilter(self.conf)
# add flush_batch function to periodic callables
periodic_callables = [
# The function to run + any automatically provided
# positional and keyword arguments to provide to it
# everytime it is activated.
(self.flush_batch, (), {}),
]
if self.conf.monasca.retry_on_failure:
# list to hold metrics to be re-tried (behaves like queue)
self.retry_queue = []
# list to store retry attempts for metrics in retry_queue
self.retry_counter = []
# add retry_batch function to periodic callables
periodic_callables.append((self.retry_batch, (), {}))
if self.conf.monasca.archive_on_failure:
archive_path = self.conf.monasca.archive_path
if not os.path.exists(archive_path):
archive_path = self.conf.find_file(archive_path)
self.archive_handler = publisher.get_publisher(
self.conf,
'file://' +
str(archive_path),
'ceilometer.sample.publisher')
# start periodic worker
self.periodic_worker = periodics.PeriodicWorker(periodic_callables)
self.periodic_thread = threading.Thread(
target=self.periodic_worker.start)
self.periodic_thread.daemon = True
self.periodic_thread.start()
def _publish_handler(self, func, metrics, batch=False):
"""Handles publishing and exceptions that arise."""
try:
metric_count = len(metrics)
if batch:
func(**{'jsonbody': metrics})
else:
func(**metrics[0])
LOG.info('Successfully published %d metric(s)' % metric_count)
except mon_client.MonascaServiceException:
# Assuming atomicity of create or failure - meaning
# either all succeed or all fail in a batch
LOG.error('Metric create failed for %(count)d metric(s) with'
' name(s) %(names)s ' %
({'count': len(metrics),
'names': ','.join([metric['name']
for metric in metrics])}))
if self.conf.monasca.retry_on_failure:
# retry payload in case of internal server error(500),
# service unavailable error(503),bad gateway (502) or
# Communication Error
# append failed metrics to retry_queue
LOG.debug('Adding metrics to retry queue.')
self.retry_queue.extend(metrics)
# initialize the retry_attempt for the each failed
# metric in retry_counter
self.retry_counter.extend(
[0 * i for i in range(metric_count)])
else:
if hasattr(self, 'archive_handler'):
self.archive_handler.publish_samples(None, metrics)
except Exception:
LOG.info(traceback.format_exc())
if hasattr(self, 'archive_handler'):
self.archive_handler.publish_samples(None, metrics)
def publish_samples(self, samples):
"""Main method called to publish samples."""
for sample in samples:
metric = self.mon_filter.process_sample_for_monasca(sample)
# In batch mode, push metric to queue,
# else publish the metric
if self.conf.monasca.batch_mode:
LOG.debug('Adding metric to queue.')
self.metric_queue.append(metric)
else:
LOG.info('Publishing metric with name %(name)s and'
' timestamp %(ts)s to endpoint.' %
({'name': metric['name'],
'ts': metric['timestamp']}))
self._publish_handler(self.mon_client.metrics_create, [metric])
def is_batch_ready(self):
"""Method to check if batch is ready to trigger."""
previous_time = self.time_of_last_batch_run
current_time = time.time()
elapsed_time = current_time - previous_time
if elapsed_time >= self.conf.monasca.batch_timeout and len(self.
metric_queue) > 0:
LOG.info('Batch timeout exceeded, triggering batch publish.')
return True
else:
if len(self.metric_queue) >= self.conf.monasca.batch_count:
LOG.info('Batch queue full, triggering batch publish.')
return True
else:
return False
@periodics.periodic(BATCH_POLLING_INTERVAL)
def flush_batch(self):
"""Method to flush the queued metrics."""
# print "flush batch... %s" % str(time.time())
if self.is_batch_ready():
# publish all metrics in queue at this point
batch_count = len(self.metric_queue)
LOG.info("batch is ready: batch_count %s" % str(batch_count))
self._publish_handler(self.mon_client.metrics_create,
self.metric_queue[:batch_count],
batch=True)
self.time_of_last_batch_run = time.time()
# slice queue to remove metrics that
# published with success or failed and got queued on
# retry queue
self.metric_queue = self.metric_queue[batch_count:]
def is_retry_ready(self):
"""Method to check if retry batch is ready to trigger."""
if len(self.retry_queue) > 0:
LOG.info('Retry queue has items, triggering retry.')
return True
else:
return False
@periodics.periodic(BATCH_RETRY_INTERVAL)
def retry_batch(self):
"""Method to retry the failed metrics."""
# print "retry batch...%s" % str(time.time())
if self.is_retry_ready():
retry_count = len(self.retry_queue)
# Iterate over the retry_queue to eliminate
# metrics that have maxed out their retry attempts
for ctr in range(retry_count):
if self.retry_counter[ctr] > self.conf.\
monasca.batch_max_retries:
if hasattr(self, 'archive_handler'):
self.archive_handler.publish_samples(
None,
[self.retry_queue[ctr]])
LOG.info('Removing metric %s from retry queue.'
' Metric retry maxed out retry attempts' %
self.retry_queue[ctr]['name'])
del self.retry_queue[ctr]
del self.retry_counter[ctr]
# Iterate over the retry_queue to retry the
# publish for each metric.
# If an exception occurs, the retry count for
# the failed metric is incremented.
# If the retry succeeds, remove the metric and
# the retry count from the retry_queue and retry_counter resp.
ctr = 0
while ctr < len(self.retry_queue):
try:
LOG.info('Retrying metric publish from retry queue.')
self.mon_client.metrics_create(**self.retry_queue[ctr])
# remove from retry queue if publish was success
LOG.info('Retrying metric %s successful,'
' removing metric from retry queue.' %
self.retry_queue[ctr]['name'])
del self.retry_queue[ctr]
del self.retry_counter[ctr]
except exc.ClientException:
LOG.error('Exception encountered in retry. '
'Batch will be retried in next attempt.')
# if retry failed, increment the retry counter
self.retry_counter[ctr] += 1
ctr += 1
def flush_to_file(self):
# TODO(persist maxed-out metrics to file)
pass
def publish_events(self, events):
"""Send an event message for publishing
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError

View File

@ -1,228 +0,0 @@
#
# Copyright 2015 Hewlett-Packard Company
# (c) Copyright 2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from jsonpath_rw_ext import parser
from oslo_log import log
from oslo_utils import timeutils
import yaml
from ceilometer import sample as sample_util
LOG = log.getLogger(__name__)
class UnableToLoadMappings(Exception):
pass
class NoMappingsFound(Exception):
pass
class CeiloscaMappingDefinitionException(Exception):
def __init__(self, message, definition_cfg):
super(CeiloscaMappingDefinitionException, self).__init__(message)
self.message = message
self.definition_cfg = definition_cfg
def __str__(self):
return '%s %s: %s' % (self.__class__.__name__,
self.definition_cfg, self.message)
class MonascaDataFilter(object):
JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser()
def __init__(self, conf):
self.conf = conf
self._mapping = {}
self._mapping = self._get_mapping()
def _get_mapping(self):
with open(self.conf.monasca.monasca_mappings, 'r') as f:
try:
return yaml.safe_load(f)
except yaml.YAMLError as err:
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = ("Invalid YAML syntax in Monasca Data "
"Filter file %(file)s at line: "
"%(line)s, column: %(column)s."
% dict(file=self.conf.monasca.monasca_mappings,
line=mark.line + 1,
column=mark.column + 1))
else:
errmsg = ("YAML error reading Monasca Data Filter "
"file %(file)s" %
dict(file=self.conf.monasca.monasca_mappings))
LOG.error(errmsg)
raise UnableToLoadMappings(err.message)
def _convert_timestamp(self, timestamp):
if isinstance(timestamp, datetime.datetime):
ts = timestamp
else:
ts = timeutils.parse_isotime(timestamp)
tdelta = (ts - datetime.datetime(1970, 1, 1, tzinfo=ts.tzinfo))
# convert timestamp to milli seconds as Monasca expects
return int(tdelta.total_seconds() * 1000)
def _convert_to_sample(self, s):
return sample_util.Sample(
name=s['counter_name'],
type=s['counter_type'],
unit=s['counter_unit'],
volume=s['counter_volume'],
user_id=s['user_id'],
project_id=s['project_id'],
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source')).as_dict()
def get_value_for_nested_dictionary(self, lst, dct):
val = dct
for element in lst:
if isinstance(val, dict) and element in val:
val = val.get(element)
else:
return
return val
def parse_jsonpath(self, field):
try:
parts = self.JSONPATH_RW_PARSER.parse(field)
except Exception as e:
raise CeiloscaMappingDefinitionException(
"Parse error in JSONPath specification "
"'%(jsonpath)s': %(err)s"
% dict(jsonpath=field, err=e))
return parts
def _get_value_metadata_for_key(self, sample_meta, meta_key):
"""Get the data for the given key, supporting JSONPath"""
if isinstance(meta_key, dict):
# extract key and jsonpath
# If following convention, dict will have one and only one
# element of the form <monasca key>: <json path>
if len(meta_key.keys()) == 1:
mon_key = list(meta_key.keys())[0]
else:
# If no keys or more keys than one
raise CeiloscaMappingDefinitionException(
"Field definition format mismatch, should "
"have only one key:value pair. %(meta_key)s" %
{'meta_key': meta_key}, meta_key)
json_path = meta_key[mon_key]
parts = self.parse_jsonpath(json_path)
val_matches = parts.find(sample_meta)
if len(val_matches) > 0:
# resolve the find to the first match and get value
val = val_matches[0].value
if not isinstance(val, (str, str)) \
and not isinstance(val, int):
# Don't support lists or dicts or ...
raise CeiloscaMappingDefinitionException(
"Metadata format mismatch, value "
"should be a simple string. %(valuev)s" %
{'valuev': val}, meta_key)
else:
val = 'None'
return mon_key, val
else:
# simple string
val = sample_meta.get(meta_key, None)
if val is not None:
return meta_key, val
else:
# one more attempt using a dotted notation
# TODO(joadavis) Deprecate this . notation code
# in favor of jsonpath
if len(meta_key.split('.')) > 1:
val = self.get_value_for_nested_dictionary(
meta_key.split('.'), sample_meta)
if val is not None:
return meta_key, val
else:
return meta_key, 'None'
else:
return meta_key, 'None'
def process_sample_for_monasca(self, sample_obj):
if not self._mapping:
raise NoMappingsFound("Unable to process the sample")
dimensions = {}
dimensions['datasource'] = 'ceilometer'
# control_plane, cluster and cloud_name can be None, but we use
# literal 'None' for such case
dimensions['control_plane'] = self.conf.monasca.control_plane or 'None'
dimensions['cluster'] = self.conf.monasca.cluster or 'None'
dimensions['cloud_name'] = self.conf.monasca.cloud_name or 'None'
if isinstance(sample_obj, sample_util.Sample):
sample = sample_obj.as_dict()
elif isinstance(sample_obj, dict):
if 'counter_name' in sample_obj:
sample = self._convert_to_sample(sample_obj)
else:
sample = sample_obj
for dim in self._mapping['dimensions']:
val = sample.get(dim, None)
if val:
dimensions[dim] = val
else:
dimensions[dim] = 'None'
sample_meta = sample.get('resource_metadata', None)
value_meta = {}
meter_name = sample.get('name') or sample.get('counter_name')
if sample_meta:
for meta_key in self._mapping['metadata']['common']:
monasca_key, val = self._get_value_metadata_for_key(
sample_meta, meta_key)
value_meta[monasca_key] = val
if meter_name in self._mapping['metadata'].keys():
for meta_key in self._mapping['metadata'][meter_name]:
monasca_key, val = self._get_value_metadata_for_key(
sample_meta, meta_key)
value_meta[monasca_key] = val
meter_value = sample.get('volume') or sample.get('counter_volume')
if meter_value is None:
meter_value = 0
metric = dict(
name=meter_name,
timestamp=self._convert_timestamp(sample['timestamp']),
value=meter_value,
dimensions=dimensions,
value_meta=value_meta,
)
LOG.debug("Generated metric with name %(name)s,"
" timestamp %(timestamp)s, value %(value)s,"
" dimensions %(dimensions)s" %
{'name': metric['name'],
'timestamp': metric['timestamp'],
'value': metric['value'],
'dimensions': metric['dimensions']})
return metric

View File

@ -1,511 +0,0 @@
#
# Copyright 2015 Hewlett-Packard Company
# (c) Copyright 2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from unittest import mock
from oslotest import base
from ceilometer import monasca_opts
from ceilometer.publisher import monasca_data_filter as mdf
from ceilometer import sample
from ceilometer import service
class TestMonUtils(base.BaseTestCase):
def setUp(self):
super(TestMonUtils, self).setUp()
self.CONF = service.prepare_service([], [])
self.CONF.register_opts(list(monasca_opts.OPTS),
'monasca')
self._field_mappings = {
'dimensions': ['resource_id',
'project_id',
'user_id',
'geolocation',
'region',
'source',
'availability_zone'],
'metadata': {
'common': ['event_type',
'audit_period_beginning',
'audit_period_ending'],
'image': ['size', 'status', 'image_meta.base_url',
'image_meta.base_url2', 'image_meta.base_url3',
'image_meta.base_url4'],
'image.delete': ['size', 'status'],
'image.size': ['size', 'status'],
'image.update': ['size', 'status'],
'image.upload': ['size', 'status'],
'instance': ['state', 'state_description'],
'snapshot': ['status'],
'snapshot.size': ['status'],
'volume': ['status'],
'volume.size': ['status'],
}
}
self._field_mappings_cinder = {
'dimensions': ['resource_id',
'project_id',
'user_id',
'geolocation',
'region',
'source',
'availability_zone'],
'metadata': {
'common': ['event_type',
'audit_period_beginning',
'audit_period_ending',
'arbitrary_new_field'],
'volume.create.end':
['size', 'status',
{'metering.prn_name':
"$.metadata[?(@.key = 'metering.prn_name')].value"},
{'metering.prn_type':
"$.metadata[?(@.key = 'metering.prn_type')].value"},
'volume_type', 'created_at',
'host'],
'volume': ['status'],
'volume.size': ['status'],
}
}
self._field_mappings_bad_format = {
'dimensions': ['resource_id',
'project_id',
'user_id',
'geolocation',
'region',
'source',
'availability_zone'],
'metadata': {
'common': ['event_type',
'audit_period_beginning',
'audit_period_ending',
'arbitrary_new_field'],
'volume.create.end':
['size', 'status',
{'metering.prn_name':
"$.metadata[?(@.key = 'metering.prn_name')].value",
'metering.prn_type':
"$.metadata[?(@.key = 'metering.prn_type')].value"},
'volume_type', 'created_at',
'host'],
'volume': ['status'],
'volume.size': ['status'],
}
}
def test_process_sample(self):
s = sample.Sample(
name='test',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
with mock.patch(to_patch, side_effect=[self._field_mappings]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('dimensions'))
self.assertIsNotNone(r.get('value_meta'))
self.assertIsNotNone(r.get('value'))
self.assertEqual(s.user_id, r['dimensions'].get('user_id'))
self.assertEqual(s.project_id, r['dimensions'].get('project_id'))
self.assertEqual(s.resource_id, r['dimensions'].get('resource_id'))
# 2015-04-07T20:07:06.156986 compare upto millisec
monasca_ts = datetime.datetime.utcfromtimestamp(
r['timestamp'] / 1000.0).isoformat()[:23]
self.assertEqual(s.timestamp[:23], monasca_ts)
def test_process_sample_field_mappings(self):
s = sample.Sample(
name='test',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
)
field_map = self._field_mappings
field_map['dimensions'].remove('project_id')
field_map['dimensions'].remove('user_id')
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
with mock.patch(to_patch, side_effect=[field_map]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertIsNone(r['dimensions'].get('project_id'))
self.assertIsNone(r['dimensions'].get('user_id'))
def convert_dict_to_list(self, dct, prefix=None, outlst={}):
prefix = prefix + '.' if prefix else ""
for k, v in dct.items():
if type(v) is dict:
self.convert_dict_to_list(v, prefix + k, outlst)
else:
if v is not None:
outlst[prefix + k] = v
else:
outlst[prefix + k] = 'None'
return outlst
def test_process_sample_metadata(self):
s = sample.Sample(
name='image',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'notification',
'status': 'active',
'image_meta': {'base_url': 'http://image.url',
'base_url2': '',
'base_url3': None},
'size': 1500},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
with mock.patch(to_patch, side_effect=[self._field_mappings]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('value_meta'))
self.assertTrue(set(self.convert_dict_to_list(
s.resource_metadata
).items()).issubset(set(r['value_meta'].items())))
def test_process_sample_metadata_with_empty_data(self):
s = sample.Sample(
name='image',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'notification',
'status': 'active',
'image_meta': {'base_url': 'http://image.url',
'base_url2': '',
'base_url3': None},
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
with mock.patch(to_patch, side_effect=[self._field_mappings]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('value_meta'))
self.assertEqual(s.source, r['dimensions']['source'])
self.assertTrue(set(self.convert_dict_to_list(
s.resource_metadata
).items()).issubset(set(r['value_meta'].items())))
def test_process_sample_metadata_with_extendedKey(self):
s = sample.Sample(
name='image',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'notification',
'status': 'active',
'image_meta': {'base_url': 'http://image.url',
'base_url2': '',
'base_url3': None},
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
with mock.patch(to_patch, side_effect=[self._field_mappings]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('value_meta'))
self.assertTrue(set(self.convert_dict_to_list(
s.resource_metadata
).items()).issubset(set(r['value_meta'].items())))
self.assertEqual(r.get('value_meta')['image_meta.base_url'],
s.resource_metadata.get('image_meta')
['base_url'])
self.assertEqual(r.get('value_meta')['image_meta.base_url2'],
s.resource_metadata.get('image_meta')
['base_url2'])
self.assertEqual(r.get('value_meta')['image_meta.base_url3'],
str(s.resource_metadata.get('image_meta')
['base_url3']))
self.assertEqual(r.get('value_meta')['image_meta.base_url4'],
'None')
def test_process_sample_metadata_with_jsonpath(self):
"""Test meter sample in a format produced by cinder."""
s = sample.Sample(
name='volume.create.end',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'volume.create.end',
'status': 'available',
'volume_type': None,
# 'created_at': '2017-03-21T21:05:44+00:00',
'host': 'testhost',
# this "value: , key: " format is
# how cinder reports metadata
'metadata':
[{'value': 'aaa0001',
'key': 'metering.prn_name'},
{'value': 'Cust001',
'key': 'metering.prn_type'}],
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
# use the cinder specific mapping
with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('value_meta'))
# Using convert_dict_to_list is too simplistic for this
self.assertEqual(r.get('value_meta')['event_type'],
s.resource_metadata.get('event_type'),
"Failed to match common element.")
self.assertEqual(r.get('value_meta')['host'],
s.resource_metadata.get('host'),
"Failed to match meter specific element.")
self.assertEqual(r.get('value_meta')['size'],
s.resource_metadata.get('size'),
"Unable to handle an int.")
self.assertEqual(r.get('value_meta')['metering.prn_name'],
'aaa0001',
"Failed to extract a value "
"using specified jsonpath.")
def test_process_sample_metadata_with_jsonpath_nomatch(self):
"""Test meter sample in a format produced by cinder.
Behavior when no matching element is found for the specified jsonpath
"""
s = sample.Sample(
name='volume.create.end',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'volume.create.end',
'status': 'available',
'volume_type': None,
# 'created_at': '2017-03-21T21:05:44+00:00',
'host': 'testhost',
'metadata': [{'value': 'aaa0001',
'key': 'metering.THISWONTMATCH'}],
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
# use the cinder specific mapping
with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('value_meta'))
# Using convert_dict_to_list is too simplistic for this
self.assertEqual(r.get('value_meta')['event_type'],
s.resource_metadata.get('event_type'),
"Failed to match common element.")
self.assertEqual(r.get('value_meta')['host'],
s.resource_metadata.get('host'),
"Failed to match meter specific element.")
self.assertEqual(r.get('value_meta')['size'],
s.resource_metadata.get('size'),
"Unable to handle an int.")
self.assertEqual(r.get('value_meta')['metering.prn_name'],
'None', "This metadata should fail to match "
"and then return 'None'.")
def test_process_sample_metadata_with_jsonpath_value_not_str(self):
"""Test where jsonpath is used but result is not a simple string"""
s = sample.Sample(
name='volume.create.end',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'volume.create.end',
'status': 'available',
'volume_type': None,
# 'created_at': '2017-03-21T21:05:44+00:00',
'host': 'testhost',
'metadata': [{'value': ['aaa0001', 'bbbb002'],
'key': 'metering.prn_name'}],
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
# use the cinder specific mapping
with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]):
data_filter = mdf.MonascaDataFilter(self.CONF)
try:
# Don't assign to a variable, this should raise
data_filter.process_sample_for_monasca(s)
except mdf.CeiloscaMappingDefinitionException as e:
self.assertEqual(
'Metadata format mismatch, value should be '
'a simple string. [\'aaa0001\', \'bbbb002\']',
e.message)
def test_process_sample_metadata_with_jsonpath_value_is_int(self):
"""Test meter sample where jsonpath result is an int."""
s = sample.Sample(
name='volume.create.end',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'volume.create.end',
'status': 'available',
'volume_type': None,
# 'created_at': '2017-03-21T21:05:44+00:00',
'host': 'testhost',
'metadata': [{'value': 13,
'key': 'metering.prn_name'}],
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
# use the cinder specific mapping
with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]):
data_filter = mdf.MonascaDataFilter(self.CONF)
r = data_filter.process_sample_for_monasca(s)
self.assertEqual(s.name, r['name'])
self.assertIsNotNone(r.get('value_meta'))
# Using convert_dict_to_list is too simplistic for this
self.assertEqual(r.get('value_meta')['event_type'],
s.resource_metadata.get('event_type'),
"Failed to match common element.")
self.assertEqual(r.get('value_meta')['host'],
s.resource_metadata.get('host'),
"Failed to match meter specific element.")
self.assertEqual(r.get('value_meta')['size'],
s.resource_metadata.get('size'),
"Unable to handle an int.")
self.assertEqual(r.get('value_meta')['metering.prn_name'],
13,
"Unable to handle an int "
"through the jsonpath processing")
def test_process_sample_metadata_with_jsonpath_bad_format(self):
"""Test handling of definition that is not written correctly"""
s = sample.Sample(
name='volume.create.end',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
source='',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'event_type': 'volume.create.end',
'status': 'available',
'volume_type': None,
# 'created_at': '2017-03-21T21:05:44+00:00',
'host': 'testhost',
'metadata': [{'value': 13,
'key': 'metering.prn_name'}],
'size': 0},
)
to_patch = ("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping")
# use the bad mapping
with mock.patch(to_patch,
side_effect=[self._field_mappings_bad_format]):
data_filter = mdf.MonascaDataFilter(self.CONF)
try:
# Don't assign to a variable as this should raise
data_filter.process_sample_for_monasca(s)
except mdf.CeiloscaMappingDefinitionException as e:
# Make sure we got the right kind of error
# Cannot check the whole message text, as python
# may reorder a dict when producing a string version
self.assertIn(
'Field definition format mismatch, should '
'have only one key:value pair.',
e.message,
"Did raise exception but wrong message - %s" % e.message)

View File

@ -1,204 +0,0 @@
#
# Copyright 2015 Hewlett Packard
# (c) Copyright 2018 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/monasca.py"""
import datetime
import fixtures
import time
from unittest import mock
from oslotest import base
from ceilometer import monasca_client as mon_client
from ceilometer.publisher import monasca as mon_publisher
from ceilometer import sample
from ceilometer import service
class FakeResponse(object):
def __init__(self, status_code):
self.status_code = status_code
class TestMonascaPublisher(base.BaseTestCase):
test_data = [
sample.Sample(
name='test',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='test2',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='test2',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
]
field_mappings = {
'dimensions': ['resource_id',
'project_id',
'user_id',
'geolocation',
'region',
'availability_zone'],
'metadata': {
'common': ['event_type',
'audit_period_beginning',
'audit_period_ending'],
'image': ['size', 'status'],
'image.delete': ['size', 'status'],
'image.size': ['size', 'status'],
'image.update': ['size', 'status'],
'image.upload': ['size', 'status'],
'instance': ['state', 'state_description'],
'snapshot': ['status'],
'snapshot.size': ['status'],
'volume': ['status'],
'volume.size': ['status'],
}
}
@staticmethod
def create_side_effect(exception_type, test_exception):
def side_effect(*args, **kwargs):
if test_exception.pop():
raise exception_type
else:
return FakeResponse(204)
return side_effect
def setUp(self):
super(TestMonascaPublisher, self).setUp()
self.CONF = service.prepare_service([], [])
self.parsed_url = mock.MagicMock()
def tearDown(self):
# For some reason, cfg.CONF is registered a required option named
# auth_url after these tests run, which occasionally blocks test
# case test_event_pipeline_endpoint_requeue_on_failure, so we
# unregister it here.
self.CONF.reset()
# self.CONF.unregister_opt(cfg.StrOpt('service_auth_url'),
# group='monasca')
super(TestMonascaPublisher, self).tearDown()
@mock.patch("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping",
side_effect=[field_mappings])
def test_publisher_publish(self, mapping_patch):
self.CONF.set_override('batch_mode', False, group='monasca')
publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url)
publisher.mon_client = mock.MagicMock()
with mock.patch.object(publisher.mon_client,
'metrics_create') as mock_create:
mock_create.return_value = FakeResponse(204)
publisher.publish_samples(self.test_data)
self.assertEqual(3, mock_create.call_count)
self.assertEqual(1, mapping_patch.called)
@mock.patch("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping",
side_effect=[field_mappings])
def test_publisher_batch(self, mapping_patch):
self.CONF.set_override('batch_mode', True, group='monasca')
self.CONF.set_override('batch_count', 3, group='monasca')
self.CONF.set_override('batch_polling_interval', 1, group='monasca')
publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url)
publisher.mon_client = mock.MagicMock()
with mock.patch.object(publisher.mon_client,
'metrics_create') as mock_create:
mock_create.return_value = FakeResponse(204)
publisher.publish_samples(self.test_data)
time.sleep(10)
self.assertEqual(1, mock_create.call_count)
self.assertEqual(1, mapping_patch.called)
@mock.patch("ceilometer.publisher.monasca_data_filter."
"MonascaDataFilter._get_mapping",
side_effect=[field_mappings])
def test_publisher_batch_retry(self, mapping_patch):
self.CONF.set_override('batch_mode', True, group='monasca')
self.CONF.set_override('batch_count', 3, group='monasca')
self.CONF.set_override('batch_polling_interval', 1, group='monasca')
self.CONF.set_override('retry_on_failure', True, group='monasca')
# Constant in code for @periodicals, can't override
# self.CONF.set_override('retry_interval', 2, group='monasca')
self.CONF.set_override('batch_max_retries', 1, group='monasca')
publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url)
publisher.mon_client = mock.MagicMock()
with mock.patch.object(publisher.mon_client,
'metrics_create') as mock_create:
raise_http_error = [False, False, False, True]
mock_create.side_effect = self.create_side_effect(
mon_client.MonascaServiceException,
raise_http_error)