Merge "gnocchi: reduce the number of patch to gnocchi API"
This commit is contained in:
commit
6e969335d1
@ -320,89 +320,79 @@ class GnocchiDispatcher(dispatcher.Base):
|
||||
data, key=operator.itemgetter('resource_id'))
|
||||
|
||||
for resource_id, samples_of_resource in resource_grouped_samples:
|
||||
resource_need_to_be_updated = True
|
||||
|
||||
metric_grouped_samples = itertools.groupby(
|
||||
list(samples_of_resource),
|
||||
key=operator.itemgetter('counter_name'))
|
||||
for metric_name, samples in metric_grouped_samples:
|
||||
samples = list(samples)
|
||||
rd = self._get_resource_definition(metric_name)
|
||||
if rd:
|
||||
self._process_samples(rd, resource_id, metric_name,
|
||||
samples,
|
||||
resource_need_to_be_updated)
|
||||
else:
|
||||
LOG.warn("metric %s is not handled by gnocchi" %
|
||||
metric_name)
|
||||
|
||||
# FIXME(sileht): Does it reasonable to skip the resource
|
||||
# update here ? Does differents kind of counter_name
|
||||
# can have different metadata set ?
|
||||
# (ie: one have only flavor_id, and an other one have only
|
||||
# image_ref ?)
|
||||
#
|
||||
# resource_need_to_be_updated = False
|
||||
self._process_resource(resource_id, metric_grouped_samples)
|
||||
|
||||
@log_and_ignore_unexpected_workflow_error
|
||||
def _process_samples(self, resource_def, resource_id, metric_name, samples,
|
||||
resource_need_to_be_updated):
|
||||
resource_type = resource_def.cfg['resource_type']
|
||||
measure_attributes = [{'timestamp': sample['timestamp'],
|
||||
'value': sample['counter_volume']}
|
||||
for sample in samples]
|
||||
def _process_resource(self, resource_id, metric_grouped_samples):
|
||||
# TODO(sileht): Any HTTP 50X/401 error is just logged and this method
|
||||
# stop, perhaps we can be smarter and retry later in case of 50X and
|
||||
# directly in case of 401. A gnocchiclient would help a lot for the
|
||||
# latest.
|
||||
|
||||
resource_extra = {}
|
||||
for metric_name, samples in metric_grouped_samples:
|
||||
samples = list(samples)
|
||||
rd = self._get_resource_definition(metric_name)
|
||||
if rd is None:
|
||||
LOG.warn("metric %s is not handled by gnocchi" %
|
||||
metric_name)
|
||||
continue
|
||||
|
||||
resource_type = rd.cfg['resource_type']
|
||||
resource = {
|
||||
"id": resource_id,
|
||||
"user_id": samples[0]['user_id'],
|
||||
"project_id": samples[0]['project_id'],
|
||||
"metrics": rd.metrics(),
|
||||
}
|
||||
measures = []
|
||||
|
||||
for sample in samples:
|
||||
resource_extra.update(rd.attributes(sample))
|
||||
measures.append({'timestamp': sample['timestamp'],
|
||||
'value': sample['counter_volume']})
|
||||
|
||||
resource.update(resource_extra)
|
||||
|
||||
try:
|
||||
self._post_measure(resource_type, resource_id, metric_name,
|
||||
measure_attributes)
|
||||
except NoSuchMetric:
|
||||
# NOTE(sileht): we try first to create the resource, because
|
||||
# they more chance that the resource doesn't exists than the metric
|
||||
# is missing, the should be reduce the number of resource API call
|
||||
resource_attributes = self._get_resource_attributes(
|
||||
resource_def, resource_id, metric_name, samples)
|
||||
try:
|
||||
self._create_resource(resource_type, resource_id,
|
||||
resource_attributes)
|
||||
except ResourceAlreadyExists:
|
||||
self._post_measure(resource_type, resource_id, metric_name,
|
||||
measures)
|
||||
except NoSuchMetric:
|
||||
# TODO(sileht): Make gnocchi smarter to be able to detect 404
|
||||
# for 'resource doesn't exist' and for 'metric doesn't exist'
|
||||
# https://bugs.launchpad.net/gnocchi/+bug/1476186
|
||||
self._ensure_resource_and_metric(resource_type, resource,
|
||||
metric_name)
|
||||
|
||||
try:
|
||||
archive_policy = (resource_def.metrics()[metric_name])
|
||||
self._create_metric(resource_type, resource_id,
|
||||
metric_name, archive_policy)
|
||||
except MetricAlreadyExists:
|
||||
# NOTE(sileht): Just ignore the metric have been created in
|
||||
# the meantime.
|
||||
pass
|
||||
else:
|
||||
# No need to update it we just created it
|
||||
# with everything we need
|
||||
resource_need_to_be_updated = False
|
||||
self._post_measure(resource_type, resource_id,
|
||||
metric_name, measures)
|
||||
except NoSuchMetric:
|
||||
LOG.error(_LE("Fail to post measures for "
|
||||
"%(resource_id)s/%(metric_name)s") %
|
||||
dict(resource_id=resource_id,
|
||||
metric_name=metric_name))
|
||||
|
||||
# NOTE(sileht): we retry to post the measure but if it fail we
|
||||
# don't catch the exception to just log it and continue to process
|
||||
# other samples
|
||||
self._post_measure(resource_type, resource_id, metric_name,
|
||||
measure_attributes)
|
||||
if resource_extra:
|
||||
self._update_resource(resource_type, resource_id, resource_extra)
|
||||
|
||||
if resource_need_to_be_updated:
|
||||
resource_attributes = self._get_resource_attributes(
|
||||
resource_def, resource_id, metric_name, samples,
|
||||
for_update=True)
|
||||
if resource_attributes:
|
||||
self._update_resource(resource_type, resource_id,
|
||||
resource_attributes)
|
||||
|
||||
def _get_resource_attributes(self, resource_def, resource_id, metric_name,
|
||||
samples, for_update=False):
|
||||
# FIXME(sileht): Should I merge attibutes of all samples ?
|
||||
# Or keep only the last one is sufficient ?
|
||||
attributes = resource_def.attributes(samples[-1])
|
||||
if not for_update:
|
||||
attributes["id"] = resource_id
|
||||
attributes["user_id"] = samples[-1]['user_id']
|
||||
attributes["project_id"] = samples[-1]['project_id']
|
||||
attributes["metrics"] = resource_def.metrics()
|
||||
return attributes
|
||||
def _ensure_resource_and_metric(self, resource_type, resource,
|
||||
metric_name):
|
||||
try:
|
||||
self._create_resource(resource_type, resource)
|
||||
except ResourceAlreadyExists:
|
||||
try:
|
||||
archive_policy = resource['metrics'][metric_name]
|
||||
self._create_metric(resource_type, resource['id'],
|
||||
metric_name, archive_policy)
|
||||
except MetricAlreadyExists:
|
||||
# NOTE(sileht): Just ignore the metric have been
|
||||
# created in the meantime.
|
||||
pass
|
||||
|
||||
def _post_measure(self, resource_type, resource_id, metric_name,
|
||||
measure_attributes):
|
||||
@ -432,33 +422,32 @@ class GnocchiDispatcher(dispatcher.Base):
|
||||
LOG.debug("Measure posted on metric %s of resource %s",
|
||||
metric_name, resource_id)
|
||||
|
||||
def _create_resource(self, resource_type, resource_id,
|
||||
resource_attributes):
|
||||
def _create_resource(self, resource_type, resource):
|
||||
r = self.gnocchi_api.post("%s/v1/resource/%s"
|
||||
% (self.gnocchi_url, resource_type),
|
||||
headers=self._get_headers(),
|
||||
data=json.dumps(resource_attributes))
|
||||
data=json.dumps(resource))
|
||||
if r.status_code == 409:
|
||||
LOG.debug("Resource %s already exists", resource_id)
|
||||
LOG.debug("Resource %s already exists", resource['id'])
|
||||
raise ResourceAlreadyExists
|
||||
|
||||
elif int(r.status_code / 100) != 2:
|
||||
raise UnexpectedWorkflowError(
|
||||
_("Resource %(resource_id)s creation failed with "
|
||||
"status: %(status_code)d: %(msg)s") %
|
||||
{'resource_id': resource_id,
|
||||
{'resource_id': resource['id'],
|
||||
'status_code': r.status_code,
|
||||
'msg': r.text})
|
||||
else:
|
||||
LOG.debug("Resource %s created", resource_id)
|
||||
LOG.debug("Resource %s created", resource['id'])
|
||||
|
||||
def _update_resource(self, resource_type, resource_id,
|
||||
resource_attributes):
|
||||
resource_extra):
|
||||
r = self.gnocchi_api.patch(
|
||||
"%s/v1/resource/%s/%s"
|
||||
% (self.gnocchi_url, resource_type, resource_id),
|
||||
headers=self._get_headers(),
|
||||
data=json.dumps(resource_attributes))
|
||||
data=json.dumps(resource_extra))
|
||||
|
||||
if int(r.status_code / 100) != 2:
|
||||
raise UnexpectedWorkflowError(
|
||||
|
@ -139,15 +139,23 @@ class DispatcherTest(base.BaseTestCase):
|
||||
gnocchi.GnocchiDispatcher, self.conf.conf)
|
||||
|
||||
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
|
||||
'._process_samples')
|
||||
def _do_test_activity_filter(self, expected_samples, fake_process_samples):
|
||||
'._process_resource')
|
||||
def _do_test_activity_filter(self, expected_samples,
|
||||
fake_process_resource):
|
||||
|
||||
def assert_samples(resource_id, metric_grouped_samples):
|
||||
samples = []
|
||||
for metric_name, s in metric_grouped_samples:
|
||||
samples.extend(list(s))
|
||||
self.assertEqual(expected_samples, samples)
|
||||
|
||||
fake_process_resource.side_effect = assert_samples
|
||||
|
||||
d = gnocchi.GnocchiDispatcher(self.conf.conf)
|
||||
d.record_metering_data(self.samples)
|
||||
|
||||
fake_process_samples.assert_called_with(
|
||||
mock.ANY, self.resource_id, 'disk.root.size',
|
||||
expected_samples, True,
|
||||
)
|
||||
fake_process_resource.assert_called_with(self.resource_id,
|
||||
mock.ANY)
|
||||
|
||||
def test_archive_policy_map_config(self):
|
||||
archive_policy_map = yaml.dump({
|
||||
@ -284,7 +292,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
|
||||
('normal_workflow', dict(measure=204, post_resource=None, metric=None,
|
||||
measure_retry=None, patch_resource=204)),
|
||||
('new_resource', dict(measure=404, post_resource=204, metric=None,
|
||||
measure_retry=204, patch_resource=None)),
|
||||
measure_retry=204, patch_resource=204)),
|
||||
('new_resource_fail', dict(measure=404, post_resource=500, metric=None,
|
||||
measure_retry=None, patch_resource=None)),
|
||||
('resource_update_fail', dict(measure=204, post_resource=None,
|
||||
|
Loading…
Reference in New Issue
Block a user