# # Copyright 2015 Hewlett Packard # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from futurist import periodics import os import threading import time from oslo_config import cfg from oslo_log import log import ceilometer from ceilometer.i18n import _ from ceilometer import monasca_client as mon_client from ceilometer import publisher from ceilometer.publisher.monasca_data_filter import MonascaDataFilter from monascaclient import exc monpub_opts = [ cfg.BoolOpt('batch_mode', default=True, help='Indicates whether samples are' ' published in a batch.'), cfg.IntOpt('batch_count', default=1000, help='Maximum number of samples in a batch.'), cfg.IntOpt('batch_timeout', default=15, help='Maximum time interval(seconds) after which ' 'samples are published in a batch.'), cfg.IntOpt('batch_polling_interval', default=5, help='Frequency of checking if batch criteria is met.'), cfg.BoolOpt('retry_on_failure', default=False, help='Indicates whether publisher retries publishing' 'sample in case of failure. Only a few error cases' 'are queued for a retry.'), cfg.IntOpt('retry_interval', default=60, help='Frequency of attempting a retry.'), cfg.IntOpt('max_retries', default=3, help='Maximum number of retry attempts on a publishing ' 'failure.'), cfg.BoolOpt('archive_on_failure', default=False, help='When turned on, archives metrics in file system when' 'publish to Monasca fails or metric publish maxes out' 'retry attempts.'), cfg.StrOpt('archive_path', default='mon_pub_failures.txt', help='File of metrics that failed to publish to ' 'Monasca. These include metrics that failed to ' 'publish on first attempt and failed metrics that' ' maxed out their retries.'), ] cfg.CONF.register_opts(monpub_opts, group='monasca') cfg.CONF.import_group('service_credentials', 'ceilometer.service') LOG = log.getLogger(__name__) class MonascaPublisher(publisher.PublisherBase): """Publisher to publish samples to monasca using monasca-client. Example URL to place in pipeline.yaml: - monclient://http://192.168.10.4:8070/v2.0 """ def __init__(self, parsed_url): super(MonascaPublisher, self).__init__(parsed_url) # list to hold metrics to be published in batch (behaves like queue) self.metric_queue = [] self.time_of_last_batch_run = time.time() self.mon_client = mon_client.Client(parsed_url) self.mon_filter = MonascaDataFilter() # add flush_batch function to periodic callables periodic_callables = [ # The function to run + any automatically provided # positional and keyword arguments to provide to it # everytime it is activated. (self.flush_batch, (), {}), ] if cfg.CONF.monasca.retry_on_failure: # list to hold metrics to be re-tried (behaves like queue) self.retry_queue = [] # list to store retry attempts for metrics in retry_queue self.retry_counter = [] # add retry_batch function to periodic callables periodic_callables.append((self.retry_batch, (), {})) if cfg.CONF.monasca.archive_on_failure: archive_path = cfg.CONF.monasca.archive_path if not os.path.exists(archive_path): archive_path = cfg.CONF.find_file(archive_path) self.archive_handler = publisher.get_publisher('file://' + str(archive_path)) # start periodic worker self.periodic_worker = periodics.PeriodicWorker(periodic_callables) self.periodic_thread = threading.Thread( target=self.periodic_worker.start) self.periodic_thread.daemon = True self.periodic_thread.start() def _publish_handler(self, func, metrics, batch=False): """Handles publishing and exceptions that arise.""" try: metric_count = len(metrics) if batch: func(**{'jsonbody': metrics}) else: func(**metrics[0]) LOG.debug(_('Successfully published %d metric(s)') % metric_count) except mon_client.MonascaServiceException: # Assuming atomicity of create or failure - meaning # either all succeed or all fail in a batch LOG.error(_('Metric create failed for %(count)d metric(s) with' ' name(s) %(names)s ') % ({'count': len(metrics), 'names': ','.join([metric['name'] for metric in metrics])})) if cfg.CONF.monasca.retry_on_failure: # retry payload in case of internal server error(500), # service unavailable error(503),bad gateway (502) or # Communication Error # append failed metrics to retry_queue LOG.debug(_('Adding metrics to retry queue.')) self.retry_queue.extend(metrics) # initialize the retry_attempt for the each failed # metric in retry_counter self.retry_counter.extend( [0 * i for i in range(metric_count)]) else: if hasattr(self, 'archive_handler'): self.archive_handler.publish_samples(None, metrics) except Exception: if hasattr(self, 'archive_handler'): self.archive_handler.publish_samples(None, metrics) def publish_samples(self, samples): """Main method called to publish samples.""" for sample in samples: metric = self.mon_filter.process_sample_for_monasca(sample) # In batch mode, push metric to queue, # else publish the metric if cfg.CONF.monasca.batch_mode: LOG.debug(_('Adding metric to queue.')) self.metric_queue.append(metric) else: LOG.debug(_('Publishing metric with name %(name)s and' ' timestamp %(ts)s to endpoint.') % ({'name': metric['name'], 'ts': metric['timestamp']})) self._publish_handler(self.mon_client.metrics_create, [metric]) def is_batch_ready(self): """Method to check if batch is ready to trigger.""" previous_time = self.time_of_last_batch_run current_time = time.time() elapsed_time = current_time - previous_time if elapsed_time >= cfg.CONF.monasca.batch_timeout and len(self. metric_queue) > 0: LOG.debug(_('Batch timeout exceeded, triggering batch publish.')) return True else: if len(self.metric_queue) >= cfg.CONF.monasca.batch_count: LOG.debug(_('Batch queue full, triggering batch publish.')) return True else: return False @periodics.periodic(cfg.CONF.monasca.batch_polling_interval) def flush_batch(self): """Method to flush the queued metrics.""" # print "flush batch... %s" % str(time.time()) if self.is_batch_ready(): # publish all metrics in queue at this point batch_count = len(self.metric_queue) self._publish_handler(self.mon_client.metrics_create, self.metric_queue[:batch_count], batch=True) self.time_of_last_batch_run = time.time() # slice queue to remove metrics that # published with success or failed and got queued on # retry queue self.metric_queue = self.metric_queue[batch_count:] def is_retry_ready(self): """Method to check if retry batch is ready to trigger.""" if len(self.retry_queue) > 0: LOG.debug(_('Retry queue has items, triggering retry.')) return True else: return False @periodics.periodic(cfg.CONF.monasca.retry_interval) def retry_batch(self): """Method to retry the failed metrics.""" # print "retry batch...%s" % str(time.time()) if self.is_retry_ready(): retry_count = len(self.retry_queue) # Iterate over the retry_queue to eliminate # metrics that have maxed out their retry attempts for ctr in xrange(retry_count): if self.retry_counter[ctr] > cfg.CONF.monasca.max_retries: if hasattr(self, 'archive_handler'): self.archive_handler.publish_samples( None, [self.retry_queue[ctr]]) LOG.debug(_('Removing metric %s from retry queue.' ' Metric retry maxed out retry attempts') % self.retry_queue[ctr]['name']) del self.retry_queue[ctr] del self.retry_counter[ctr] # Iterate over the retry_queue to retry the # publish for each metric. # If an exception occurs, the retry count for # the failed metric is incremented. # If the retry succeeds, remove the metric and # the retry count from the retry_queue and retry_counter resp. ctr = 0 while ctr < len(self.retry_queue): try: LOG.debug(_('Retrying metric publish from retry queue.')) self.mon_client.metrics_create(**self.retry_queue[ctr]) # remove from retry queue if publish was success LOG.debug(_('Retrying metric %s successful,' ' removing metric from retry queue.') % self.retry_queue[ctr]['name']) del self.retry_queue[ctr] del self.retry_counter[ctr] except exc.BaseException: LOG.error(_('Exception encountered in retry. ' 'Batch will be retried in next attempt.')) # if retry failed, increment the retry counter self.retry_counter[ctr] += 1 ctr += 1 def flush_to_file(self): # TODO(persist maxed-out metrics to file) pass def publish_events(self, events): """Send an event message for publishing :param events: events from pipeline after transformation """ raise ceilometer.NotImplementedError