gnocchi: reduce the number of patch to gnocchi API

This refactor the processing of the samples to
more clearly separation the resource creation/update
and the measurements posting.

This refactor have permit to do only on PATCH of resource
per resource received into samples.

So, more samples are present in the received message more this code
is efficient.

Note, the workflow have changing, now resource are updated even
the posting of the measurements fail.

Change-Id: I01b758a13388da34636dc983a6c8c1550046d419
This commit is contained in:
Mehdi Abaakouk 2015-07-16 16:11:32 +02:00
parent 29d3cd405b
commit 8f4b414245
2 changed files with 84 additions and 87 deletions

View File

@ -320,89 +320,79 @@ class GnocchiDispatcher(dispatcher.Base):
data, key=operator.itemgetter('resource_id'))
for resource_id, samples_of_resource in resource_grouped_samples:
resource_need_to_be_updated = True
metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.itemgetter('counter_name'))
for metric_name, samples in metric_grouped_samples:
samples = list(samples)
rd = self._get_resource_definition(metric_name)
if rd:
self._process_samples(rd, resource_id, metric_name,
samples,
resource_need_to_be_updated)
else:
LOG.warn("metric %s is not handled by gnocchi" %
metric_name)
# FIXME(sileht): Does it reasonable to skip the resource
# update here ? Does differents kind of counter_name
# can have different metadata set ?
# (ie: one have only flavor_id, and an other one have only
# image_ref ?)
#
# resource_need_to_be_updated = False
self._process_resource(resource_id, metric_grouped_samples)
@log_and_ignore_unexpected_workflow_error
def _process_samples(self, resource_def, resource_id, metric_name, samples,
resource_need_to_be_updated):
resource_type = resource_def.cfg['resource_type']
measure_attributes = [{'timestamp': sample['timestamp'],
'value': sample['counter_volume']}
for sample in samples]
def _process_resource(self, resource_id, metric_grouped_samples):
# TODO(sileht): Any HTTP 50X/401 error is just logged and this method
# stop, perhaps we can be smarter and retry later in case of 50X and
# directly in case of 401. A gnocchiclient would help a lot for the
# latest.
resource_extra = {}
for metric_name, samples in metric_grouped_samples:
samples = list(samples)
rd = self._get_resource_definition(metric_name)
if rd is None:
LOG.warn("metric %s is not handled by gnocchi" %
metric_name)
continue
resource_type = rd.cfg['resource_type']
resource = {
"id": resource_id,
"user_id": samples[0]['user_id'],
"project_id": samples[0]['project_id'],
"metrics": rd.metrics(),
}
measures = []
for sample in samples:
resource_extra.update(rd.attributes(sample))
measures.append({'timestamp': sample['timestamp'],
'value': sample['counter_volume']})
resource.update(resource_extra)
try:
self._post_measure(resource_type, resource_id, metric_name,
measure_attributes)
except NoSuchMetric:
# NOTE(sileht): we try first to create the resource, because
# they more chance that the resource doesn't exists than the metric
# is missing, the should be reduce the number of resource API call
resource_attributes = self._get_resource_attributes(
resource_def, resource_id, metric_name, samples)
try:
self._create_resource(resource_type, resource_id,
resource_attributes)
except ResourceAlreadyExists:
self._post_measure(resource_type, resource_id, metric_name,
measures)
except NoSuchMetric:
# TODO(sileht): Make gnocchi smarter to be able to detect 404
# for 'resource doesn't exist' and for 'metric doesn't exist'
# https://bugs.launchpad.net/gnocchi/+bug/1476186
self._ensure_resource_and_metric(resource_type, resource,
metric_name)
try:
archive_policy = (resource_def.metrics()[metric_name])
self._create_metric(resource_type, resource_id,
metric_name, archive_policy)
except MetricAlreadyExists:
# NOTE(sileht): Just ignore the metric have been created in
# the meantime.
pass
else:
# No need to update it we just created it
# with everything we need
resource_need_to_be_updated = False
self._post_measure(resource_type, resource_id,
metric_name, measures)
except NoSuchMetric:
LOG.error(_LE("Fail to post measures for "
"%(resource_id)s/%(metric_name)s") %
dict(resource_id=resource_id,
metric_name=metric_name))
# NOTE(sileht): we retry to post the measure but if it fail we
# don't catch the exception to just log it and continue to process
# other samples
self._post_measure(resource_type, resource_id, metric_name,
measure_attributes)
if resource_extra:
self._update_resource(resource_type, resource_id, resource_extra)
if resource_need_to_be_updated:
resource_attributes = self._get_resource_attributes(
resource_def, resource_id, metric_name, samples,
for_update=True)
if resource_attributes:
self._update_resource(resource_type, resource_id,
resource_attributes)
def _get_resource_attributes(self, resource_def, resource_id, metric_name,
samples, for_update=False):
# FIXME(sileht): Should I merge attibutes of all samples ?
# Or keep only the last one is sufficient ?
attributes = resource_def.attributes(samples[-1])
if not for_update:
attributes["id"] = resource_id
attributes["user_id"] = samples[-1]['user_id']
attributes["project_id"] = samples[-1]['project_id']
attributes["metrics"] = resource_def.metrics()
return attributes
def _ensure_resource_and_metric(self, resource_type, resource,
metric_name):
try:
self._create_resource(resource_type, resource)
except ResourceAlreadyExists:
try:
archive_policy = resource['metrics'][metric_name]
self._create_metric(resource_type, resource['id'],
metric_name, archive_policy)
except MetricAlreadyExists:
# NOTE(sileht): Just ignore the metric have been
# created in the meantime.
pass
def _post_measure(self, resource_type, resource_id, metric_name,
measure_attributes):
@ -432,33 +422,32 @@ class GnocchiDispatcher(dispatcher.Base):
LOG.debug("Measure posted on metric %s of resource %s",
metric_name, resource_id)
def _create_resource(self, resource_type, resource_id,
resource_attributes):
def _create_resource(self, resource_type, resource):
r = self.gnocchi_api.post("%s/v1/resource/%s"
% (self.gnocchi_url, resource_type),
headers=self._get_headers(),
data=json.dumps(resource_attributes))
data=json.dumps(resource))
if r.status_code == 409:
LOG.debug("Resource %s already exists", resource_id)
LOG.debug("Resource %s already exists", resource['id'])
raise ResourceAlreadyExists
elif int(r.status_code / 100) != 2:
raise UnexpectedWorkflowError(
_("Resource %(resource_id)s creation failed with "
"status: %(status_code)d: %(msg)s") %
{'resource_id': resource_id,
{'resource_id': resource['id'],
'status_code': r.status_code,
'msg': r.text})
else:
LOG.debug("Resource %s created", resource_id)
LOG.debug("Resource %s created", resource['id'])
def _update_resource(self, resource_type, resource_id,
resource_attributes):
resource_extra):
r = self.gnocchi_api.patch(
"%s/v1/resource/%s/%s"
% (self.gnocchi_url, resource_type, resource_id),
headers=self._get_headers(),
data=json.dumps(resource_attributes))
data=json.dumps(resource_extra))
if int(r.status_code / 100) != 2:
raise UnexpectedWorkflowError(

View File

@ -139,15 +139,23 @@ class DispatcherTest(base.BaseTestCase):
gnocchi.GnocchiDispatcher, self.conf.conf)
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
'._process_samples')
def _do_test_activity_filter(self, expected_samples, fake_process_samples):
'._process_resource')
def _do_test_activity_filter(self, expected_samples,
fake_process_resource):
def assert_samples(resource_id, metric_grouped_samples):
samples = []
for metric_name, s in metric_grouped_samples:
samples.extend(list(s))
self.assertEqual(expected_samples, samples)
fake_process_resource.side_effect = assert_samples
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.record_metering_data(self.samples)
fake_process_samples.assert_called_with(
mock.ANY, self.resource_id, 'disk.root.size',
expected_samples, True,
)
fake_process_resource.assert_called_with(self.resource_id,
mock.ANY)
def test_archive_policy_map_config(self):
archive_policy_map = yaml.dump({
@ -263,7 +271,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
('normal_workflow', dict(measure=204, post_resource=None, metric=None,
measure_retry=None, patch_resource=204)),
('new_resource', dict(measure=404, post_resource=204, metric=None,
measure_retry=204, patch_resource=None)),
measure_retry=204, patch_resource=204)),
('new_resource_fail', dict(measure=404, post_resource=500, metric=None,
measure_retry=None, patch_resource=None)),
('resource_update_fail', dict(measure=204, post_resource=None,