Adds support to Mon publisher to archive on error
When publishing metrics to Monasca-api fails, the failed metrics need to be handled in some manner, otherwise risk data loss. Currently, Ceilometer notification agent does not support requeuing of samples on publisher error. This patch adds the ability to enable metrics archival (via conf) using FilePublisher in Ceilometer. This ability is switched off by default. Change-Id: I482d4466ae106e5292d12a768e8acd4a240f4049
This commit is contained in:
parent
652d414be2
commit
5d5dd50ecc
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -55,11 +56,16 @@ monpub_opts = [
|
|||
default=3,
|
||||
help='Maximum number of retry attempts on a publishing '
|
||||
'failure.'),
|
||||
cfg.StrOpt('failure_report_path',
|
||||
default='monclient_failures.txt',
|
||||
help='File report of samples that failed to publish to'
|
||||
' Monasca. These include samples that failed to '
|
||||
'publish on first attempt and failed samples that'
|
||||
cfg.BoolOpt('archive_on_failure',
|
||||
default=False,
|
||||
help='When turned on, archives metrics in file system when'
|
||||
'publish to Monasca fails or metric publish maxes out'
|
||||
'retry attempts.'),
|
||||
cfg.StrOpt('archive_path',
|
||||
default='mon_pub_failures.txt',
|
||||
help='File of metrics that failed to publish to '
|
||||
'Monasca. These include metrics that failed to '
|
||||
'publish on first attempt and failed metrics that'
|
||||
' maxed out their retries.'),
|
||||
]
|
||||
|
||||
|
@ -99,6 +105,14 @@ class MonascaPublisher(publisher.PublisherBase):
|
|||
interval=cfg.CONF.monasca.retry_interval,
|
||||
initial_delay=cfg.CONF.monasca.batch_polling_interval)
|
||||
|
||||
if cfg.CONF.monasca.archive_on_failure:
|
||||
archive_path = cfg.CONF.monasca.archive_path
|
||||
if not os.path.exists(archive_path):
|
||||
archive_path = cfg.CONF.find_file(archive_path)
|
||||
|
||||
self.archive_handler = publisher.get_publisher('file://' +
|
||||
str(archive_path))
|
||||
|
||||
def _publish_handler(self, func, metrics, batch=False):
|
||||
"""Handles publishing and exceptions that arise."""
|
||||
|
||||
|
@ -130,11 +144,11 @@ class MonascaPublisher(publisher.PublisherBase):
|
|||
self.retry_counter.extend(
|
||||
[0 * i for i in range(metric_count)])
|
||||
else:
|
||||
# TODO(flush metric to file)
|
||||
pass
|
||||
if hasattr(self, 'archive_handler'):
|
||||
self.archive_handler.publish_samples(None, metrics)
|
||||
except Exception:
|
||||
# TODO(flush metric to file)
|
||||
pass
|
||||
if hasattr(self, 'archive_handler'):
|
||||
self.archive_handler.publish_samples(None, metrics)
|
||||
|
||||
def publish_samples(self, context, samples):
|
||||
"""Main method called to publish samples."""
|
||||
|
@ -208,7 +222,10 @@ class MonascaPublisher(publisher.PublisherBase):
|
|||
# metrics that have maxed out their retry attempts
|
||||
for ctr in xrange(retry_count):
|
||||
if self.retry_counter[ctr] > cfg.CONF.monasca.max_retries:
|
||||
# TODO(persist maxed-out metrics to file)
|
||||
if hasattr(self, 'archive_handler'):
|
||||
self.archive_handler.publish_samples(
|
||||
None,
|
||||
[self.retry_queue[ctr]])
|
||||
LOG.debug(_('Removing metric %s from retry queue.'
|
||||
' Metric retry maxed out retry attempts') %
|
||||
self.retry_queue[ctr]['name'])
|
||||
|
@ -239,10 +256,6 @@ class MonascaPublisher(publisher.PublisherBase):
|
|||
self.retry_counter[ctr] += 1
|
||||
ctr += 1
|
||||
|
||||
def flush_to_file(self):
|
||||
# TODO(persist maxed-out metrics to file)
|
||||
pass
|
||||
|
||||
def publish_events(self, context, events):
|
||||
"""Send an event message for publishing
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import eventlet
|
|||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from oslotest import base
|
||||
from oslotest import mockpatch
|
||||
|
||||
from ceilometer import monasca_client as mon_client
|
||||
from ceilometer.publisher import monclient
|
||||
|
@ -107,6 +108,7 @@ class TestMonascaPublisher(base.BaseTestCase):
|
|||
def setUp(self):
|
||||
super(TestMonascaPublisher, self).setUp()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.CONF([], project='ceilometer', validate_default_values=True)
|
||||
self.parsed_url = mock.MagicMock()
|
||||
ksclient.KSClient = mock.MagicMock()
|
||||
|
||||
|
@ -167,3 +169,25 @@ class TestMonascaPublisher(base.BaseTestCase):
|
|||
eventlet.sleep(5)
|
||||
self.assertEqual(4, mock_create.call_count)
|
||||
self.assertEqual(1, mapping_patch.called)
|
||||
|
||||
@mock.patch("ceilometer.publisher.monasca_data_filter."
|
||||
"MonascaDataFilter._get_mapping",
|
||||
side_effect=[field_mappings])
|
||||
def test_publisher_archival_on_failure(self, mapping_patch):
|
||||
self.CONF.set_override('archive_on_failure', True, group='monasca')
|
||||
self.fake_publisher = mock.Mock()
|
||||
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'ceilometer.publisher.file.FilePublisher',
|
||||
return_value=self.fake_publisher))
|
||||
|
||||
publisher = monclient.MonascaPublisher(self.parsed_url)
|
||||
publisher.mon_client = mock.MagicMock()
|
||||
|
||||
with mock.patch.object(publisher.mon_client,
|
||||
'metrics_create') as mock_create:
|
||||
mock_create.side_effect = Exception
|
||||
metrics_archiver = self.fake_publisher.publish_samples
|
||||
publisher.publish_samples(None, self.test_data)
|
||||
self.assertEqual(1, metrics_archiver.called)
|
||||
self.assertEqual(3, metrics_archiver.call_count)
|
||||
|
|
Loading…
Reference in New Issue