gnocchi: batch measurements

This change uses the Gnocchi batch API in the Ceilometer
dispatcher to post measurements.

It introduces a small behavior change in the dispatcher:
the resource metadata update is now decoupled from measure
posting, so the resource metadata is updated even if posting
the measurements fails.

Change-Id: I140e7301a31bbee50c2c8bc3ff2d78925680a573
Mehdi Abaakouk 2016-03-08 10:21:51 +01:00
parent 6a0f84dcbb
commit 4a451fac0e
3 changed files with 223 additions and 162 deletions
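For context, here is a minimal sketch of the payload shape the dispatcher now builds for the batch call, assuming gnocchiclient >= 2.2.0 (the resource id, metric names, timestamps, and values below are hypothetical):

    from gnocchiclient import utils as gnocchi_utils

    # Resource ids are converted to the Gnocchi format because batch errors
    # report that id rather than the Ceilometer one.
    gnocchi_id = gnocchi_utils.encode_resource_id("instance-0001")

    # One mapping of resource id -> metric name -> measures, posted in a
    # single call instead of one add_measures() call per metric.
    measures = {
        gnocchi_id: {
            "cpu_util": [
                {"timestamp": "2016-03-08T10:21:51", "value": 12.5},
                {"timestamp": "2016-03-08T10:22:51", "value": 13.0},
            ],
        },
    }

    # With an authenticated client (e.g. gnocchi = gnocchiclient.client.Client(
    # '1', session=keystone_session)), the whole batch is posted at once:
    # gnocchi.metric.batch_resources_metrics_measures(measures)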


@@ -16,11 +16,13 @@ from collections import defaultdict
from hashlib import md5
import itertools
import operator
import re
import threading
import uuid
from gnocchiclient import client
from gnocchiclient import exceptions as gnocchi_exc
from gnocchiclient import utils as gnocchi_utils
from keystoneauth1 import session as ka_session
from oslo_config import cfg
from oslo_log import log
@@ -70,17 +72,6 @@ def cache_key_mangler(key):
return uuid.uuid5(CACHE_NAMESPACE, key).hex
def log_and_ignore_unexpected_workflow_error(func):
def log_and_ignore(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
except gnocchi_exc.ClientException as e:
LOG.error(six.text_type(e))
except Exception as e:
LOG.error(six.text_type(e), exc_info=True)
return log_and_ignore
class ResourcesDefinitionException(Exception):
def __init__(self, message, definition_cfg):
msg = '%s %s: %s' % (self.__class__.__name__, definition_cfg, message)
@@ -304,79 +295,130 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
resource_grouped_samples = itertools.groupby(
data, key=operator.itemgetter('resource_id'))
gnocchi_data = {}
measures = {}
stats = dict(measures=0, resources=0, metrics=0)
for resource_id, samples_of_resource in resource_grouped_samples:
stats['resources'] += 1
metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.itemgetter('counter_name'))
self._process_resource(resource_id, metric_grouped_samples)
# NOTE(sileht): We convert resource id to Gnocchi format
# because batch_resources_metrics_measures exception
# returns this id and not the ceilometer one
gnocchi_id = gnocchi_utils.encode_resource_id(resource_id)
res_info = gnocchi_data[gnocchi_id] = {}
for metric_name, samples in metric_grouped_samples:
stats['metrics'] += 1
@log_and_ignore_unexpected_workflow_error
def _process_resource(self, resource_id, metric_grouped_samples):
resource_extra = {}
for metric_name, samples in metric_grouped_samples:
samples = list(samples)
rd = self._get_resource_definition(metric_name)
if rd is None:
LOG.warning("metric %s is not handled by gnocchi" %
metric_name)
samples = list(samples)
rd = self._get_resource_definition(metric_name)
if rd is None:
LOG.warning(_LW("metric %s is not handled by Gnocchi") %
metric_name)
continue
if rd.cfg.get("ignore"):
continue
res_info['resource_type'] = rd.cfg['resource_type']
res_info.setdefault("resource", {}).update({
"id": resource_id,
"user_id": samples[0]['user_id'],
"project_id": samples[0]['project_id'],
"metrics": rd.metrics,
})
for sample in samples:
res_info.setdefault("resource_extra", {}).update(
rd.attributes(sample))
m = measures.setdefault(gnocchi_id, {}).setdefault(
metric_name, [])
m.append({'timestamp': sample['timestamp'],
'value': sample['counter_volume']})
stats['measures'] += len(measures[gnocchi_id][metric_name])
res_info["resource"].update(res_info["resource_extra"])
try:
self.batch_measures(measures, gnocchi_data, stats)
except gnocchi_exc.ClientException as e:
LOG.error(six.text_type(e))
except Exception as e:
LOG.error(six.text_type(e), exc_info=True)
for gnocchi_id, info in gnocchi_data.items():
resource = info["resource"]
resource_type = info["resource_type"]
resource_extra = info["resource_extra"]
if not resource_extra:
continue
if rd.cfg.get("ignore"):
continue
resource_type = rd.cfg['resource_type']
resource = {
"id": resource_id,
"user_id": samples[0]['user_id'],
"project_id": samples[0]['project_id'],
"metrics": rd.metrics,
}
measures = []
for sample in samples:
resource_extra.update(rd.attributes(sample))
measures.append({'timestamp': sample['timestamp'],
'value': sample['counter_volume']})
resource.update(resource_extra)
retry = True
try:
self._gnocchi.metric.add_measures(metric_name, measures,
resource_id)
except gnocchi_exc.ResourceNotFound:
self._if_not_cached("create", resource_type, resource,
self._create_resource)
self._if_not_cached("update", resource_type, resource,
self._update_resource, resource_extra)
except gnocchi_exc.ClientException as e:
LOG.error(six.text_type(e))
except Exception as e:
LOG.error(six.text_type(e), exc_info=True)
except gnocchi_exc.MetricNotFound:
metric = {'resource_id': resource['id'],
'name': metric_name}
metric.update(rd.metrics[metric_name])
RE_UNKNOW_METRICS = re.compile("Unknown metrics: (.*) \(HTTP 400\)")
RE_UNKNOW_METRICS_LIST = re.compile("([^/ ,]*)/([^,]*)")
def batch_measures(self, measures, resource_infos, stats):
# NOTE(sileht): We don't care about error here, we want
# resources metadata always been updated
try:
self._gnocchi.metric.batch_resources_metrics_measures(measures)
except gnocchi_exc.BadRequest as e:
m = self.RE_UNKNOW_METRICS.match(six.text_type(e))
if m is None:
raise
# NOTE(sileht): Create all missing resources and metrics
metric_list = self.RE_UNKNOW_METRICS_LIST.findall(m.group(1))
gnocchi_ids_freshly_handled = set()
for gnocchi_id, metric_name in metric_list:
if gnocchi_id in gnocchi_ids_freshly_handled:
continue
resource = resource_infos[gnocchi_id]['resource']
resource_type = resource_infos[gnocchi_id]['resource_type']
try:
self._gnocchi.metric.create(metric)
except gnocchi_exc.NamedMetricAlreadyExists:
# NOTE(sileht): metric created in the meantime
pass
else:
retry = False
self._if_not_cached("create", resource_type, resource,
self._create_resource)
except gnocchi_exc.ResourceAlreadyExists:
metric = {'resource_id': resource['id'],
'name': metric_name}
metric.update(resource["metrics"][metric_name])
try:
self._gnocchi.metric.create(metric)
except gnocchi_exc.NamedMetricAlreadyExists:
# NOTE(sileht): metric created in the meantime
pass
except gnocchi_exc.ClientException as e:
LOG.error(six.text_type(e))
# We cannot post measures for this metric
del measures[gnocchi_id][metric_name]
if not measures[gnocchi_id]:
del measures[gnocchi_id]
except gnocchi_exc.ClientException as e:
LOG.error(six.text_type(e))
# We cannot post measures for this resource
del measures[gnocchi_id]
gnocchi_ids_freshly_handled.add(gnocchi_id)
else:
gnocchi_ids_freshly_handled.add(gnocchi_id)
if retry:
self._gnocchi.metric.add_measures(metric_name, measures,
resource_id)
LOG.debug("Measure posted on metric %s of resource %s",
metric_name, resource_id)
# NOTE(sileht): we have created missing resources/metrics,
# now retry to post measures
self._gnocchi.metric.batch_resources_metrics_measures(measures)
if resource_extra:
self._if_not_cached("update", resource_type, resource,
self._update_resource, resource_extra)
# FIXME(sileht): take care of measures removed in stats
LOG.debug("%(measures)d measures posted against %(metrics)d "
"metrics through %(resources)d resources", stats)
def _create_resource(self, resource_type, resource):
try:
self._gnocchi.resource.create(resource_type, resource)
LOG.debug('Resource %s created', resource["id"])
except gnocchi_exc.ResourceAlreadyExists:
# NOTE(sileht): resource created in the meantime
pass
self._gnocchi.resource.create(resource_type, resource)
LOG.debug('Resource %s created', resource["id"])
def _update_resource(self, resource_type, resource, resource_extra):
self._gnocchi.resource.update(resource_type,
@@ -389,6 +431,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
if self.cache:
cache_key = resource['id']
attribute_hash = self._check_resource_cache(cache_key, resource)
hit = False
if attribute_hash:
with self._gnocchi_resource_lock[cache_key]:
# NOTE(luogangyi): there is a possibility that the
@@ -400,11 +443,15 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
method(resource_type, resource, *args, **kwargs)
self.cache.set(cache_key, attribute_hash)
else:
hit = True
LOG.debug('resource cache recheck hit for '
'%s %s', operation, cache_key)
self._gnocchi_resource_lock.pop(cache_key, None)
else:
hit = True
LOG.debug('Resource cache hit for %s %s', operation, cache_key)
if hit and operation == "create":
raise gnocchi_exc.ResourceAlreadyExists()
else:
method(resource_type, resource, *args, **kwargs)
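
The error-recovery path above hinges on parsing the gnocchi "Unknown metrics" error. A minimal sketch of that parsing, using the regexes introduced above (copied from the change) on a hypothetical error message with made-up ids and metric names:

    import re

    RE_UNKNOW_METRICS = re.compile("Unknown metrics: (.*) \(HTTP 400\)")
    RE_UNKNOW_METRICS_LIST = re.compile("([^/ ,]*)/([^,]*)")

    error = ("Unknown metrics: "
             "5f46017c-7ed8-5a61-bbcb-e9b979c8f655/cpu_util, "
             "5f46017c-7ed8-5a61-bbcb-e9b979c8f655/memory.usage "
             "(HTTP 400)")

    m = RE_UNKNOW_METRICS.match(error)
    if m is not None:
        # (gnocchi resource id, metric name) pairs that are missing
        # server-side; the dispatcher creates each missing resource/metric
        # once, then retries batch_resources_metrics_measures a single time.
        for gnocchi_id, metric_name in RE_UNKNOW_METRICS_LIST.findall(m.group(1)):
            print(gnocchi_id, metric_name)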


@ -19,6 +19,7 @@ import os
import uuid
from gnocchiclient import exceptions as gnocchi_exc
from gnocchiclient import utils as gnocchi_utils
import mock
from oslo_config import fixture as config_fixture
from oslo_utils import fileutils
@@ -128,36 +129,29 @@ class DispatcherTest(base.BaseTestCase):
self.assertEqual(0, len(d.resources_definition))
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
'._process_resource')
def _do_test_activity_filter(self, expected_samples,
fake_process_resource):
def assert_samples(resource_id, metric_grouped_samples):
samples = []
for metric_name, s in metric_grouped_samples:
samples.extend(list(s))
self.assertEqual(expected_samples, samples)
fake_process_resource.side_effect = assert_samples
'._if_not_cached')
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
'.batch_measures')
def _do_test_activity_filter(self, expected_measures, fake_batch, __):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.record_metering_data(self.samples)
fake_process_resource.assert_called_with(self.resource_id,
mock.ANY)
fake_batch.assert_called_with(
mock.ANY, mock.ANY,
{'metrics': 1, 'resources': 1, 'measures': expected_measures})
def test_activity_filter_match_project_id(self):
self.samples[0]['project_id'] = (
'a2d42c23-d518-46b6-96ab-3fba2e146859')
self._do_test_activity_filter([self.samples[1]])
self._do_test_activity_filter(1)
def test_activity_filter_match_swift_event(self):
self.samples[0]['counter_name'] = 'storage.api.request'
self.samples[0]['resource_id'] = 'a2d42c23-d518-46b6-96ab-3fba2e146859'
self._do_test_activity_filter([self.samples[1]])
self._do_test_activity_filter(1)
def test_activity_filter_nomatch(self):
self._do_test_activity_filter(self.samples)
self._do_test_activity_filter(2)
class MockResponse(mock.NonCallableMock):
@@ -251,30 +245,36 @@ class DispatcherWorkflowTest(base.BaseTestCase,
resource_type='ipmi')),
]
worflow_scenarios = [
('normal_workflow', dict(measure=204, post_resource=None, metric=None,
measure_retry=None, patch_resource=204)),
('new_resource', dict(measure=404, post_resource=204, metric=None,
measure_retry=204, patch_resource=204)),
('new_resource_fail', dict(measure=404, post_resource=500, metric=None,
measure_retry=None, patch_resource=None)),
('resource_update_fail', dict(measure=204, post_resource=None,
metric=None, measure_retry=None,
patch_resource=500)),
('new_metric', dict(measure=404, post_resource=None, metric=204,
measure_retry=204, patch_resource=204)),
('new_metric_fail', dict(measure=404, post_resource=None, metric=500,
measure_retry=None, patch_resource=None)),
('retry_fail', dict(measure=404, post_resource=409, metric=None,
measure_retry=500, patch_resource=None)),
('measure_fail', dict(measure=500, post_resource=None, metric=None,
measure_retry=None, patch_resource=None)),
default_workflow = dict(resource_exists=True,
metric_exists=True,
post_measure_fail=False,
create_resource_fail=False,
create_metric_fail=False,
update_resource_fail=False,
retry_post_measures_fail=False)
workflow_scenarios = [
('normal_workflow', {}),
('new_resource', dict(resource_exists=False)),
('new_resource_fail', dict(resource_exists=False,
create_resource_fail=True)),
('resource_update_fail', dict(update_resource_fail=True)),
('new_metric', dict(metric_exists=False)),
('new_metric_fail', dict(metric_exists=False,
create_metric_fail=True)),
('retry_fail', dict(resource_exists=False,
retry_post_measures_fail=True)),
('measure_fail', dict(post_measure_fail=True)),
]
@classmethod
def generate_scenarios(cls):
workflow_scenarios = []
for name, wf_change in cls.workflow_scenarios:
wf = cls.default_workflow.copy()
wf.update(wf_change)
workflow_scenarios.append((name, wf))
cls.scenarios = testscenarios.multiply_scenarios(cls.sample_scenarios,
cls.worflow_scenarios)
workflow_scenarios)
def setUp(self):
super(DispatcherWorkflowTest, self).setUp()
@@ -314,25 +314,27 @@ class DispatcherWorkflowTest(base.BaseTestCase,
# encode the resource_id
resource_id = self.sample['resource_id'] # .replace("/", "%2F"),
metric_name = self.sample['counter_name']
gnocchi_id = gnocchi_utils.encode_resource_id(resource_id)
expected_calls = [
mock.call.capabilities.list(),
mock.call.metric.add_measures(metric_name,
self.measures_attributes,
resource_id)]
mock.call.metric.batch_resources_metrics_measures(
{gnocchi_id: {metric_name: self.measures_attributes}})
]
expected_debug = [
mock.call('gnocchi project found: %s',
'a2d42c23-d518-46b6-96ab-3fba2e146859'),
]
add_measures_side_effect = []
if self.measure == 404 and self.post_resource:
add_measures_side_effect += [
gnocchi_exc.ResourceNotFound(404)]
elif self.measure == 404 and self.metric:
add_measures_side_effect += [
gnocchi_exc.MetricNotFound(404)]
elif self.measure == 500:
add_measures_side_effect += [Exception('boom!')]
if self.post_resource:
measures_posted = False
batch_side_effect = []
if self.post_measure_fail:
batch_side_effect += [Exception('boom!')]
elif not self.resource_exists or not self.metric_exists:
batch_side_effect += [
gnocchi_exc.BadRequest(
400, "Unknown metrics: %s/%s" % (gnocchi_id,
metric_name))]
attributes = self.postable_attributes.copy()
attributes.update(self.patchable_attributes)
attributes['id'] = self.sample['resource_id']
@@ -340,63 +342,75 @@ class DispatcherWorkflowTest(base.BaseTestCase,
for metric_name in self.metric_names)
expected_calls.append(mock.call.resource.create(
self.resource_type, attributes))
if self.post_resource == 409:
if self.create_resource_fail:
fakeclient.resource.create.side_effect = [Exception('boom!')]
elif self.resource_exists:
fakeclient.resource.create.side_effect = [
gnocchi_exc.ResourceAlreadyExists(409)]
elif self.post_resource == 500:
fakeclient.resource.create.side_effect = [Exception('boom!')]
if self.metric:
expected_calls.append(mock.call.metric.create({
'name': self.sample['counter_name'],
'resource_id': resource_id}))
if self.metric == 409:
fakeclient.metric.create.side_effect = [
gnocchi_exc.NamedMetricAreadyExists(409)]
elif self.metric == 500:
fakeclient.metric.create.side_effect = [Exception('boom!')]
expected_calls.append(mock.call.metric.create({
'name': self.sample['counter_name'],
'resource_id': resource_id}))
if self.create_metric_fail:
fakeclient.metric.create.side_effect = [Exception('boom!')]
elif self.metric_exists:
fakeclient.metric.create.side_effect = [
gnocchi_exc.NamedMetricAreadyExists(409)]
else:
fakeclient.metric.create.side_effect = [None]
else: # not resource_exists
expected_debug.append(mock.call(
'Resource %s created', self.sample['resource_id']))
if not self.create_resource_fail and not self.create_metric_fail:
expected_calls.append(
mock.call.metric.batch_resources_metrics_measures(
{gnocchi_id: {metric_name: self.measures_attributes}})
)
if self.retry_post_measures_fail:
batch_side_effect += [Exception('boom!')]
else:
measures_posted = True
if self.measure_retry:
expected_calls.append(mock.call.metric.add_measures(
metric_name,
self.measures_attributes,
resource_id))
if self.measure_retry == 204:
add_measures_side_effect += [None]
elif self.measure_retry == 500:
add_measures_side_effect += [
Exception('boom!')]
else:
add_measures_side_effect += [None]
measures_posted = True
if self.patch_resource and self.patchable_attributes:
if measures_posted:
batch_side_effect += [None]
expected_debug.append(
mock.call("%(measures)d measures posted against %(metrics)d "
"metrics through %(resources)d resources", dict(
measures=len(self.measures_attributes),
metrics=1, resources=1))
)
if self.patchable_attributes:
expected_calls.append(mock.call.resource.update(
self.resource_type, resource_id,
self.patchable_attributes))
if self.patch_resource == 500:
if self.update_resource_fail:
fakeclient.resource.update.side_effect = [Exception('boom!')]
else:
expected_debug.append(mock.call(
'Resource %s updated', self.sample['resource_id']))
fakeclient.metric.add_measures.side_effect = add_measures_side_effect
batch = fakeclient.metric.batch_resources_metrics_measures
batch.side_effect = batch_side_effect
self.dispatcher.record_metering_data([self.sample])
# Check that the last log message is the expected one
if (self.measure == 500 or self.measure_retry == 500 or
self.metric == 500 or self.post_resource == 500 or
(self.patch_resource == 500 and self.patchable_attributes)):
if (self.post_measure_fail or self.create_metric_fail
or self.create_resource_fail
or self.retry_post_measures_fail
or (self.update_resource_fail and self.patchable_attributes)):
logger.error.assert_called_with('boom!', exc_info=True)
elif self.patch_resource == 204 and self.patchable_attributes:
logger.debug.assert_called_with(
'Resource %s updated', self.sample['resource_id'])
else:
self.assertEqual(0, logger.error.call_count)
elif self.measure == 200:
logger.debug.assert_called_with(
"Measure posted on metric %s of resource %s",
self.sample['counter_name'],
self.sample['resource_id'])
self.assertEqual(0, logger.error.call_count)
self.assertEqual(expected_calls, fakeclient.mock_calls)
self.assertEqual(expected_debug, logger.debug.mock_calls)
DispatcherWorkflowTest.generate_scenarios()
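
As a side note, the scenario expansion above follows the usual testscenarios pattern: each named override is merged into the default workflow dict and then crossed with the sample scenarios, roughly like this (simplified, hypothetical subset of the real scenario data):

    import testscenarios

    default_workflow = dict(resource_exists=True, post_measure_fail=False)
    workflow_overrides = [
        ('normal_workflow', {}),
        ('new_resource', dict(resource_exists=False)),
    ]
    sample_scenarios = [
        ('disk.root.size', dict(resource_type='instance_disk')),
    ]

    workflow_scenarios = [(name, dict(default_workflow, **change))
                          for name, change in workflow_overrides]

    # Every (sample, workflow) combination becomes one test scenario.
    scenarios = testscenarios.multiply_scenarios(sample_scenarios,
                                                 workflow_scenarios)
    print(len(scenarios))  # 2 combinations here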


@@ -23,7 +23,7 @@ overtest>=0.10.0 # Apache-2.0
psycopg2>=2.5 # LGPL/ZPL
pylint==1.4.5 # GNU GPL v2
pymongo!=3.1,>=3.0.2 # Apache-2.0
gnocchiclient>=2.1.0 # Apache-2.0
gnocchiclient>=2.2.0 # Apache-2.0
python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD
sphinxcontrib-httpdomain # BSD