Fix processing of "rate:xxx" re-aggregation configuration
The aggregation API in Gnocchi does not return anything for Granularity 3600, when the time-range is just one hour for "rate:xxx" re-aggregation methods. That behavior on Gnocchi side is correct. The "rate:xxx" aggregation method needs at least two data points (we are taking the diff, right?). Therefore, when configured to work with "rate:xxx" CloudKitty needs to call the aggregation API with a two hours interval (for granularity 3600), and not just one. With this patch, when CloudKitty detects a "rate:xxx" configuration, it will use the range as two times (*) the granularity we are using to process data. Therefore, if the granularity is one hour, it uses a two-hour time frame; on the other hand, if the granularity is configured to 10 minutes, it would consider a 20 minutes timeframe. I am doing that to retrieve two data points at a time in the aggregation API; thus, we can compute the diffs between the data points. Depends-On: https://review.opendev.org/#/c/704787/ Change-Id: I570db3db4c969042038a286829c8df2ee06fb5d7
This commit is contained in:
@@ -288,7 +288,6 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
"""Get metric during the timeframe.
|
||||
|
||||
:param metric_name: metric name to filter on.
|
||||
:type resource_name: str
|
||||
:param start: Start of the timeframe.
|
||||
:param end: End of the timeframe if needed.
|
||||
:param project_id: Filter on a specific tenant/project.
|
||||
@@ -296,37 +295,12 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
:param q_filter: Append a custom filter.
|
||||
:type q_filter: list
|
||||
"""
|
||||
agg_kwargs = self.get_aggregation_api_arguments(end, metric_name,
|
||||
project_id,
|
||||
q_filter,
|
||||
start)
|
||||
|
||||
# Get gnocchi specific conf
|
||||
extra_args = self.conf[metric_name]['extra_args']
|
||||
|
||||
# get resource type
|
||||
resource_type = extra_args['resource_type']
|
||||
scope_key = CONF.collect.scope_key
|
||||
|
||||
# build search query using resource type and project_id if provided
|
||||
query_parameters = list()
|
||||
query_parameters.append(
|
||||
self.gen_filter(cop="=", type=resource_type))
|
||||
|
||||
if project_id:
|
||||
kwargs = {scope_key: project_id}
|
||||
query_parameters.append(self.gen_filter(**kwargs))
|
||||
if q_filter:
|
||||
query_parameters.append(q_filter)
|
||||
|
||||
op = self.build_operation_command(extra_args, metric_name)
|
||||
|
||||
agg_kwargs = {
|
||||
'resource_type': resource_type,
|
||||
'start': start,
|
||||
'stop': end,
|
||||
'groupby': self.conf[metric_name]['groupby'],
|
||||
'search': self.extend_filter(*query_parameters),
|
||||
}
|
||||
if extra_args['force_granularity'] > 0:
|
||||
agg_kwargs['granularity'] = extra_args['force_granularity']
|
||||
|
||||
op = self.build_operation_command(metric_name)
|
||||
try:
|
||||
measurements = self._conn.aggregates.fetch(op, **agg_kwargs)
|
||||
LOG.debug("Measurements [%s] received with operation [%s] and "
|
||||
@@ -342,8 +316,50 @@ class GnocchiCollector(collector.BaseCollector):
|
||||
'current cycle.'.format(scope=project_id, err=e))
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def build_operation_command(extra_args, metric_name):
|
||||
def get_aggregation_api_arguments(self, end, metric_name, project_id,
|
||||
q_filter, start):
|
||||
extra_args = self.conf[metric_name]['extra_args']
|
||||
resource_type = extra_args['resource_type']
|
||||
|
||||
query_parameters = self.build_query_parameters(project_id, q_filter,
|
||||
resource_type)
|
||||
agg_kwargs = {
|
||||
'resource_type': resource_type,
|
||||
'start': start,
|
||||
'stop': end,
|
||||
'groupby': self.conf[metric_name]['groupby'],
|
||||
'search': self.extend_filter(*query_parameters),
|
||||
}
|
||||
|
||||
force_granularity = extra_args['force_granularity']
|
||||
if force_granularity > 0:
|
||||
agg_kwargs['granularity'] = force_granularity
|
||||
|
||||
re_aggregation_method = extra_args['re_aggregation_method']
|
||||
if re_aggregation_method.startswith('rate:'):
|
||||
agg_kwargs['start'] = start - timedelta(seconds=force_granularity)
|
||||
LOG.debug("Re-aggregation method for metric [%s] configured as"
|
||||
" [%s]. Therefore, we need two data points. Start date"
|
||||
" modified from [%s] to [%s].", metric_name,
|
||||
re_aggregation_method, start, agg_kwargs['start'])
|
||||
|
||||
return agg_kwargs
|
||||
|
||||
def build_query_parameters(self, project_id, q_filter, resource_type):
|
||||
query_parameters = list()
|
||||
query_parameters.append(
|
||||
self.gen_filter(cop="=", type=resource_type))
|
||||
if project_id:
|
||||
scope_key = CONF.collect.scope_key
|
||||
kwargs = {scope_key: project_id}
|
||||
query_parameters.append(self.gen_filter(**kwargs))
|
||||
if q_filter:
|
||||
query_parameters.append(q_filter)
|
||||
return query_parameters
|
||||
|
||||
def build_operation_command(self, metric_name):
|
||||
extra_args = self.conf[metric_name]['extra_args']
|
||||
|
||||
re_aggregation_method = extra_args['re_aggregation_method']
|
||||
op = ["aggregate", re_aggregation_method,
|
||||
["metric", metric_name, extra_args['aggregation_method']]]
|
||||
|
||||
Reference in New Issue
Block a user