diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py index 4a7fe42b..a1b56b99 100644 --- a/ceilometer/dispatcher/gnocchi.py +++ b/ceilometer/dispatcher/gnocchi.py @@ -320,89 +320,79 @@ class GnocchiDispatcher(dispatcher.Base): data, key=operator.itemgetter('resource_id')) for resource_id, samples_of_resource in resource_grouped_samples: - resource_need_to_be_updated = True - metric_grouped_samples = itertools.groupby( list(samples_of_resource), key=operator.itemgetter('counter_name')) - for metric_name, samples in metric_grouped_samples: - samples = list(samples) - rd = self._get_resource_definition(metric_name) - if rd: - self._process_samples(rd, resource_id, metric_name, - samples, - resource_need_to_be_updated) - else: - LOG.warn("metric %s is not handled by gnocchi" % - metric_name) - # FIXME(sileht): Does it reasonable to skip the resource - # update here ? Does differents kind of counter_name - # can have different metadata set ? - # (ie: one have only flavor_id, and an other one have only - # image_ref ?) - # - # resource_need_to_be_updated = False + self._process_resource(resource_id, metric_grouped_samples) @log_and_ignore_unexpected_workflow_error - def _process_samples(self, resource_def, resource_id, metric_name, samples, - resource_need_to_be_updated): - resource_type = resource_def.cfg['resource_type'] - measure_attributes = [{'timestamp': sample['timestamp'], - 'value': sample['counter_volume']} - for sample in samples] + def _process_resource(self, resource_id, metric_grouped_samples): + # TODO(sileht): Any HTTP 50X/401 error is just logged and this method + # stop, perhaps we can be smarter and retry later in case of 50X and + # directly in case of 401. A gnocchiclient would help a lot for the + # latest. + + resource_extra = {} + for metric_name, samples in metric_grouped_samples: + samples = list(samples) + rd = self._get_resource_definition(metric_name) + if rd is None: + LOG.warn("metric %s is not handled by gnocchi" % + metric_name) + continue + + resource_type = rd.cfg['resource_type'] + resource = { + "id": resource_id, + "user_id": samples[0]['user_id'], + "project_id": samples[0]['project_id'], + "metrics": rd.metrics(), + } + measures = [] + + for sample in samples: + resource_extra.update(rd.attributes(sample)) + measures.append({'timestamp': sample['timestamp'], + 'value': sample['counter_volume']}) + + resource.update(resource_extra) - try: - self._post_measure(resource_type, resource_id, metric_name, - measure_attributes) - except NoSuchMetric: - # NOTE(sileht): we try first to create the resource, because - # they more chance that the resource doesn't exists than the metric - # is missing, the should be reduce the number of resource API call - resource_attributes = self._get_resource_attributes( - resource_def, resource_id, metric_name, samples) try: - self._create_resource(resource_type, resource_id, - resource_attributes) - except ResourceAlreadyExists: + self._post_measure(resource_type, resource_id, metric_name, + measures) + except NoSuchMetric: + # TODO(sileht): Make gnocchi smarter to be able to detect 404 + # for 'resource doesn't exist' and for 'metric doesn't exist' + # https://bugs.launchpad.net/gnocchi/+bug/1476186 + self._ensure_resource_and_metric(resource_type, resource, + metric_name) + try: - archive_policy = (resource_def.metrics()[metric_name]) - self._create_metric(resource_type, resource_id, - metric_name, archive_policy) - except MetricAlreadyExists: - # NOTE(sileht): Just ignore the metric have been created in - # the meantime. - pass - else: - # No need to update it we just created it - # with everything we need - resource_need_to_be_updated = False + self._post_measure(resource_type, resource_id, + metric_name, measures) + except NoSuchMetric: + LOG.error(_LE("Fail to post measures for " + "%(resource_id)s/%(metric_name)s") % + dict(resource_id=resource_id, + metric_name=metric_name)) - # NOTE(sileht): we retry to post the measure but if it fail we - # don't catch the exception to just log it and continue to process - # other samples - self._post_measure(resource_type, resource_id, metric_name, - measure_attributes) + if resource_extra: + self._update_resource(resource_type, resource_id, resource_extra) - if resource_need_to_be_updated: - resource_attributes = self._get_resource_attributes( - resource_def, resource_id, metric_name, samples, - for_update=True) - if resource_attributes: - self._update_resource(resource_type, resource_id, - resource_attributes) - - def _get_resource_attributes(self, resource_def, resource_id, metric_name, - samples, for_update=False): - # FIXME(sileht): Should I merge attibutes of all samples ? - # Or keep only the last one is sufficient ? - attributes = resource_def.attributes(samples[-1]) - if not for_update: - attributes["id"] = resource_id - attributes["user_id"] = samples[-1]['user_id'] - attributes["project_id"] = samples[-1]['project_id'] - attributes["metrics"] = resource_def.metrics() - return attributes + def _ensure_resource_and_metric(self, resource_type, resource, + metric_name): + try: + self._create_resource(resource_type, resource) + except ResourceAlreadyExists: + try: + archive_policy = resource['metrics'][metric_name] + self._create_metric(resource_type, resource['id'], + metric_name, archive_policy) + except MetricAlreadyExists: + # NOTE(sileht): Just ignore the metric have been + # created in the meantime. + pass def _post_measure(self, resource_type, resource_id, metric_name, measure_attributes): @@ -432,33 +422,32 @@ class GnocchiDispatcher(dispatcher.Base): LOG.debug("Measure posted on metric %s of resource %s", metric_name, resource_id) - def _create_resource(self, resource_type, resource_id, - resource_attributes): + def _create_resource(self, resource_type, resource): r = self.gnocchi_api.post("%s/v1/resource/%s" % (self.gnocchi_url, resource_type), headers=self._get_headers(), - data=json.dumps(resource_attributes)) + data=json.dumps(resource)) if r.status_code == 409: - LOG.debug("Resource %s already exists", resource_id) + LOG.debug("Resource %s already exists", resource['id']) raise ResourceAlreadyExists elif int(r.status_code / 100) != 2: raise UnexpectedWorkflowError( _("Resource %(resource_id)s creation failed with " "status: %(status_code)d: %(msg)s") % - {'resource_id': resource_id, + {'resource_id': resource['id'], 'status_code': r.status_code, 'msg': r.text}) else: - LOG.debug("Resource %s created", resource_id) + LOG.debug("Resource %s created", resource['id']) def _update_resource(self, resource_type, resource_id, - resource_attributes): + resource_extra): r = self.gnocchi_api.patch( "%s/v1/resource/%s/%s" % (self.gnocchi_url, resource_type, resource_id), headers=self._get_headers(), - data=json.dumps(resource_attributes)) + data=json.dumps(resource_extra)) if int(r.status_code / 100) != 2: raise UnexpectedWorkflowError( diff --git a/ceilometer/tests/dispatcher/test_gnocchi.py b/ceilometer/tests/dispatcher/test_gnocchi.py index b0e5bac8..f38adce0 100644 --- a/ceilometer/tests/dispatcher/test_gnocchi.py +++ b/ceilometer/tests/dispatcher/test_gnocchi.py @@ -139,15 +139,23 @@ class DispatcherTest(base.BaseTestCase): gnocchi.GnocchiDispatcher, self.conf.conf) @mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher' - '._process_samples') - def _do_test_activity_filter(self, expected_samples, fake_process_samples): + '._process_resource') + def _do_test_activity_filter(self, expected_samples, + fake_process_resource): + + def assert_samples(resource_id, metric_grouped_samples): + samples = [] + for metric_name, s in metric_grouped_samples: + samples.extend(list(s)) + self.assertEqual(expected_samples, samples) + + fake_process_resource.side_effect = assert_samples + d = gnocchi.GnocchiDispatcher(self.conf.conf) d.record_metering_data(self.samples) - fake_process_samples.assert_called_with( - mock.ANY, self.resource_id, 'disk.root.size', - expected_samples, True, - ) + fake_process_resource.assert_called_with(self.resource_id, + mock.ANY) def test_archive_policy_map_config(self): archive_policy_map = yaml.dump({ @@ -284,7 +292,7 @@ class DispatcherWorkflowTest(base.BaseTestCase, ('normal_workflow', dict(measure=204, post_resource=None, metric=None, measure_retry=None, patch_resource=204)), ('new_resource', dict(measure=404, post_resource=204, metric=None, - measure_retry=204, patch_resource=None)), + measure_retry=204, patch_resource=204)), ('new_resource_fail', dict(measure=404, post_resource=500, metric=None, measure_retry=None, patch_resource=None)), ('resource_update_fail', dict(measure=204, post_resource=None,