From 8f4b4142458c819dfdc3ca645b403a1551d2023b Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Thu, 16 Jul 2015 16:11:32 +0200 Subject: [PATCH] gnocchi: reduce the number of patch to gnocchi API This refactor the processing of the samples to more clearly separation the resource creation/update and the measurements posting. This refactor have permit to do only on PATCH of resource per resource received into samples. So, more samples are present in the received message more this code is efficient. Note, the workflow have changing, now resource are updated even the posting of the measurements fail. Change-Id: I01b758a13388da34636dc983a6c8c1550046d419 --- ceilometer/dispatcher/gnocchi.py | 149 +++++++++----------- ceilometer/tests/dispatcher/test_gnocchi.py | 22 ++- 2 files changed, 84 insertions(+), 87 deletions(-) diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py index 4a7fe42b..a1b56b99 100644 --- a/ceilometer/dispatcher/gnocchi.py +++ b/ceilometer/dispatcher/gnocchi.py @@ -320,89 +320,79 @@ class GnocchiDispatcher(dispatcher.Base): data, key=operator.itemgetter('resource_id')) for resource_id, samples_of_resource in resource_grouped_samples: - resource_need_to_be_updated = True - metric_grouped_samples = itertools.groupby( list(samples_of_resource), key=operator.itemgetter('counter_name')) - for metric_name, samples in metric_grouped_samples: - samples = list(samples) - rd = self._get_resource_definition(metric_name) - if rd: - self._process_samples(rd, resource_id, metric_name, - samples, - resource_need_to_be_updated) - else: - LOG.warn("metric %s is not handled by gnocchi" % - metric_name) - # FIXME(sileht): Does it reasonable to skip the resource - # update here ? Does differents kind of counter_name - # can have different metadata set ? - # (ie: one have only flavor_id, and an other one have only - # image_ref ?) - # - # resource_need_to_be_updated = False + self._process_resource(resource_id, metric_grouped_samples) @log_and_ignore_unexpected_workflow_error - def _process_samples(self, resource_def, resource_id, metric_name, samples, - resource_need_to_be_updated): - resource_type = resource_def.cfg['resource_type'] - measure_attributes = [{'timestamp': sample['timestamp'], - 'value': sample['counter_volume']} - for sample in samples] + def _process_resource(self, resource_id, metric_grouped_samples): + # TODO(sileht): Any HTTP 50X/401 error is just logged and this method + # stop, perhaps we can be smarter and retry later in case of 50X and + # directly in case of 401. A gnocchiclient would help a lot for the + # latest. + + resource_extra = {} + for metric_name, samples in metric_grouped_samples: + samples = list(samples) + rd = self._get_resource_definition(metric_name) + if rd is None: + LOG.warn("metric %s is not handled by gnocchi" % + metric_name) + continue + + resource_type = rd.cfg['resource_type'] + resource = { + "id": resource_id, + "user_id": samples[0]['user_id'], + "project_id": samples[0]['project_id'], + "metrics": rd.metrics(), + } + measures = [] + + for sample in samples: + resource_extra.update(rd.attributes(sample)) + measures.append({'timestamp': sample['timestamp'], + 'value': sample['counter_volume']}) + + resource.update(resource_extra) - try: - self._post_measure(resource_type, resource_id, metric_name, - measure_attributes) - except NoSuchMetric: - # NOTE(sileht): we try first to create the resource, because - # they more chance that the resource doesn't exists than the metric - # is missing, the should be reduce the number of resource API call - resource_attributes = self._get_resource_attributes( - resource_def, resource_id, metric_name, samples) try: - self._create_resource(resource_type, resource_id, - resource_attributes) - except ResourceAlreadyExists: + self._post_measure(resource_type, resource_id, metric_name, + measures) + except NoSuchMetric: + # TODO(sileht): Make gnocchi smarter to be able to detect 404 + # for 'resource doesn't exist' and for 'metric doesn't exist' + # https://bugs.launchpad.net/gnocchi/+bug/1476186 + self._ensure_resource_and_metric(resource_type, resource, + metric_name) + try: - archive_policy = (resource_def.metrics()[metric_name]) - self._create_metric(resource_type, resource_id, - metric_name, archive_policy) - except MetricAlreadyExists: - # NOTE(sileht): Just ignore the metric have been created in - # the meantime. - pass - else: - # No need to update it we just created it - # with everything we need - resource_need_to_be_updated = False + self._post_measure(resource_type, resource_id, + metric_name, measures) + except NoSuchMetric: + LOG.error(_LE("Fail to post measures for " + "%(resource_id)s/%(metric_name)s") % + dict(resource_id=resource_id, + metric_name=metric_name)) - # NOTE(sileht): we retry to post the measure but if it fail we - # don't catch the exception to just log it and continue to process - # other samples - self._post_measure(resource_type, resource_id, metric_name, - measure_attributes) + if resource_extra: + self._update_resource(resource_type, resource_id, resource_extra) - if resource_need_to_be_updated: - resource_attributes = self._get_resource_attributes( - resource_def, resource_id, metric_name, samples, - for_update=True) - if resource_attributes: - self._update_resource(resource_type, resource_id, - resource_attributes) - - def _get_resource_attributes(self, resource_def, resource_id, metric_name, - samples, for_update=False): - # FIXME(sileht): Should I merge attibutes of all samples ? - # Or keep only the last one is sufficient ? - attributes = resource_def.attributes(samples[-1]) - if not for_update: - attributes["id"] = resource_id - attributes["user_id"] = samples[-1]['user_id'] - attributes["project_id"] = samples[-1]['project_id'] - attributes["metrics"] = resource_def.metrics() - return attributes + def _ensure_resource_and_metric(self, resource_type, resource, + metric_name): + try: + self._create_resource(resource_type, resource) + except ResourceAlreadyExists: + try: + archive_policy = resource['metrics'][metric_name] + self._create_metric(resource_type, resource['id'], + metric_name, archive_policy) + except MetricAlreadyExists: + # NOTE(sileht): Just ignore the metric have been + # created in the meantime. + pass def _post_measure(self, resource_type, resource_id, metric_name, measure_attributes): @@ -432,33 +422,32 @@ class GnocchiDispatcher(dispatcher.Base): LOG.debug("Measure posted on metric %s of resource %s", metric_name, resource_id) - def _create_resource(self, resource_type, resource_id, - resource_attributes): + def _create_resource(self, resource_type, resource): r = self.gnocchi_api.post("%s/v1/resource/%s" % (self.gnocchi_url, resource_type), headers=self._get_headers(), - data=json.dumps(resource_attributes)) + data=json.dumps(resource)) if r.status_code == 409: - LOG.debug("Resource %s already exists", resource_id) + LOG.debug("Resource %s already exists", resource['id']) raise ResourceAlreadyExists elif int(r.status_code / 100) != 2: raise UnexpectedWorkflowError( _("Resource %(resource_id)s creation failed with " "status: %(status_code)d: %(msg)s") % - {'resource_id': resource_id, + {'resource_id': resource['id'], 'status_code': r.status_code, 'msg': r.text}) else: - LOG.debug("Resource %s created", resource_id) + LOG.debug("Resource %s created", resource['id']) def _update_resource(self, resource_type, resource_id, - resource_attributes): + resource_extra): r = self.gnocchi_api.patch( "%s/v1/resource/%s/%s" % (self.gnocchi_url, resource_type, resource_id), headers=self._get_headers(), - data=json.dumps(resource_attributes)) + data=json.dumps(resource_extra)) if int(r.status_code / 100) != 2: raise UnexpectedWorkflowError( diff --git a/ceilometer/tests/dispatcher/test_gnocchi.py b/ceilometer/tests/dispatcher/test_gnocchi.py index 70318b61..a681cbd1 100644 --- a/ceilometer/tests/dispatcher/test_gnocchi.py +++ b/ceilometer/tests/dispatcher/test_gnocchi.py @@ -139,15 +139,23 @@ class DispatcherTest(base.BaseTestCase): gnocchi.GnocchiDispatcher, self.conf.conf) @mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher' - '._process_samples') - def _do_test_activity_filter(self, expected_samples, fake_process_samples): + '._process_resource') + def _do_test_activity_filter(self, expected_samples, + fake_process_resource): + + def assert_samples(resource_id, metric_grouped_samples): + samples = [] + for metric_name, s in metric_grouped_samples: + samples.extend(list(s)) + self.assertEqual(expected_samples, samples) + + fake_process_resource.side_effect = assert_samples + d = gnocchi.GnocchiDispatcher(self.conf.conf) d.record_metering_data(self.samples) - fake_process_samples.assert_called_with( - mock.ANY, self.resource_id, 'disk.root.size', - expected_samples, True, - ) + fake_process_resource.assert_called_with(self.resource_id, + mock.ANY) def test_archive_policy_map_config(self): archive_policy_map = yaml.dump({ @@ -263,7 +271,7 @@ class DispatcherWorkflowTest(base.BaseTestCase, ('normal_workflow', dict(measure=204, post_resource=None, metric=None, measure_retry=None, patch_resource=204)), ('new_resource', dict(measure=404, post_resource=204, metric=None, - measure_retry=204, patch_resource=None)), + measure_retry=204, patch_resource=204)), ('new_resource_fail', dict(measure=404, post_resource=500, metric=None, measure_retry=None, patch_resource=None)), ('resource_update_fail', dict(measure=204, post_resource=None,