From 126350c0ae609c5d35d54556883da2476e81e30e Mon Sep 17 00:00:00 2001 From: Joseph Davis Date: Wed, 18 Apr 2018 15:25:35 -0700 Subject: [PATCH] publisher: Contribute the Monasca publisher The Ceilosca (monasca-ceilometer) publisher has been around since before the Mitaka release and has been used in production for years. The MonascaPublisher acts as another Ceilometer publisher and sends selected metrics on to the Monasca API for storage, aggregation, alarming, etc. Once metrics are in Monasca, they may be retrieved through the Monasca API or with the python-monascaclient. This Ceilosca functionality is a key component for metering in several distributions and is used in many customer installations. With the removal of the Ceilometer v2 API (which allowed the removal of the Ceilosca storage driver, shrinking the Ceilosca code base) and continuing changes to Ceilometer, a tighter integration with the ceilometer repo may be beneficial to keep both Monasca and Telemetry in sync. Change-Id: I2cbce160503e23dfbde375722a3bd100ec86494e Story: 2001239 Task: 5769 --- ceilometer/keystone_client.py | 2 +- ceilometer/monasca_client.py | 112 ++++ ceilometer/monasca_opts.py | 92 ++++ ceilometer/opts.py | 2 + ceilometer/publisher/monasca.py | 250 +++++++++ ceilometer/publisher/monasca_data_filter.py | 229 ++++++++ .../publisher/test_monasca_data_filter.py | 512 ++++++++++++++++++ .../unit/publisher/test_monasca_publisher.py | 205 +++++++ ceilometer/tests/unit/test_monascaclient.py | 148 +++++ doc/source/admin/telemetry-data-pipelines.rst | 43 ++ doc/source/contributor/architecture.rst | 7 +- .../monasca-publisher/example_ceilometer.conf | 32 ++ .../monasca-publisher/example_pipeline.yaml | 44 ++ .../monasca_field_definitions.yaml | 58 ++ lower-constraints.txt | 1 + ...de-monasca-publisher-1f47dde52af50feb.yaml | 8 + requirements.txt | 1 + setup.cfg | 1 + 18 files changed, 1743 insertions(+), 4 deletions(-) create mode 100644 ceilometer/monasca_client.py create mode 100644 ceilometer/monasca_opts.py create mode 100755 ceilometer/publisher/monasca.py create mode 100644 ceilometer/publisher/monasca_data_filter.py create mode 100644 ceilometer/tests/unit/publisher/test_monasca_data_filter.py create mode 100755 ceilometer/tests/unit/publisher/test_monasca_publisher.py create mode 100644 ceilometer/tests/unit/test_monascaclient.py create mode 100644 etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf create mode 100644 etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml create mode 100644 etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml create mode 100644 releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml diff --git a/ceilometer/keystone_client.py b/ceilometer/keystone_client.py index 835bf3fc84..7c44bf6a66 100644 --- a/ceilometer/keystone_client.py +++ b/ceilometer/keystone_client.py @@ -23,7 +23,7 @@ DEFAULT_GROUP = "service_credentials" # List of group that can set auth_section to use a different # credentials section -OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar'] +OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar', 'monasca'] def get_session(conf, requests_session=None, group=None, timeout=None): diff --git a/ceilometer/monasca_client.py b/ceilometer/monasca_client.py new file mode 100644 index 0000000000..4835fdcc31 --- /dev/null +++ b/ceilometer/monasca_client.py @@ -0,0 +1,112 @@ +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from monascaclient import client +from monascaclient import exc +from oslo_log import log +import tenacity + +from ceilometer.i18n import _ +from ceilometer import keystone_client + +LOG = log.getLogger(__name__) + + +class MonascaException(Exception): + def __init__(self, message=''): + msg = 'An exception is raised from Monasca: ' + message + super(MonascaException, self).__init__(msg) + + +class MonascaServiceException(Exception): + def __init__(self, message=''): + msg = 'Monasca service is unavailable: ' + message + super(MonascaServiceException, self).__init__(msg) + + +class MonascaInvalidParametersException(Exception): + code = 400 + + def __init__(self, message=''): + msg = 'Request cannot be handled by Monasca: ' + message + super(MonascaInvalidParametersException, self).__init__(msg) + + +class Client(object): + """A client which gets information via python-monascaclient.""" + + def __init__(self, conf, parsed_url): + self.conf = conf + self._retry_interval = conf.monasca.client_retry_interval + self._max_retries = conf.monasca.client_max_retries or 1 + self._enable_api_pagination = conf.monasca.enable_api_pagination + # NOTE(zqfan): There are many concurrency requests while using + # Ceilosca, to save system resource, we don't retry too many times. + if self._max_retries < 0 or self._max_retries > 10: + LOG.warning('Reduce max retries from %s to 10', + self._max_retries) + self._max_retries = 10 + + monasca_auth_group = conf.monasca.auth_section + session = keystone_client.get_session(conf, group=monasca_auth_group) + + self._endpoint = parsed_url.netloc + parsed_url.path + LOG.info(_("monasca_client: using %s as Monasca endpoint") % + self._endpoint) + + self._get_client(session) + + def _get_client(self, session): + self._mon_client = client.Client(self.conf.monasca.clientapi_version, + endpoint=self._endpoint, + session=session) + + def call_func(self, func, **kwargs): + """General method for calling any Monasca API function.""" + @tenacity.retry( + wait=tenacity.wait_fixed(self._retry_interval), + stop=tenacity.stop_after_attempt(self._max_retries), + retry=(tenacity.retry_if_exception_type(MonascaServiceException) | + tenacity.retry_if_exception_type(MonascaException))) + def _inner(): + try: + return func(**kwargs) + except (exc.http.InternalServerError, + exc.http.ServiceUnavailable, + exc.http.BadGateway, + exc.connection.ConnectionError) as e: + LOG.exception(e) + msg = '%s: %s' % (e.__class__.__name__, e) + raise MonascaServiceException(msg) + except exc.http.HttpError as e: + LOG.exception(e) + msg = '%s: %s' % (e.__class__.__name__, e) + status_code = e.http_status + if not isinstance(status_code, int): + status_code = 500 + if 400 <= status_code < 500: + raise MonascaInvalidParametersException(msg) + else: + raise MonascaException(msg) + except Exception as e: + LOG.exception(e) + msg = '%s: %s' % (e.__class__.__name__, e) + raise MonascaException(msg) + + return _inner() + + def metrics_create(self, **kwargs): + return self.call_func(self._mon_client.metrics.create, + **kwargs) diff --git a/ceilometer/monasca_opts.py b/ceilometer/monasca_opts.py new file mode 100644 index 0000000000..2d9190f179 --- /dev/null +++ b/ceilometer/monasca_opts.py @@ -0,0 +1,92 @@ +# +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" All monasca ceilometer config opts""" + +from oslo_config import cfg + +OPTS = [ + + # from monasca_client + cfg.StrOpt('clientapi_version', + default='2_0', + help='Version of Monasca client to use while publishing.'), + cfg.BoolOpt('enable_api_pagination', + default=False, + help='Enable paging through monasca api resultset.'), + + # from monasca_data_filter + cfg.StrOpt('monasca_mappings', + default='/etc/ceilometer/monasca_field_definitions.yaml', + help='Monasca static and dynamic field mappings'), + + # from multi region opts + cfg.StrOpt('control_plane', + default='None', + help='The name of control plane'), + cfg.StrOpt('cluster', + default='None', + help='The name of cluster'), + cfg.StrOpt('cloud_name', + default='None', + help='The name of cloud'), + + # from publisher monasca + cfg.BoolOpt('batch_mode', + default=True, + help='Indicates whether samples are' + ' published in a batch.'), + cfg.IntOpt('batch_count', + default=1000, + help='Maximum number of samples in a batch.'), + cfg.IntOpt('batch_timeout', + default=15, + help='Maximum time interval(seconds) after which ' + 'samples are published in a batch.'), + cfg.IntOpt('batch_polling_interval', + default=5, + help='Frequency of checking if batch criteria is met.'), + cfg.BoolOpt('retry_on_failure', + default=False, + help='Indicates whether publisher retries publishing' + 'sample in case of failure. Only a few error cases ' + 'are queued for a retry.'), + # NOTE: the retry interval is hard coded for the periodicals decorator + cfg.IntOpt('batch_max_retries', + default=3, + help='Maximum number of retry attempts on a publishing ' + 'failure to Monasca API.'), + cfg.BoolOpt('archive_on_failure', + default=False, + help='When turned on, archives metrics in file system when ' + 'publish to Monasca fails or metric publish maxes out ' + 'retry attempts.'), + cfg.StrOpt('archive_path', + default='mon_pub_failures.txt', + help='File of metrics that failed to publish to ' + 'Monasca. These include metrics that failed to ' + 'publish on first attempt and failed metrics that' + ' maxed out their retries.'), + # For use with the monasca_client + cfg.IntOpt('client_max_retries', + default=3, + help='Maximum number of retry attempts of connecting to ' + 'Monasca API.'), + cfg.IntOpt('client_retry_interval', + default=60, + help='Frequency of attempting a retry connecting to Monasca ' + 'API.'), + +] diff --git a/ceilometer/opts.py b/ceilometer/opts.py index 8b844cc00a..e37461f981 100644 --- a/ceilometer/opts.py +++ b/ceilometer/opts.py @@ -30,6 +30,7 @@ import ceilometer.ipmi.platform.intel_node_manager import ceilometer.ipmi.pollsters import ceilometer.keystone_client import ceilometer.meter.notifications +import ceilometer.monasca_opts import ceilometer.neutron_client import ceilometer.notification import ceilometer.nova_client @@ -99,6 +100,7 @@ def list_opts(): itertools.chain(ceilometer.ipmi.platform.intel_node_manager.OPTS, ceilometer.ipmi.pollsters.OPTS)), ('meter', ceilometer.meter.notifications.OPTS), + ('monasca', ceilometer.monasca_opts.OPTS), ('notification', itertools.chain(ceilometer.notification.OPTS, ceilometer.notification.EXCHANGES_OPTS)), diff --git a/ceilometer/publisher/monasca.py b/ceilometer/publisher/monasca.py new file mode 100755 index 0000000000..37aee07ea1 --- /dev/null +++ b/ceilometer/publisher/monasca.py @@ -0,0 +1,250 @@ +# +# Copyright 2015 Hewlett Packard +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from futurist import periodics + +import os +import threading +import time + +from oslo_log import log +from six import moves + +import ceilometer +from ceilometer import monasca_client as mon_client +from ceilometer import publisher +from ceilometer.publisher.monasca_data_filter import MonascaDataFilter + +from monascaclient import exc +import traceback + +# Have to use constants rather than conf to satisfy @periodicals +BATCH_POLLING_INTERVAL = 5 +BATCH_RETRY_INTERVAL = 60 + +LOG = log.getLogger(__name__) + + +class MonascaPublisher(publisher.ConfigPublisherBase): + """Publisher to publish samples to monasca using monasca-client. + + Example URL to place in pipeline.yaml: + - monasca://http://192.168.10.4:8070/v2.0 + """ + def __init__(self, conf, parsed_url): + super(MonascaPublisher, self).__init__(conf, parsed_url) + + # list to hold metrics to be published in batch (behaves like queue) + self.metric_queue = [] + self.time_of_last_batch_run = time.time() + + self.mon_client = mon_client.Client(self.conf, parsed_url) + self.mon_filter = MonascaDataFilter(self.conf) + + # add flush_batch function to periodic callables + periodic_callables = [ + # The function to run + any automatically provided + # positional and keyword arguments to provide to it + # everytime it is activated. + (self.flush_batch, (), {}), + ] + + if self.conf.monasca.retry_on_failure: + # list to hold metrics to be re-tried (behaves like queue) + self.retry_queue = [] + # list to store retry attempts for metrics in retry_queue + self.retry_counter = [] + + # add retry_batch function to periodic callables + periodic_callables.append((self.retry_batch, (), {})) + + if self.conf.monasca.archive_on_failure: + archive_path = self.conf.monasca.archive_path + if not os.path.exists(archive_path): + archive_path = self.conf.find_file(archive_path) + + self.archive_handler = publisher.get_publisher( + self.conf, + 'file://' + + str(archive_path), + 'ceilometer.sample.publisher') + + # start periodic worker + self.periodic_worker = periodics.PeriodicWorker(periodic_callables) + self.periodic_thread = threading.Thread( + target=self.periodic_worker.start) + self.periodic_thread.daemon = True + self.periodic_thread.start() + + def _publish_handler(self, func, metrics, batch=False): + """Handles publishing and exceptions that arise.""" + + try: + metric_count = len(metrics) + if batch: + func(**{'jsonbody': metrics}) + else: + func(**metrics[0]) + LOG.info('Successfully published %d metric(s)' % metric_count) + except mon_client.MonascaServiceException: + # Assuming atomicity of create or failure - meaning + # either all succeed or all fail in a batch + LOG.error('Metric create failed for %(count)d metric(s) with' + ' name(s) %(names)s ' % + ({'count': len(metrics), + 'names': ','.join([metric['name'] + for metric in metrics])})) + if self.conf.monasca.retry_on_failure: + # retry payload in case of internal server error(500), + # service unavailable error(503),bad gateway (502) or + # Communication Error + + # append failed metrics to retry_queue + LOG.debug('Adding metrics to retry queue.') + self.retry_queue.extend(metrics) + # initialize the retry_attempt for the each failed + # metric in retry_counter + self.retry_counter.extend( + [0 * i for i in range(metric_count)]) + else: + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples(None, metrics) + except Exception: + LOG.info(traceback.format_exc()) + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples(None, metrics) + + def publish_samples(self, samples): + """Main method called to publish samples.""" + + for sample in samples: + metric = self.mon_filter.process_sample_for_monasca(sample) + # In batch mode, push metric to queue, + # else publish the metric + if self.conf.monasca.batch_mode: + LOG.debug('Adding metric to queue.') + self.metric_queue.append(metric) + else: + LOG.info('Publishing metric with name %(name)s and' + ' timestamp %(ts)s to endpoint.' % + ({'name': metric['name'], + 'ts': metric['timestamp']})) + + self._publish_handler(self.mon_client.metrics_create, [metric]) + + def is_batch_ready(self): + """Method to check if batch is ready to trigger.""" + + previous_time = self.time_of_last_batch_run + current_time = time.time() + elapsed_time = current_time - previous_time + + if elapsed_time >= self.conf.monasca.batch_timeout and len(self. + metric_queue) > 0: + LOG.info('Batch timeout exceeded, triggering batch publish.') + return True + else: + if len(self.metric_queue) >= self.conf.monasca.batch_count: + LOG.info('Batch queue full, triggering batch publish.') + return True + else: + return False + + @periodics.periodic(BATCH_POLLING_INTERVAL) + def flush_batch(self): + """Method to flush the queued metrics.""" + # print "flush batch... %s" % str(time.time()) + if self.is_batch_ready(): + # publish all metrics in queue at this point + batch_count = len(self.metric_queue) + + LOG.info("batch is ready: batch_count %s" % str(batch_count)) + + self._publish_handler(self.mon_client.metrics_create, + self.metric_queue[:batch_count], + batch=True) + + self.time_of_last_batch_run = time.time() + # slice queue to remove metrics that + # published with success or failed and got queued on + # retry queue + self.metric_queue = self.metric_queue[batch_count:] + + def is_retry_ready(self): + """Method to check if retry batch is ready to trigger.""" + + if len(self.retry_queue) > 0: + LOG.info('Retry queue has items, triggering retry.') + return True + else: + return False + + @periodics.periodic(BATCH_RETRY_INTERVAL) + def retry_batch(self): + """Method to retry the failed metrics.""" + # print "retry batch...%s" % str(time.time()) + if self.is_retry_ready(): + retry_count = len(self.retry_queue) + + # Iterate over the retry_queue to eliminate + # metrics that have maxed out their retry attempts + for ctr in moves.xrange(retry_count): + if self.retry_counter[ctr] > self.conf.\ + monasca.batch_max_retries: + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples( + None, + [self.retry_queue[ctr]]) + LOG.info('Removing metric %s from retry queue.' + ' Metric retry maxed out retry attempts' % + self.retry_queue[ctr]['name']) + del self.retry_queue[ctr] + del self.retry_counter[ctr] + + # Iterate over the retry_queue to retry the + # publish for each metric. + # If an exception occurs, the retry count for + # the failed metric is incremented. + # If the retry succeeds, remove the metric and + # the retry count from the retry_queue and retry_counter resp. + ctr = 0 + while ctr < len(self.retry_queue): + try: + LOG.info('Retrying metric publish from retry queue.') + self.mon_client.metrics_create(**self.retry_queue[ctr]) + # remove from retry queue if publish was success + LOG.info('Retrying metric %s successful,' + ' removing metric from retry queue.' % + self.retry_queue[ctr]['name']) + del self.retry_queue[ctr] + del self.retry_counter[ctr] + except exc.ClientException: + LOG.error('Exception encountered in retry. ' + 'Batch will be retried in next attempt.') + # if retry failed, increment the retry counter + self.retry_counter[ctr] += 1 + ctr += 1 + + def flush_to_file(self): + # TODO(persist maxed-out metrics to file) + pass + + def publish_events(self, events): + """Send an event message for publishing + + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError diff --git a/ceilometer/publisher/monasca_data_filter.py b/ceilometer/publisher/monasca_data_filter.py new file mode 100644 index 0000000000..62dd82339f --- /dev/null +++ b/ceilometer/publisher/monasca_data_filter.py @@ -0,0 +1,229 @@ +# +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime + +from jsonpath_rw_ext import parser +from oslo_log import log +from oslo_utils import timeutils +import six +import yaml + +from ceilometer import sample as sample_util + +LOG = log.getLogger(__name__) + + +class UnableToLoadMappings(Exception): + pass + + +class NoMappingsFound(Exception): + pass + + +class CeiloscaMappingDefinitionException(Exception): + def __init__(self, message, definition_cfg): + super(CeiloscaMappingDefinitionException, self).__init__(message) + self.message = message + self.definition_cfg = definition_cfg + + def __str__(self): + return '%s %s: %s' % (self.__class__.__name__, + self.definition_cfg, self.message) + + +class MonascaDataFilter(object): + JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser() + + def __init__(self, conf): + self.conf = conf + self._mapping = {} + self._mapping = self._get_mapping() + + def _get_mapping(self): + with open(self.conf.monasca.monasca_mappings, 'r') as f: + try: + return yaml.safe_load(f) + except yaml.YAMLError as err: + if hasattr(err, 'problem_mark'): + mark = err.problem_mark + errmsg = ("Invalid YAML syntax in Monasca Data " + "Filter file %(file)s at line: " + "%(line)s, column: %(column)s." + % dict(file=self.conf.monasca.monasca_mappings, + line=mark.line + 1, + column=mark.column + 1)) + else: + errmsg = ("YAML error reading Monasca Data Filter " + "file %(file)s" % + dict(file=self.conf.monasca.monasca_mappings)) + LOG.error(errmsg) + raise UnableToLoadMappings(err.message) + + def _convert_timestamp(self, timestamp): + if isinstance(timestamp, datetime.datetime): + ts = timestamp + else: + ts = timeutils.parse_isotime(timestamp) + tdelta = (ts - datetime.datetime(1970, 1, 1, tzinfo=ts.tzinfo)) + # convert timestamp to milli seconds as Monasca expects + return int(tdelta.total_seconds() * 1000) + + def _convert_to_sample(self, s): + return sample_util.Sample( + name=s['counter_name'], + type=s['counter_type'], + unit=s['counter_unit'], + volume=s['counter_volume'], + user_id=s['user_id'], + project_id=s['project_id'], + resource_id=s['resource_id'], + timestamp=s['timestamp'], + resource_metadata=s['resource_metadata'], + source=s.get('source')).as_dict() + + def get_value_for_nested_dictionary(self, lst, dct): + val = dct + for element in lst: + if isinstance(val, dict) and element in val: + val = val.get(element) + else: + return + return val + + def parse_jsonpath(self, field): + try: + parts = self.JSONPATH_RW_PARSER.parse(field) + except Exception as e: + raise CeiloscaMappingDefinitionException( + "Parse error in JSONPath specification " + "'%(jsonpath)s': %(err)s" + % dict(jsonpath=field, err=e)) + return parts + + def _get_value_metadata_for_key(self, sample_meta, meta_key): + """Get the data for the given key, supporting JSONPath""" + if isinstance(meta_key, dict): + # extract key and jsonpath + # If following convention, dict will have one and only one + # element of the form : + if len(meta_key.keys()) == 1: + mon_key = list(meta_key.keys())[0] + else: + # If no keys or more keys than one + raise CeiloscaMappingDefinitionException( + "Field definition format mismatch, should " + "have only one key:value pair. %(meta_key)s" % + {'meta_key': meta_key}, meta_key) + json_path = meta_key[mon_key] + parts = self.parse_jsonpath(json_path) + val_matches = parts.find(sample_meta) + if len(val_matches) > 0: + # resolve the find to the first match and get value + val = val_matches[0].value + if not isinstance(val, (str, six.text_type)) \ + and not isinstance(val, int): + # Don't support lists or dicts or ... + raise CeiloscaMappingDefinitionException( + "Metadata format mismatch, value " + "should be a simple string. %(valuev)s" % + {'valuev': val}, meta_key) + else: + val = 'None' + return mon_key, val + else: + # simple string + val = sample_meta.get(meta_key, None) + if val is not None: + return meta_key, val + else: + # one more attempt using a dotted notation + # TODO(joadavis) Deprecate this . notation code + # in favor of jsonpath + if len(meta_key.split('.')) > 1: + val = self.get_value_for_nested_dictionary( + meta_key.split('.'), sample_meta) + if val is not None: + return meta_key, val + else: + return meta_key, 'None' + else: + return meta_key, 'None' + + def process_sample_for_monasca(self, sample_obj): + if not self._mapping: + raise NoMappingsFound("Unable to process the sample") + + dimensions = {} + dimensions['datasource'] = 'ceilometer' + # control_plane, cluster and cloud_name can be None, but we use + # literal 'None' for such case + dimensions['control_plane'] = self.conf.monasca.control_plane or 'None' + dimensions['cluster'] = self.conf.monasca.cluster or 'None' + dimensions['cloud_name'] = self.conf.monasca.cloud_name or 'None' + if isinstance(sample_obj, sample_util.Sample): + sample = sample_obj.as_dict() + elif isinstance(sample_obj, dict): + if 'counter_name' in sample_obj: + sample = self._convert_to_sample(sample_obj) + else: + sample = sample_obj + + for dim in self._mapping['dimensions']: + val = sample.get(dim, None) + if val: + dimensions[dim] = val + else: + dimensions[dim] = 'None' + + sample_meta = sample.get('resource_metadata', None) + value_meta = {} + + meter_name = sample.get('name') or sample.get('counter_name') + if sample_meta: + for meta_key in self._mapping['metadata']['common']: + monasca_key, val = self._get_value_metadata_for_key( + sample_meta, meta_key) + value_meta[monasca_key] = val + + if meter_name in self._mapping['metadata'].keys(): + for meta_key in self._mapping['metadata'][meter_name]: + monasca_key, val = self._get_value_metadata_for_key( + sample_meta, meta_key) + value_meta[monasca_key] = val + + meter_value = sample.get('volume') or sample.get('counter_volume') + if meter_value is None: + meter_value = 0 + + metric = dict( + name=meter_name, + timestamp=self._convert_timestamp(sample['timestamp']), + value=meter_value, + dimensions=dimensions, + value_meta=value_meta, + ) + + LOG.debug("Generated metric with name %(name)s," + " timestamp %(timestamp)s, value %(value)s," + " dimensions %(dimensions)s" % + {'name': metric['name'], + 'timestamp': metric['timestamp'], + 'value': metric['value'], + 'dimensions': metric['dimensions']}) + + return metric diff --git a/ceilometer/tests/unit/publisher/test_monasca_data_filter.py b/ceilometer/tests/unit/publisher/test_monasca_data_filter.py new file mode 100644 index 0000000000..8e6dfcae3b --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_monasca_data_filter.py @@ -0,0 +1,512 @@ +# +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import mock +from oslo_utils import timeutils +from oslotest import base + +from ceilometer import monasca_opts +from ceilometer.publisher import monasca_data_filter as mdf +from ceilometer import sample +from ceilometer import service + + +class TestMonUtils(base.BaseTestCase): + def setUp(self): + super(TestMonUtils, self).setUp() + self.CONF = service.prepare_service([], []) + self.CONF.register_opts(list(monasca_opts.OPTS), + 'monasca') + self._field_mappings = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'source', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending'], + 'image': ['size', 'status', 'image_meta.base_url', + 'image_meta.base_url2', 'image_meta.base_url3', + 'image_meta.base_url4'], + 'image.delete': ['size', 'status'], + 'image.size': ['size', 'status'], + 'image.update': ['size', 'status'], + 'image.upload': ['size', 'status'], + 'instance': ['state', 'state_description'], + 'snapshot': ['status'], + 'snapshot.size': ['status'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + self._field_mappings_cinder = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'source', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending', + 'arbitrary_new_field'], + 'volume.create.end': + ['size', 'status', + {'metering.prn_name': + "$.metadata[?(@.key = 'metering.prn_name')].value"}, + {'metering.prn_type': + "$.metadata[?(@.key = 'metering.prn_type')].value"}, + 'volume_type', 'created_at', + 'host'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + + self._field_mappings_bad_format = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'source', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending', + 'arbitrary_new_field'], + 'volume.create.end': + ['size', 'status', + {'metering.prn_name': + "$.metadata[?(@.key = 'metering.prn_name')].value", + 'metering.prn_type': + "$.metadata[?(@.key = 'metering.prn_type')].value"}, + 'volume_type', 'created_at', + 'host'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + + def test_process_sample(self): + s = sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('dimensions')) + self.assertIsNotNone(r.get('value_meta')) + self.assertIsNotNone(r.get('value')) + self.assertEqual(s.user_id, r['dimensions'].get('user_id')) + self.assertEqual(s.project_id, r['dimensions'].get('project_id')) + self.assertEqual(s.resource_id, r['dimensions'].get('resource_id')) + # 2015-04-07T20:07:06.156986 compare upto millisec + monasca_ts = \ + timeutils.iso8601_from_timestamp(r['timestamp'] / 1000.0, + microsecond=True)[:23] + self.assertEqual(s.timestamp[:23], monasca_ts) + + def test_process_sample_field_mappings(self): + s = sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ) + + field_map = self._field_mappings + field_map['dimensions'].remove('project_id') + field_map['dimensions'].remove('user_id') + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[field_map]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertIsNone(r['dimensions'].get('project_id')) + self.assertIsNone(r['dimensions'].get('user_id')) + + def convert_dict_to_list(self, dct, prefix=None, outlst={}): + prefix = prefix + '.' if prefix else "" + for k, v in dct.items(): + if type(v) is dict: + self.convert_dict_to_list(v, prefix + k, outlst) + else: + if v is not None: + outlst[prefix + k] = v + else: + outlst[prefix + k] = 'None' + return outlst + + def test_process_sample_metadata(self): + s = sample.Sample( + name='image', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'notification', + 'status': 'active', + 'image_meta': {'base_url': 'http://image.url', + 'base_url2': '', + 'base_url3': None}, + 'size': 1500}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + self.assertTrue(set(self.convert_dict_to_list( + s.resource_metadata + ).items()).issubset(set(r['value_meta'].items()))) + + def test_process_sample_metadata_with_empty_data(self): + s = sample.Sample( + name='image', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'notification', + 'status': 'active', + 'image_meta': {'base_url': 'http://image.url', + 'base_url2': '', + 'base_url3': None}, + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + self.assertEqual(s.source, r['dimensions']['source']) + self.assertTrue(set(self.convert_dict_to_list( + s.resource_metadata + ).items()).issubset(set(r['value_meta'].items()))) + + def test_process_sample_metadata_with_extendedKey(self): + s = sample.Sample( + name='image', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'notification', + 'status': 'active', + 'image_meta': {'base_url': 'http://image.url', + 'base_url2': '', + 'base_url3': None}, + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + self.assertTrue(set(self.convert_dict_to_list( + s.resource_metadata + ).items()).issubset(set(r['value_meta'].items()))) + self.assertEqual(r.get('value_meta')['image_meta.base_url'], + s.resource_metadata.get('image_meta') + ['base_url']) + self.assertEqual(r.get('value_meta')['image_meta.base_url2'], + s.resource_metadata.get('image_meta') + ['base_url2']) + self.assertEqual(r.get('value_meta')['image_meta.base_url3'], + str(s.resource_metadata.get('image_meta') + ['base_url3'])) + self.assertEqual(r.get('value_meta')['image_meta.base_url4'], + 'None') + + def test_process_sample_metadata_with_jsonpath(self): + """Test meter sample in a format produced by cinder.""" + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + # this "value: , key: " format is + # how cinder reports metadata + 'metadata': + [{'value': 'aaa0001', + 'key': 'metering.prn_name'}, + {'value': 'Cust001', + 'key': 'metering.prn_type'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + # Using convert_dict_to_list is too simplistic for this + self.assertEqual(r.get('value_meta')['event_type'], + s.resource_metadata.get('event_type'), + "Failed to match common element.") + self.assertEqual(r.get('value_meta')['host'], + s.resource_metadata.get('host'), + "Failed to match meter specific element.") + self.assertEqual(r.get('value_meta')['size'], + s.resource_metadata.get('size'), + "Unable to handle an int.") + self.assertEqual(r.get('value_meta')['metering.prn_name'], + 'aaa0001', + "Failed to extract a value " + "using specified jsonpath.") + + def test_process_sample_metadata_with_jsonpath_nomatch(self): + """Test meter sample in a format produced by cinder. + + Behavior when no matching element is found for the specified jsonpath + """ + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': 'aaa0001', + 'key': 'metering.THISWONTMATCH'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + # Using convert_dict_to_list is too simplistic for this + self.assertEqual(r.get('value_meta')['event_type'], + s.resource_metadata.get('event_type'), + "Failed to match common element.") + self.assertEqual(r.get('value_meta')['host'], + s.resource_metadata.get('host'), + "Failed to match meter specific element.") + self.assertEqual(r.get('value_meta')['size'], + s.resource_metadata.get('size'), + "Unable to handle an int.") + self.assertEqual(r.get('value_meta')['metering.prn_name'], + 'None', "This metadata should fail to match " + "and then return 'None'.") + + def test_process_sample_metadata_with_jsonpath_value_not_str(self): + """Test where jsonpath is used but result is not a simple string""" + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': ['aaa0001', 'bbbb002'], + 'key': 'metering.prn_name'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + try: + # Don't assign to a variable, this should raise + data_filter.process_sample_for_monasca(s) + except mdf.CeiloscaMappingDefinitionException as e: + self.assertEqual( + 'Metadata format mismatch, value should be ' + 'a simple string. [\'aaa0001\', \'bbbb002\']', + e.message) + + def test_process_sample_metadata_with_jsonpath_value_is_int(self): + """Test meter sample where jsonpath result is an int.""" + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': 13, + 'key': 'metering.prn_name'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + # Using convert_dict_to_list is too simplistic for this + self.assertEqual(r.get('value_meta')['event_type'], + s.resource_metadata.get('event_type'), + "Failed to match common element.") + self.assertEqual(r.get('value_meta')['host'], + s.resource_metadata.get('host'), + "Failed to match meter specific element.") + self.assertEqual(r.get('value_meta')['size'], + s.resource_metadata.get('size'), + "Unable to handle an int.") + self.assertEqual(r.get('value_meta')['metering.prn_name'], + 13, + "Unable to handle an int " + "through the jsonpath processing") + + def test_process_sample_metadata_with_jsonpath_bad_format(self): + """Test handling of definition that is not written correctly""" + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': 13, + 'key': 'metering.prn_name'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the bad mapping + with mock.patch(to_patch, + side_effect=[self._field_mappings_bad_format]): + data_filter = mdf.MonascaDataFilter(self.CONF) + try: + # Don't assign to a variable as this should raise + data_filter.process_sample_for_monasca(s) + except mdf.CeiloscaMappingDefinitionException as e: + # Make sure we got the right kind of error + # Cannot check the whole message text, as python + # may reorder a dict when producing a string version + self.assertIn( + 'Field definition format mismatch, should ' + 'have only one key:value pair.', + e.message, + "Did raise exception but wrong message - %s" % e.message) diff --git a/ceilometer/tests/unit/publisher/test_monasca_publisher.py b/ceilometer/tests/unit/publisher/test_monasca_publisher.py new file mode 100755 index 0000000000..60095c27b4 --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_monasca_publisher.py @@ -0,0 +1,205 @@ +# +# Copyright 2015 Hewlett Packard +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/publisher/monasca.py +""" + +import datetime +import fixtures +import time + +import mock +from oslotest import base + +from ceilometer import monasca_client as mon_client +from ceilometer.publisher import monasca as mon_publisher +from ceilometer import sample +from ceilometer import service + + +class FakeResponse(object): + def __init__(self, status_code): + self.status_code = status_code + + +class TestMonascaPublisher(base.BaseTestCase): + + test_data = [ + sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='test2', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='test2', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + ] + + field_mappings = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending'], + 'image': ['size', 'status'], + 'image.delete': ['size', 'status'], + 'image.size': ['size', 'status'], + 'image.update': ['size', 'status'], + 'image.upload': ['size', 'status'], + 'instance': ['state', 'state_description'], + 'snapshot': ['status'], + 'snapshot.size': ['status'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + + @staticmethod + def create_side_effect(exception_type, test_exception): + def side_effect(*args, **kwargs): + if test_exception.pop(): + raise exception_type + else: + return FakeResponse(204) + return side_effect + + def setUp(self): + super(TestMonascaPublisher, self).setUp() + + self.CONF = service.prepare_service([], []) + self.parsed_url = mock.MagicMock() + + def tearDown(self): + # For some reason, cfg.CONF is registered a required option named + # auth_url after these tests run, which occasionally blocks test + # case test_event_pipeline_endpoint_requeue_on_failure, so we + # unregister it here. + self.CONF.reset() + # self.CONF.unregister_opt(cfg.StrOpt('service_auth_url'), + # group='monasca') + super(TestMonascaPublisher, self).tearDown() + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_publish(self, mapping_patch): + self.CONF.set_override('batch_mode', False, group='monasca') + publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url) + publisher.mon_client = mock.MagicMock() + + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + mock_create.return_value = FakeResponse(204) + publisher.publish_samples(self.test_data) + self.assertEqual(3, mock_create.call_count) + self.assertEqual(1, mapping_patch.called) + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_batch(self, mapping_patch): + self.CONF.set_override('batch_mode', True, group='monasca') + self.CONF.set_override('batch_count', 3, group='monasca') + self.CONF.set_override('batch_polling_interval', 1, group='monasca') + + publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url) + publisher.mon_client = mock.MagicMock() + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + mock_create.return_value = FakeResponse(204) + publisher.publish_samples(self.test_data) + time.sleep(10) + self.assertEqual(1, mock_create.call_count) + self.assertEqual(1, mapping_patch.called) + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_batch_retry(self, mapping_patch): + self.CONF.set_override('batch_mode', True, group='monasca') + self.CONF.set_override('batch_count', 3, group='monasca') + self.CONF.set_override('batch_polling_interval', 1, group='monasca') + self.CONF.set_override('retry_on_failure', True, group='monasca') + # Constant in code for @periodicals, can't override + # self.CONF.set_override('retry_interval', 2, group='monasca') + self.CONF.set_override('batch_max_retries', 1, group='monasca') + + publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url) + publisher.mon_client = mock.MagicMock() + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + raise_http_error = [False, False, False, True] + mock_create.side_effect = self.create_side_effect( + mon_client.MonascaServiceException, + raise_http_error) + publisher.publish_samples(self.test_data) + time.sleep(60) + self.assertEqual(4, mock_create.call_count) + self.assertEqual(1, mapping_patch.called) + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_archival_on_failure(self, mapping_patch): + self.CONF.set_override('archive_on_failure', True, group='monasca') + self.CONF.set_override('batch_mode', False, group='monasca') + self.fake_publisher = mock.Mock() + + self.useFixture(fixtures.MockPatch( + 'ceilometer.publisher.file.FilePublisher', + return_value=self.fake_publisher)) + + publisher = mon_publisher.MonascaPublisher(self.CONF, + self.parsed_url) + publisher.mon_client = mock.MagicMock() + + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + mock_create.side_effect = Exception + metrics_archiver = self.fake_publisher.publish_samples + publisher.publish_samples(self.test_data) + self.assertEqual(1, metrics_archiver.called) + self.assertEqual(3, metrics_archiver.call_count) diff --git a/ceilometer/tests/unit/test_monascaclient.py b/ceilometer/tests/unit/test_monascaclient.py new file mode 100644 index 0000000000..33ce536ff3 --- /dev/null +++ b/ceilometer/tests/unit/test_monascaclient.py @@ -0,0 +1,148 @@ +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock + +from oslo_utils import netutils +from oslotest import base + +from ceilometer import monasca_client +from ceilometer import service + +from monascaclient import exc +import tenacity + + +class TestMonascaClient(base.BaseTestCase): + def setUp(self): + super(TestMonascaClient, self).setUp() + + self.CONF = service.prepare_service([], []) + self.CONF.set_override('client_max_retries', 0, 'monasca') + self.mc = self._get_client() + + def tearDown(self): + # For some reason, cfg.CONF is registered a required option named + # auth_url after these tests run, which occasionally blocks test + # case test_event_pipeline_endpoint_requeue_on_failure, so we + # unregister it here. + self.CONF.reset() + # self.CONF.unregister_opt(cfg.StrOpt('service_auth_url'), + # group='monasca') + super(TestMonascaClient, self).tearDown() + + @mock.patch('monascaclient.client.Client') + def _get_client(self, monclient_mock): + return monasca_client.Client( + self.CONF, + netutils.urlsplit("http://127.0.0.1:8080")) + + @mock.patch('monascaclient.client.Client') + def test_client_url_correctness(self, monclient_mock): + mon_client = monasca_client.Client( + self.CONF, + netutils.urlsplit("monasca://https://127.0.0.1:8080")) + self.assertEqual("https://127.0.0.1:8080", mon_client._endpoint) + + def test_metrics_create(self): + with mock.patch.object(self.mc._mon_client.metrics, 'create', + side_effect=[True]) as create_patch: + self.mc.metrics_create() + + self.assertEqual(1, create_patch.call_count) + + def test_metrics_create_exception(self): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.InternalServerError, True])\ + as create_patch: + e = self.assertRaises(tenacity.RetryError, + self.mc.metrics_create) + original_ex = e.last_attempt.exception() + self.assertIsInstance(original_ex, + monasca_client.MonascaServiceException) + self.assertEqual(1, create_patch.call_count) + + def test_metrics_create_unprocessable_exception(self): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.UnprocessableEntity, True])\ + as create_patch: + self.assertRaises(monasca_client.MonascaInvalidParametersException, + self.mc.metrics_create) + self.assertEqual(1, create_patch.call_count) + + def test_no_retry_on_invalid_parameter(self): + self.CONF.set_override('client_max_retries', 2, 'monasca') + self.CONF.set_override('client_retry_interval', 1, 'monasca') + self.mc = self._get_client() + + def _check(exception): + expected_exc = monasca_client.MonascaInvalidParametersException + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exception, True] + ) as mocked_metrics_list: + self.assertRaises(expected_exc, self.mc.metrics_create) + self.assertEqual(1, mocked_metrics_list.call_count) + + _check(exc.http.UnprocessableEntity) + _check(exc.http.BadRequest) + + def test_max_retries_not_too_much(self): + def _check(configured, expected): + self.CONF.set_override('client_max_retries', configured, + 'monasca') + self.mc = self._get_client() + self.assertEqual(expected, self.mc._max_retries) + + _check(-1, 10) + _check(11, 10) + _check(5, 5) + _check(None, 1) + + def test_meaningful_exception_message(self): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.InternalServerError, + exc.http.UnprocessableEntity, + KeyError]): + e = self.assertRaises( + tenacity.RetryError, + self.mc.metrics_create) + original_ex = e.last_attempt.exception() + self.assertIn('Monasca service is unavailable', + str(original_ex)) + + e = self.assertRaises( + monasca_client.MonascaInvalidParametersException, + self.mc.metrics_create) + self.assertIn('Request cannot be handled by Monasca', + str(e)) + + e = self.assertRaises( + tenacity.RetryError, + self.mc.metrics_create) + original_ex = e.last_attempt.exception() + self.assertIn('An exception is raised from Monasca', + str(original_ex)) + + @mock.patch.object(monasca_client.Client, '_get_client') + def test_metrics_create_with_401(self, rc_patch): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.Unauthorized, True]): + self.assertRaises( + monasca_client.MonascaInvalidParametersException, + self.mc.metrics_create) diff --git a/doc/source/admin/telemetry-data-pipelines.rst b/doc/source/admin/telemetry-data-pipelines.rst index 30366d0489..48547ff675 100644 --- a/doc/source/admin/telemetry-data-pipelines.rst +++ b/doc/source/admin/telemetry-data-pipelines.rst @@ -213,6 +213,49 @@ The following customization options are available: default topic defined by ``metering_topic`` and ``event_topic`` options. This option can be used to support multiple consumers. +monasca +``````` + +The monasca publisher can be used to send measurements to the Monasca API, +where it will be stored with other metrics gathered by Monasca Agent. Data +is accessible through the Monasca API and be transformed like other Monasca +metrics. + +The pipeline sink is specified with a ``publishers:`` element of the form +``- monasca://https:///metrics/v2.0`` + +Monasca API connection information is configured in the ceilometer.conf +file in a [monasca] section:: + + [monasca] + auth_section = monasca_auth + enable_api_pagination = True + client_retry_interval = 60 + client_max_retries = 5 + monasca_mappings = + + [monasca_auth] + auth_url = https:///identity + auth_type = password + username = + password = + project_name = + project_domain_id = + user_domain_id = + verify = + region_name = + + +..note:: + The username specified should be for a Keystone user that has the + ``monasca_agent`` or ``monasca_user`` role enabled. For management purposes, + this may be the ceilometer user if the appropriate role is granted. + +For more detail and history of this publisher, see the +`Ceilosca Wiki `__ and +`monasca-ceilometer README +`__. + udp ``` diff --git a/doc/source/contributor/architecture.rst b/doc/source/contributor/architecture.rst index 832c1d1def..7971dd087a 100644 --- a/doc/source/contributor/architecture.rst +++ b/doc/source/contributor/architecture.rst @@ -164,7 +164,7 @@ Publishing the data This figure shows how a sample can be published to multiple destinations. -Currently, processed data can be published using 8 different transports: +Currently, processed data can be published using different transport options: 1. gnocchi, which publishes samples/events to Gnocchi API; 2. notifier, a notification based publisher which pushes samples to a message @@ -174,8 +174,9 @@ Currently, processed data can be published using 8 different transports: 5. file, which publishes samples to a file with specified name and location; 6. zaqar, a multi-tenant cloud messaging and notification service for web and mobile developers; -7. https, which is http over SSL and targets a REST interface. -8. prometheus, which publishes samples to Prometheus Pushgateway +7. https, which is http over SSL and targets a REST interface; +8. prometheus, which publishes samples to Prometheus Pushgateway; +9. monasca, which publishes samples to the Monasca API. Storing/Accessing the data diff --git a/etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf b/etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf new file mode 100644 index 0000000000..de9f345e25 --- /dev/null +++ b/etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf @@ -0,0 +1,32 @@ +[DEFAULT] +debug = False + +### +# Skipped ceilometer-config-generator common sections +### + +[monasca] +auth_section = monasca_auth +monasca_mappings = /etc/ceilometer/monasca_field_definitions.yaml + +# NOTE: These are additional options which may be set +# retry_on_failure = False +# enable_api_pagination = True +# client_retry_interval = 60 +# client_max_retries = 5 + +[monasca_auth] +# NOTE: the values here may match the values in the [service_credentials] +# section +auth_url = https://:5000/v2.0 +password = +username = ceilometer +interface = internal +auth_type = password +# project_id may also be used +project_name = admin +project_domain_id = default +user_domain_id = default +region_name = RegionOne +# Specify a CA bundle to enable SSL connection +# verify = /somewhere/ca-bundle.pem diff --git a/etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml b/etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml new file mode 100644 index 0000000000..b86aefd78e --- /dev/null +++ b/etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml @@ -0,0 +1,44 @@ +# An example pipeline.yaml +# Reference https://docs.openstack.org/ceilometer/latest/admin/telemetry-measurements.html +# for other possible meters. +--- +sources: + - name: compute_source + interval: 30 + meters: + - "memory" + - "vcpus" + sinks: + - meter_sink + - name: network_source + interval: 30 + meters: + - "bandwidth" + sinks: + - meter_sink + - name: image_source + interval: 30 + meters: + - "image.size" + sinks: + - meter_sink + - name: volume_source + interval: 30 + meters: + - "volume.size" + - "snapshot.size" + sinks: + - meter_sink + - name: swift_source + interval: 3600 + meters: + - "storage.objects" + - "storage.objects.size" + - "storage.objects.containers" + sinks: + - meter_sink +sinks: + - name: meter_sink + transformers: + publishers: + - monasca://https://:8070/v2.0 diff --git a/etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml b/etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml new file mode 100644 index 0000000000..5274101745 --- /dev/null +++ b/etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml @@ -0,0 +1,58 @@ +dimensions: + - resource_id + - project_id + - user_id + - region + - type + - unit + - source +metadata: + common: + - event_type + - audit_period_beginning + - audit_period_ending + - availability_zone + memory: + - state + - memory_mb + - root_gb + - vcpus + - disk_gb + - ephemeral_gb + - host + - instance_flavor_id + - image_ref_url + - created_at + - deleted_at + - launched_at + vcpus: + - state + - memory_mb + - root_gb + - vcpus + - disk_gb + - ephemeral_gb + - host + - instance_flavor_id + - image_ref_url + - created_at + - deleted_at + - launched_at + image.size: + - size + - status + - is_public + - properties.image_type + snapshot.size: + - volume_size + - state + - status + - created_at + - host + volume.size: + - size + - volume_type + - status + - created_at + - host + - status diff --git a/lower-constraints.txt b/lower-constraints.txt index da538f7bf5..91046ffdfa 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -33,6 +33,7 @@ pysnmp==4.2.3 python-cinderclient==3.3.0 python-glanceclient==2.8.0 python-keystoneclient==3.15.0 +python-monascaclient==1.12.0 python-neutronclient==6.7.0 python-novaclient==9.1.0 python-swiftclient==3.2.0 diff --git a/releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml b/releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml new file mode 100644 index 0000000000..871025a7c2 --- /dev/null +++ b/releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Include a publisher for the Monasca API. A ``monasca://`` pipeline sink + will send data to a Monasca instance, using credentials configured in + ceilometer.conf. This functionality was previously available in the + Ceilosca project + (https://github.com/openstack/monasca-ceilometer). diff --git a/requirements.txt b/requirements.txt index 46edc35604..8fedb54b53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,4 +38,5 @@ tooz[zake]>=1.47.0 # Apache-2.0 os-xenapi>=0.3.3 # Apache-2.0 oslo.cache>=1.26.0 # Apache-2.0 gnocchiclient>=7.0.0 # Apache-2.0 +python-monascaclient>=1.12.0 # Apache-2.0 python-zaqarclient>=1.3.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 9488926e39..fba735beb1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -207,6 +207,7 @@ ceilometer.sample.publisher = http = ceilometer.publisher.http:HttpPublisher prometheus = ceilometer.publisher.prometheus:PrometheusPublisher https = ceilometer.publisher.http:HttpPublisher + monasca = ceilometer.publisher.monasca:MonascaPublisher gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher zaqar = ceilometer.publisher.zaqar:ZaqarPublisher