fill series when aggregating across metrics

this fills in series so that overlap across all series is not required
in order to aggregate. we only fill a timeslot if at least one of the
metrics contains a point in it.

Closes-Bug: #1642661
Change-Id: I1a29f0095387c326bdc87af760a3ccb4a95de828
gord chung 2016-11-17 23:37:00 +00:00 committed by gordon chung
parent fb312488dd
commit 5edc39ad91
9 changed files with 282 additions and 64 deletions
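As a worked example of the behaviour this commit adds (the metric IDs below are placeholders; the numbers mirror the functional tests further down): given two metrics where only the first contains a point, of value 5, in the 14:35:12 timeslot,

GET /v1/aggregation/metric?metric=<metric-id-1>&metric=<metric-id-2>&granularity=1&fill=0 HTTP/1.1

returns 2.5 for that timeslot (the mean of 5 and the filled-in 0), while the same request with fill=null returns 5.0 (the mean over only the points that exist). A timeslot with no point in any of the metrics is never returned, whatever the fill value.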


@ -533,16 +533,29 @@ well.
 Resampling is done prior to any reaggregation if both parameters are
 specified.
 
-Also aggregation across metrics have different behavior depending
-on if boundary are set ('start' and 'stop') and if 'needed_overlap' is set.
+Also, aggregation across metrics has different behavior depending
+on whether boundary values are set ('start' and 'stop') and whether
+'needed_overlap' is set.
 
 If boundaries are not set, Gnocchi makes the aggregation only with points
-at timestamp present in all timeseries.
-
-But when boundaries are set, Gnocchi expects that we have certain
-percent of timestamps common between timeseries, this percent is controlled
-by needed_overlap (defaulted with 100%). If this percent is not reached an
-error is returned.
+at timestamps present in all timeseries. When boundaries are set, Gnocchi
+expects a certain percentage of timestamps to be common between the
+timeseries; this percentage is controlled by needed_overlap (100% by
+default). If this percentage is not reached, an error is returned.
+
+The ability to fill in points missing from a subset of timeseries is
+supported by specifying a `fill` value. Valid fill values include any float
+or `null`; `null` computes the aggregation using only the points that exist.
+The `fill` parameter will not backfill timestamps which contain no points in
+any of the timeseries; only timestamps which have datapoints in at least one
+of the timeseries are returned.
+
+.. note::
+
+   A granularity must be specified when using the `fill` parameter.
+
+{{ scenarios['get-across-metrics-measures-by-metric-ids-fill']['doc'] }}
 
 Capabilities
 ============


@ -726,6 +726,10 @@
   request: |
     GET /v1/aggregation/metric?metric={{ scenarios['create-resource-instance-with-metrics']['response'].json['metrics']['cpu.util'] }}&metric={{ scenarios['create-resource-instance-with-dynamic-metrics']['response'].json['metrics']['cpu.util'] }}&aggregation=mean&reaggregation=min HTTP/1.1
 
+- name: get-across-metrics-measures-by-metric-ids-fill
+  request: |
+    GET /v1/aggregation/metric?metric={{ scenarios['create-resource-instance-with-metrics']['response'].json['metrics']['cpu.util'] }}&metric={{ scenarios['create-resource-instance-with-dynamic-metrics']['response'].json['metrics']['cpu.util'] }}&fill=0&granularity=1 HTTP/1.1
+
 - name: append-metrics-to-resource
   request: |
     POST /v1/resource/generic/{{ scenarios['create-resource-instance-with-metrics']['response'].json['id'] }}/metric HTTP/1.1


@ -713,7 +713,8 @@ class AggregatedTimeSerie(TimeSerie):
     @staticmethod
     def aggregated(timeseries, aggregation, from_timestamp=None,
-                   to_timestamp=None, needed_percent_of_overlap=100.0):
+                   to_timestamp=None, needed_percent_of_overlap=100.0,
+                   fill=None):
 
         index = ['timestamp', 'granularity']
         columns = ['timestamp', 'granularity', 'value']
@ -737,57 +738,65 @@ class AggregatedTimeSerie(TimeSerie):
             set(ts.sampling for ts in timeseries)
         )
 
-        grouped = pandas.concat(dataframes).groupby(level=index)
-
         left_boundary_ts = None
         right_boundary_ts = None
-        maybe_next_timestamp_is_left_boundary = False
 
-        left_holes = 0
-        right_holes = 0
-        holes = 0
-        for (timestamp, __), group in grouped:
-            if group.count()['value'] != number_of_distinct_datasource:
-                maybe_next_timestamp_is_left_boundary = True
-                if left_boundary_ts is not None:
-                    right_holes += 1
-                else:
-                    left_holes += 1
-            elif maybe_next_timestamp_is_left_boundary:
-                left_boundary_ts = timestamp
-                maybe_next_timestamp_is_left_boundary = False
-            else:
-                right_boundary_ts = timestamp
-                holes += right_holes
-                right_holes = 0
-
-        if to_timestamp is not None:
-            holes += left_holes
-        if from_timestamp is not None:
-            holes += right_holes
-
-        if to_timestamp is not None or from_timestamp is not None:
-            maximum = len(grouped)
-            percent_of_overlap = (float(maximum - holes) * 100.0 /
-                                  float(maximum))
-            if percent_of_overlap < needed_percent_of_overlap:
-                raise UnAggregableTimeseries(
-                    'Less than %f%% of datapoints overlap in this '
-                    'timespan (%.2f%%)' % (needed_percent_of_overlap,
-                                           percent_of_overlap))
-        if (needed_percent_of_overlap > 0 and
-                (right_boundary_ts == left_boundary_ts or
-                 (right_boundary_ts is None
-                  and maybe_next_timestamp_is_left_boundary))):
-            LOG.debug("We didn't find points that overlap in those "
-                      "timeseries. "
-                      "right_boundary_ts=%(right_boundary_ts)s, "
-                      "left_boundary_ts=%(left_boundary_ts)s, "
-                      "groups=%(groups)s", {
-                          'right_boundary_ts': right_boundary_ts,
-                          'left_boundary_ts': left_boundary_ts,
-                          'groups': list(grouped)
-                      })
-            raise UnAggregableTimeseries('No overlap')
+        if fill is not None:
+            fill_df = pandas.concat(dataframes, axis=1)
+            if fill != 'null':
+                fill_df = fill_df.fillna(fill)
+            single_df = pandas.concat([series for __, series in
+                                       fill_df.iteritems()]).to_frame()
+            grouped = single_df.groupby(level=index)
+        else:
+            grouped = pandas.concat(dataframes).groupby(level=index)
+            maybe_next_timestamp_is_left_boundary = False
+
+            left_holes = 0
+            right_holes = 0
+            holes = 0
+            for (timestamp, __), group in grouped:
+                if group.count()['value'] != number_of_distinct_datasource:
+                    maybe_next_timestamp_is_left_boundary = True
+                    if left_boundary_ts is not None:
+                        right_holes += 1
+                    else:
+                        left_holes += 1
+                elif maybe_next_timestamp_is_left_boundary:
+                    left_boundary_ts = timestamp
+                    maybe_next_timestamp_is_left_boundary = False
+                else:
+                    right_boundary_ts = timestamp
+                    holes += right_holes
+                    right_holes = 0
+
+            if to_timestamp is not None:
+                holes += left_holes
+            if from_timestamp is not None:
+                holes += right_holes
+
+            if to_timestamp is not None or from_timestamp is not None:
+                maximum = len(grouped)
+                percent_of_overlap = (float(maximum - holes) * 100.0 /
+                                      float(maximum))
+                if percent_of_overlap < needed_percent_of_overlap:
+                    raise UnAggregableTimeseries(
+                        'Less than %f%% of datapoints overlap in this '
+                        'timespan (%.2f%%)' % (needed_percent_of_overlap,
+                                               percent_of_overlap))
+            if (needed_percent_of_overlap > 0 and
+                    (right_boundary_ts == left_boundary_ts or
+                     (right_boundary_ts is None
+                      and maybe_next_timestamp_is_left_boundary))):
+                LOG.debug("We didn't find points that overlap in those "
+                          "timeseries. "
+                          "right_boundary_ts=%(right_boundary_ts)s, "
+                          "left_boundary_ts=%(left_boundary_ts)s, "
+                          "groups=%(groups)s", {
+                              'right_boundary_ts': right_boundary_ts,
+                              'left_boundary_ts': left_boundary_ts,
+                              'groups': list(grouped)
+                          })
+                raise UnAggregableTimeseries('No overlap')
 
         # NOTE(sileht): this call the aggregation method on already
        # aggregated values, for some kind of aggregation this can
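To make the fill branch above concrete, here is a minimal standalone sketch of the same technique. It is a simplification, not the Gnocchi code: it uses a flat DatetimeIndex rather than Gnocchi's (timestamp, granularity) index, and pandas' items() in place of the iteritems() call seen in the diff (which newer pandas has removed):

import pandas as pd

def fill_and_group(series_list, fill=None):
    # Align all series on the union of their timestamps (column-wise
    # concat); slots missing from a series show up as NaN.
    df = pd.concat(series_list, axis=1)
    if fill is not None and fill != 'null':
        df = df.fillna(fill)
    # Stack the columns back into one long series and group by timestamp,
    # mirroring the single_df/groupby step above. With fill='null' the
    # NaNs survive and the aggregation below simply skips them.
    stacked = pd.concat([s for __, s in df.items()])
    return stacked.groupby(level=0)

s1 = pd.Series([5.0, 3.0],
               index=pd.to_datetime(['2014-01-01T12:07', '2014-01-01T12:08']))
s2 = pd.Series([12.0], index=pd.to_datetime(['2014-01-01T12:06']))

print(fill_and_group([s1, s2], fill=0).mean())
# 12:06 -> 6.0, 12:07 -> 2.5, 12:08 -> 1.5 (holes counted as zero)
print(fill_and_group([s1, s2], fill='null').mean())
# 12:06 -> 12.0, 12:07 -> 5.0, 12:08 -> 3.0 (holes skipped)

Note that only timestamps present in at least one series appear at all: aligning on the union of existing timestamps is what guarantees fill never backfills slots empty in every series.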


@ -1496,7 +1496,7 @@ class AggregationResourceController(rest.RestController):
     @pecan.expose('json')
     def post(self, start=None, stop=None, aggregation='mean',
              reaggregation=None, granularity=None, needed_overlap=100.0,
-             groupby=None, refresh=False, resample=None):
+             groupby=None, fill=None, refresh=False, resample=None):
         # First, set groupby in the right format: a sorted list of unique
         # strings.
         groupby = sorted(set(arg_to_list(groupby)))
@ -1520,7 +1520,7 @@ class AggregationResourceController(rest.RestController):
                                  for r in resources)))
             return AggregationController.get_cross_metric_measures_from_objs(
                 metrics, start, stop, aggregation, reaggregation,
-                granularity, needed_overlap, refresh, resample)
+                granularity, needed_overlap, fill, refresh, resample)
 
         def groupper(r):
             return tuple((attr, r[attr]) for attr in groupby)
@ -1534,7 +1534,7 @@ class AggregationResourceController(rest.RestController):
                 "group": dict(key),
                 "measures": AggregationController.get_cross_metric_measures_from_objs(  # noqa
                     metrics, start, stop, aggregation, reaggregation,
-                    granularity, needed_overlap, refresh, resample)
+                    granularity, needed_overlap, fill, refresh, resample)
             })
 
         return results
@ -1564,7 +1564,7 @@ class AggregationController(rest.RestController):
                                             aggregation='mean',
                                             reaggregation=None,
                                             granularity=None,
-                                            needed_overlap=100.0,
+                                            needed_overlap=100.0, fill=None,
                                             refresh=False, resample=None):
         try:
             needed_overlap = float(needed_overlap)
@ -1611,6 +1611,15 @@ class AggregationController(rest.RestController):
         except ValueError as e:
             abort(400, e)
 
+        if fill is not None:
+            if granularity is None:
+                abort(400, "Unable to fill without a granularity")
+            try:
+                fill = float(fill)
+            except ValueError as e:
+                if fill != 'null':
+                    abort(400, "fill must be a float or \'null\': %s" % e)
+
         try:
             if strutils.bool_from_string(refresh):
                 pecan.request.storage.process_new_measures(
@ -1625,7 +1634,7 @@ class AggregationController(rest.RestController):
         else:
             measures = pecan.request.storage.get_cross_metric_measures(
                 metrics, start, stop, aggregation,
-                reaggregation, resample, granularity, needed_overlap)
+                reaggregation, resample, granularity, needed_overlap, fill)
 
         # Replace timestamp keys by their string versions
         return [(timestamp.isoformat(), offset, v)
                 for timestamp, offset, v in measures]
@ -1640,7 +1649,8 @@ class AggregationController(rest.RestController):
     @pecan.expose('json')
     def get_metric(self, metric=None, start=None, stop=None,
                    aggregation='mean', reaggregation=None, granularity=None,
-                   needed_overlap=100.0, refresh=False, resample=None):
+                   needed_overlap=100.0, fill=None,
+                   refresh=False, resample=None):
         # Check RBAC policy
         metric_ids = arg_to_list(metric)
         metrics = pecan.request.indexer.list_metrics(ids=metric_ids)
@ -1652,7 +1662,7 @@ class AggregationController(rest.RestController):
                                  missing_metric_ids.pop()))
 
         return self.get_cross_metric_measures_from_objs(
             metrics, start, stop, aggregation, reaggregation,
-            granularity, needed_overlap, refresh, resample)
+            granularity, needed_overlap, fill, refresh, resample)
 
 
 class CapabilityController(rest.RestController):


@ -257,8 +257,8 @@ class StorageDriver(object):
     def get_cross_metric_measures(metrics, from_timestamp=None,
                                   to_timestamp=None, aggregation='mean',
                                   reaggregation=None, resample=None,
-                                  granularity=None,
-                                  needed_overlap=None):
+                                  granularity=None, needed_overlap=None,
+                                  fill=None):
         """Get aggregated measures of multiple entities.
 
         :param entities: The entities measured to aggregate.
@ -269,6 +269,7 @@ class StorageDriver(object):
         :param reaggregation: The type of aggregation to compute
                               on the retrieved measures.
         :param resample: The granularity to resample to.
+        :param fill: The value to use to fill in missing data in series.
         """
         for metric in metrics:
             if aggregation not in metric.archive_policy.aggregation_methods:


@ -538,7 +538,8 @@ class CarbonaraBasedStorage(storage.StorageDriver):
     def get_cross_metric_measures(self, metrics, from_timestamp=None,
                                   to_timestamp=None, aggregation='mean',
                                   reaggregation=None, resample=None,
-                                  granularity=None, needed_overlap=100.0):
+                                  granularity=None, needed_overlap=100.0,
+                                  fill=None):
         super(CarbonaraBasedStorage, self).get_cross_metric_measures(
             metrics, from_timestamp, to_timestamp,
             aggregation, reaggregation, resample, granularity, needed_overlap)
@ -584,7 +585,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
                     for timestamp, r, v
                     in carbonara.AggregatedTimeSerie.aggregated(
                         tss, reaggregation, from_timestamp, to_timestamp,
-                        needed_overlap)]
+                        needed_overlap, fill)]
         except carbonara.UnAggregableTimeseries as e:
             raise storage.MetricUnaggregatable(metrics, e.reason)


@ -59,6 +59,8 @@ tests:
           value: 3.1
         - timestamp: "2015-03-06T14:34:12"
           value: 2
+        - timestamp: "2015-03-06T14:35:12"
+          value: 5
     status: 202
 
 - name: get metric list to get aggregates
@ -128,6 +130,43 @@ tests:
       - ['2015-03-06T14:33:00+00:00', 60.0, 23.1]
      - ['2015-03-06T14:34:00+00:00', 60.0, 7.0]
 
+- name: get metric list to push metric 6
+  GET: /v1/metric
+
+- name: get measure aggregates with fill zero
+  GET: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=1&fill=0
+  response_json_paths:
+    $:
+      - ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
+      - ['2015-03-06T14:34:12+00:00', 1.0, 7.0]
+      - ['2015-03-06T14:35:12+00:00', 1.0, 2.5]
+
+- name: get metric list to push metric 7
+  GET: /v1/metric
+
+- name: get measure aggregates with fill null
+  GET: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=1&fill=null
+  response_json_paths:
+    $:
+      - ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
+      - ['2015-03-06T14:34:12+00:00', 1.0, 7.0]
+      - ['2015-03-06T14:35:12+00:00', 1.0, 5.0]
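+
+# At 14:35:12 only one of the two metrics has a point (value 5), so
+# fill=0 aggregates mean(5, 0) == 2.5 while fill=null returns 5.0.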
+
+- name: get metric list to push metric 8
+  GET: /v1/metric
+
+- name: get measure aggregates with fill missing granularity
+  GET: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&fill=0
+  status: 400
+
+- name: get metric list to push metric 9
+  GET: /v1/metric
+
+- name: get measure aggregates with bad fill
+  GET: /v1/aggregation/metric?metric=$RESPONSE['$[0].id']&metric=$RESPONSE['$[1].id']&granularity=1&fill=asdf
+  status: 400
+
 # Aggregation by resource and metric_name
 - name: post a resource
@ -180,6 +219,8 @@ tests:
           value: 3.1
         - timestamp: "2015-03-06T14:34:12"
           value: 2
+        - timestamp: "2015-03-06T14:35:12"
+          value: 5
     status: 202
 
 - name: get measure aggregates by granularity from resources with refresh
@ -264,6 +305,19 @@ tests:
       - ['2015-03-06T14:33:57+00:00', 1.0, 3.1]
      - ['2015-03-06T14:34:12+00:00', 1.0, 2.0]
 
+- name: get measure aggregates from resources with fill zero
+  POST: /v1/aggregation/resource/generic/metric/agg_meter?granularity=1&fill=0
+  request_headers:
+    x-user-id: 0fbb231484614b1a80131fc22f6afc9c
+    x-project-id: f3d41b770cc14f0bb94a1d5be9c0e3ea
+    content-type: application/json
+  response_json_paths:
+    $:
+      - ['2015-03-06T14:33:57+00:00', 1.0, 23.1]
+      - ['2015-03-06T14:34:12+00:00', 1.0, 7.0]
+      - ['2015-03-06T14:35:12+00:00', 1.0, 2.5]
+
 # Some negative tests
 - name: get measure aggregates with wrong GET


@ -511,6 +511,126 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
             (datetime.datetime(2014, 1, 1, 12, 6), 60.0, 4.0)
         ], ts['return'].fetch(datetime.datetime(2014, 1, 1, 12, 0, 0)))
 
+    def test_aggregated_some_overlap_with_fill_zero(self):
+        tsc1 = {'sampling': 60, 'size': 10, 'agg': 'mean'}
+        tsb1 = carbonara.BoundTimeSerie(block_size=tsc1['sampling'])
+        tsc2 = {'sampling': 60, 'size': 10, 'agg': 'mean'}
+        tsb2 = carbonara.BoundTimeSerie(block_size=tsc2['sampling'])
+
+        tsb1.set_values([
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 9),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 1),
+            (datetime.datetime(2014, 1, 1, 12, 5, 0), 2),
+            (datetime.datetime(2014, 1, 1, 12, 6, 0), 7),
+            (datetime.datetime(2014, 1, 1, 12, 7, 0), 5),
+            (datetime.datetime(2014, 1, 1, 12, 8, 0), 3),
+        ], before_truncate_callback=functools.partial(
+            self._resample_and_merge, agg_dict=tsc1))
+
+        tsb2.set_values([
+            (datetime.datetime(2014, 1, 1, 12, 0, 0), 6),
+            (datetime.datetime(2014, 1, 1, 12, 1, 0), 2),
+            (datetime.datetime(2014, 1, 1, 12, 2, 0), 13),
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 24),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 4),
+            (datetime.datetime(2014, 1, 1, 12, 5, 0), 16),
+            (datetime.datetime(2014, 1, 1, 12, 6, 0), 12),
+        ], before_truncate_callback=functools.partial(
+            self._resample_and_merge, agg_dict=tsc2))
+
+        output = carbonara.AggregatedTimeSerie.aggregated([
+            tsc1['return'], tsc2['return']], aggregation='mean', fill=0)
+        self.assertEqual([
+            (datetime.datetime(2014, 1, 1, 12, 0, 0), 60.0, 3.0),
+            (datetime.datetime(2014, 1, 1, 12, 1, 0), 60.0, 1.0),
+            (datetime.datetime(2014, 1, 1, 12, 2, 0), 60.0, 6.5),
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 60.0, 16.5),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 60.0, 2.5),
+            (datetime.datetime(2014, 1, 1, 12, 5, 0), 60.0, 9.0),
+            (datetime.datetime(2014, 1, 1, 12, 6, 0), 60.0, 9.5),
+            (datetime.datetime(2014, 1, 1, 12, 7, 0), 60.0, 2.5),
+            (datetime.datetime(2014, 1, 1, 12, 8, 0), 60.0, 1.5),
+        ], output)
+
+    def test_aggregated_some_overlap_with_fill_null(self):
+        tsc1 = {'sampling': 60, 'size': 10, 'agg': 'mean'}
+        tsb1 = carbonara.BoundTimeSerie(block_size=tsc1['sampling'])
+        tsc2 = {'sampling': 60, 'size': 10, 'agg': 'mean'}
+        tsb2 = carbonara.BoundTimeSerie(block_size=tsc2['sampling'])
+
+        tsb1.set_values([
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 9),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 1),
+            (datetime.datetime(2014, 1, 1, 12, 5, 0), 2),
+            (datetime.datetime(2014, 1, 1, 12, 6, 0), 7),
+            (datetime.datetime(2014, 1, 1, 12, 7, 0), 5),
+            (datetime.datetime(2014, 1, 1, 12, 8, 0), 3),
+        ], before_truncate_callback=functools.partial(
+            self._resample_and_merge, agg_dict=tsc1))
+
+        tsb2.set_values([
+            (datetime.datetime(2014, 1, 1, 12, 0, 0), 6),
+            (datetime.datetime(2014, 1, 1, 12, 1, 0), 2),
+            (datetime.datetime(2014, 1, 1, 12, 2, 0), 13),
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 24),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 4),
+            (datetime.datetime(2014, 1, 1, 12, 5, 0), 16),
+            (datetime.datetime(2014, 1, 1, 12, 6, 0), 12),
+        ], before_truncate_callback=functools.partial(
+            self._resample_and_merge, agg_dict=tsc2))
+
+        output = carbonara.AggregatedTimeSerie.aggregated([
+            tsc1['return'], tsc2['return']], aggregation='mean', fill='null')
+        self.assertEqual([
+            (datetime.datetime(2014, 1, 1, 12, 0, 0), 60.0, 6.0),
+            (datetime.datetime(2014, 1, 1, 12, 1, 0), 60.0, 2.0),
+            (datetime.datetime(2014, 1, 1, 12, 2, 0), 60.0, 13.0),
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 60.0, 16.5),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 60.0, 2.5),
+            (datetime.datetime(2014, 1, 1, 12, 5, 0), 60.0, 9.0),
+            (datetime.datetime(2014, 1, 1, 12, 6, 0), 60.0, 9.5),
+            (datetime.datetime(2014, 1, 1, 12, 7, 0), 60.0, 5.0),
+            (datetime.datetime(2014, 1, 1, 12, 8, 0), 60.0, 3.0),
+        ], output)
+
+    def test_aggregate_no_points_with_fill_zero(self):
+        tsc1 = {'sampling': 60, 'size': 10, 'agg': 'mean'}
+        tsb1 = carbonara.BoundTimeSerie(block_size=tsc1['sampling'])
+        tsc2 = {'sampling': 60, 'size': 10, 'agg': 'mean'}
+        tsb2 = carbonara.BoundTimeSerie(block_size=tsc2['sampling'])
+
+        tsb1.set_values([
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 9),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 1),
+            (datetime.datetime(2014, 1, 1, 12, 7, 0), 5),
+            (datetime.datetime(2014, 1, 1, 12, 8, 0), 3),
+        ], before_truncate_callback=functools.partial(
+            self._resample_and_merge, agg_dict=tsc1))
+
+        tsb2.set_values([
+            (datetime.datetime(2014, 1, 1, 12, 0, 0), 6),
+            (datetime.datetime(2014, 1, 1, 12, 1, 0), 2),
+            (datetime.datetime(2014, 1, 1, 12, 2, 0), 13),
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 24),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 4),
+        ], before_truncate_callback=functools.partial(
+            self._resample_and_merge, agg_dict=tsc2))
+
+        output = carbonara.AggregatedTimeSerie.aggregated([
+            tsc1['return'], tsc2['return']], aggregation='mean', fill=0)
+        self.assertEqual([
+            (datetime.datetime(2014, 1, 1, 12, 0, 0), 60.0, 3.0),
+            (datetime.datetime(2014, 1, 1, 12, 1, 0), 60.0, 1.0),
+            (datetime.datetime(2014, 1, 1, 12, 2, 0), 60.0, 6.5),
+            (datetime.datetime(2014, 1, 1, 12, 3, 0), 60.0, 16.5),
+            (datetime.datetime(2014, 1, 1, 12, 4, 0), 60.0, 2.5),
+            (datetime.datetime(2014, 1, 1, 12, 7, 0), 60.0, 2.5),
+            (datetime.datetime(2014, 1, 1, 12, 8, 0), 60.0, 1.5),
+        ], output)
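+        # 12:05 and 12:06 are absent from the expected output: neither
+        # series has a point in those slots, and fill never backfills a
+        # timeslot that is empty in every series.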
 
     def test_fetch_agg_pct(self):
         ts = {'sampling': 1, 'size': 3600 * 24, 'agg': '90pct'}
         tsb = carbonara.BoundTimeSerie(block_size=ts['sampling'])


@ -0,0 +1,6 @@
+---
+features:
+  - Add support to backfill timestamps with missing points in a subset of
+    timeseries when computing aggregations across multiple metrics. Users can
+    specify a `fill` value of either a float or `null`. A granularity
+    must be specified in addition to `fill`.