support shrinking/extending policy timespan
add flexibility to allow users to shrink or expand the coverage of a granularity. this does not support changing the granularity, just the duration of the granularity (if it exists). Change-Id: I71715ea75a8d80807616830fd64f1844516d82bc
This commit is contained in:
parent
58eb564af9
commit
1d3c67574a
@ -159,6 +159,17 @@ It is also possible to list archive policies:
|
||||
|
||||
{{ scenarios['list-archive-policy']['doc'] }}
|
||||
|
||||
Existing archive policies can be modified to retain more or less data depending
|
||||
on requirements. If the policy coverage is expanded, measures are not
|
||||
retroactively calculated as backfill to accommodate the new timespan:
|
||||
|
||||
{{ scenarios['update-archive-policy']['doc'] }}
|
||||
|
||||
.. note::
|
||||
|
||||
Granularities cannot be changed to a different rate. Also, granularities
|
||||
cannot be added or dropped from a policy.
|
||||
|
||||
It is possible to delete an archive policy if it is not used by any metric:
|
||||
|
||||
{{ scenarios['delete-archive-policy']['doc'] }}
|
||||
|
@ -45,6 +45,24 @@
|
||||
- name: list-archive-policy
|
||||
request: GET /v1/archive_policy HTTP/1.1
|
||||
|
||||
- name: update-archive-policy
|
||||
request: |
|
||||
PATCH /v1/archive_policy/{{ scenarios['create-archive-policy']['response'].json['name'] }} HTTP/1.1
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"definition": [
|
||||
{
|
||||
"granularity": "1s",
|
||||
"timespan": "1 hour"
|
||||
},
|
||||
{
|
||||
"points": 48,
|
||||
"timespan": "1 day"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
- name: create-archive-policy-to-delete
|
||||
request: |
|
||||
POST /v1/archive_policy HTTP/1.1
|
||||
|
@ -20,6 +20,7 @@
|
||||
"get archive policy": "",
|
||||
"list archive policy": "",
|
||||
"create archive policy": "role:admin",
|
||||
"update archive policy": "role:admin",
|
||||
"delete archive policy": "role:admin",
|
||||
|
||||
"create archive policy rule": "role:admin",
|
||||
|
@ -115,6 +115,16 @@ class NoSuchArchivePolicy(IndexerException):
|
||||
self.archive_policy = archive_policy
|
||||
|
||||
|
||||
class UnsupportedArchivePolicyChange(IndexerException):
|
||||
"""Error raised when modifying archive policy if not supported."""
|
||||
def __init__(self, archive_policy, message):
|
||||
super(UnsupportedArchivePolicyChange, self).__init__(
|
||||
"Archive policy %s does not support change: %s" %
|
||||
(archive_policy, message))
|
||||
self.archive_policy = archive_policy
|
||||
self.message = message
|
||||
|
||||
|
||||
class ArchivePolicyInUse(IndexerException):
|
||||
"""Error raised when an archive policy is still being used."""
|
||||
def __init__(self, archive_policy):
|
||||
@ -284,6 +294,10 @@ class IndexerDriver(object):
|
||||
def get_archive_policy(name):
|
||||
raise exceptions.NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def update_archive_policy(name, ap_items):
|
||||
raise exceptions.NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def delete_archive_policy(name):
|
||||
raise exceptions.NotImplementedError
|
||||
|
@ -311,6 +311,26 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
|
||||
with self.facade.independent_reader() as session:
|
||||
return session.query(ArchivePolicy).get(name)
|
||||
|
||||
def update_archive_policy(self, name, ap_items):
|
||||
with self.facade.independent_writer() as session:
|
||||
ap = session.query(ArchivePolicy).get(name)
|
||||
if not ap:
|
||||
raise indexer.NoSuchArchivePolicy(name)
|
||||
current = sorted(ap.definition,
|
||||
key=operator.attrgetter('granularity'))
|
||||
new = sorted(ap_items, key=operator.attrgetter('granularity'))
|
||||
if len(current) != len(new):
|
||||
raise indexer.UnsupportedArchivePolicyChange(
|
||||
name, 'Cannot add or drop granularities')
|
||||
for c, n in zip(current, new):
|
||||
if c.granularity != n.granularity:
|
||||
raise indexer.UnsupportedArchivePolicyChange(
|
||||
name, '%s granularity interval was changed'
|
||||
% c.granularity)
|
||||
# NOTE(gordc): ORM doesn't update JSON column unless new
|
||||
ap.definition = ap_items
|
||||
return ap
|
||||
|
||||
def delete_archive_policy(self, name):
|
||||
constraints = [
|
||||
"fk_metric_ap_name_ap_name",
|
||||
|
@ -257,6 +257,33 @@ class ArchivePolicyController(rest.RestController):
|
||||
return ap
|
||||
abort(404, indexer.NoSuchArchivePolicy(self.archive_policy))
|
||||
|
||||
@pecan.expose('json')
|
||||
def patch(self):
|
||||
ap = pecan.request.indexer.get_archive_policy(self.archive_policy)
|
||||
if not ap:
|
||||
abort(404, indexer.NoSuchArchivePolicy(self.archive_policy))
|
||||
enforce("update archive policy", ap)
|
||||
|
||||
body = deserialize_and_validate(voluptuous.Schema({
|
||||
voluptuous.Required("definition"):
|
||||
voluptuous.All([{
|
||||
"granularity": Timespan,
|
||||
"points": PositiveNotNullInt,
|
||||
"timespan": Timespan}], voluptuous.Length(min=1)),
|
||||
}))
|
||||
# Validate the data
|
||||
try:
|
||||
ap_items = [archive_policy.ArchivePolicyItem(**item) for item in
|
||||
body['definition']]
|
||||
except ValueError as e:
|
||||
abort(400, e)
|
||||
|
||||
try:
|
||||
return pecan.request.indexer.update_archive_policy(
|
||||
self.archive_policy, ap_items)
|
||||
except indexer.UnsupportedArchivePolicyChange as e:
|
||||
abort(400, e)
|
||||
|
||||
@pecan.expose()
|
||||
def delete(self):
|
||||
# NOTE(jd) I don't think there's any point in fetching and passing the
|
||||
|
@ -189,6 +189,81 @@ tests:
|
||||
accept: text/plain
|
||||
status: 406
|
||||
|
||||
# Update archive policy
|
||||
|
||||
- name: patch archive policy with bad definition
|
||||
url: /v1/archive_policy/medium
|
||||
method: PATCH
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
x-roles: admin
|
||||
data:
|
||||
definition:
|
||||
- granularity: 1 second
|
||||
points: 50
|
||||
timespan: 1 hour
|
||||
- granularity: 2 second
|
||||
status: 400
|
||||
response_strings:
|
||||
- timespan ≠ granularity × points
|
||||
|
||||
- name: patch archive policy with missing granularity
|
||||
url: /v1/archive_policy/medium
|
||||
method: PATCH
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
x-roles: admin
|
||||
data:
|
||||
definition:
|
||||
- granularity: 1 second
|
||||
points: 50
|
||||
status: 400
|
||||
response_strings:
|
||||
- "Archive policy medium does not support change: Cannot add or drop granularities"
|
||||
|
||||
- name: patch archive policy with non-matching granularity
|
||||
url: /v1/archive_policy/medium
|
||||
method: PATCH
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
x-roles: admin
|
||||
data:
|
||||
definition:
|
||||
- granularity: 5 second
|
||||
points: 20
|
||||
- granularity: 2 second
|
||||
status: 400
|
||||
response_strings:
|
||||
- "Archive policy medium does not support change: 1.0 granularity interval was changed"
|
||||
|
||||
- name: patch archive policy
|
||||
url: /v1/archive_policy/medium
|
||||
method: PATCH
|
||||
request_headers:
|
||||
content-type: application/json
|
||||
x-roles: admin
|
||||
data:
|
||||
definition:
|
||||
- granularity: 1 second
|
||||
points: 50
|
||||
- granularity: 2 second
|
||||
status: 200
|
||||
response_json_paths:
|
||||
$.name: medium
|
||||
$.definition[0].granularity: "0:00:01"
|
||||
$.definition[0].points: 50
|
||||
$.definition[0].timespan: "0:00:50"
|
||||
|
||||
- name: get patched archive policy
|
||||
url: /v1/archive_policy/medium
|
||||
response_headers:
|
||||
content-type: /application/json/
|
||||
response_json_paths:
|
||||
$.name: medium
|
||||
$.definition[0].granularity: "0:00:01"
|
||||
$.definition[0].points: 50
|
||||
$.definition[0].timespan: "0:00:50"
|
||||
|
||||
# Unexpected methods
|
||||
|
||||
- name: post single archive
|
||||
|
@ -50,6 +50,52 @@ class TestIndexerDriver(tests_base.TestCase):
|
||||
{u'granularity': 86400, u'points': 30, u'timespan': 2592000}],
|
||||
'name': u'low'}, dict(ap))
|
||||
|
||||
def test_update_archive_policy(self):
|
||||
self.assertRaises(indexer.UnsupportedArchivePolicyChange,
|
||||
self.index.update_archive_policy, "low",
|
||||
[archive_policy.ArchivePolicyItem(granularity=300,
|
||||
points=10)])
|
||||
self.assertRaises(indexer.UnsupportedArchivePolicyChange,
|
||||
self.index.update_archive_policy, "low",
|
||||
[archive_policy.ArchivePolicyItem(granularity=300,
|
||||
points=12),
|
||||
archive_policy.ArchivePolicyItem(granularity=3600,
|
||||
points=12),
|
||||
archive_policy.ArchivePolicyItem(granularity=5,
|
||||
points=6)])
|
||||
ap = self.index.update_archive_policy(
|
||||
"low", [archive_policy.ArchivePolicyItem(granularity=300,
|
||||
points=6),
|
||||
archive_policy.ArchivePolicyItem(granularity=3600,
|
||||
points=24),
|
||||
archive_policy.ArchivePolicyItem(granularity=86400,
|
||||
points=30)])
|
||||
self.assertEqual({
|
||||
'back_window': 0,
|
||||
'aggregation_methods':
|
||||
set(self.conf.archive_policy.default_aggregation_methods),
|
||||
'definition': [
|
||||
{u'granularity': 300, u'points': 6, u'timespan': 1800},
|
||||
{u'granularity': 3600, u'points': 24, u'timespan': 86400},
|
||||
{u'granularity': 86400, u'points': 30, u'timespan': 2592000}],
|
||||
'name': u'low'}, dict(ap))
|
||||
ap = self.index.update_archive_policy(
|
||||
"low", [archive_policy.ArchivePolicyItem(granularity=300,
|
||||
points=12),
|
||||
archive_policy.ArchivePolicyItem(granularity=3600,
|
||||
points=24),
|
||||
archive_policy.ArchivePolicyItem(granularity=86400,
|
||||
points=30)])
|
||||
self.assertEqual({
|
||||
'back_window': 0,
|
||||
'aggregation_methods':
|
||||
set(self.conf.archive_policy.default_aggregation_methods),
|
||||
'definition': [
|
||||
{u'granularity': 300, u'points': 12, u'timespan': 3600},
|
||||
{u'granularity': 3600, u'points': 24, u'timespan': 86400},
|
||||
{u'granularity': 86400, u'points': 30, u'timespan': 2592000}],
|
||||
'name': u'low'}, dict(ap))
|
||||
|
||||
def test_delete_archive_policy(self):
|
||||
name = str(uuid.uuid4())
|
||||
self.index.create_archive_policy(
|
||||
|
@ -21,6 +21,7 @@ from oslo_utils import timeutils
|
||||
from oslotest import base
|
||||
import six.moves
|
||||
|
||||
from gnocchi import archive_policy
|
||||
from gnocchi import carbonara
|
||||
from gnocchi import storage
|
||||
from gnocchi.storage import _carbonara
|
||||
@ -520,6 +521,58 @@ class TestStorageDriver(tests_base.TestCase):
|
||||
{u"eq": 100},
|
||||
{u"≠": 50}]}))
|
||||
|
||||
def test_resize_policy(self):
|
||||
name = str(uuid.uuid4())
|
||||
ap = archive_policy.ArchivePolicy(name, 0, [(3, 5)])
|
||||
self.index.create_archive_policy(ap)
|
||||
m = storage.Metric(uuid.uuid4(), ap)
|
||||
self.index.create_metric(m.id, str(uuid.uuid4()),
|
||||
str(uuid.uuid4()), name)
|
||||
self.storage.add_measures(m, [
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 0), 1),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 5), 1),
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 10), 1),
|
||||
])
|
||||
self.storage.process_background_tasks(self.index, sync=True)
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 10), 5.0, 1.0),
|
||||
], self.storage.get_measures(m))
|
||||
# expand to more points
|
||||
self.index.update_archive_policy(
|
||||
name, [archive_policy.ArchivePolicyItem(granularity=5, points=6)])
|
||||
self.storage.add_measures(m, [
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 15), 1),
|
||||
])
|
||||
self.storage.process_background_tasks(self.index, sync=True)
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 10), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 15), 5.0, 1.0),
|
||||
], self.storage.get_measures(m))
|
||||
# shrink timespan
|
||||
self.index.update_archive_policy(
|
||||
name, [archive_policy.ArchivePolicyItem(granularity=5, points=2)])
|
||||
# unchanged after update if no samples
|
||||
self.storage.process_background_tasks(self.index, sync=True)
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 0), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 5), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 10), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 15), 5.0, 1.0),
|
||||
], self.storage.get_measures(m))
|
||||
# drop points
|
||||
self.storage.add_measures(m, [
|
||||
storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 20), 1),
|
||||
])
|
||||
self.storage.process_background_tasks(self.index, sync=True)
|
||||
self.assertEqual([
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 15), 5.0, 1.0),
|
||||
(utils.datetime_utc(2014, 1, 1, 12, 0, 20), 5.0, 1.0),
|
||||
], self.storage.get_measures(m))
|
||||
|
||||
|
||||
class TestMeasureQuery(base.BaseTestCase):
|
||||
def test_equal(self):
|
||||
|
Loading…
Reference in New Issue
Block a user