Support default_retention_hours for influxdb
At the moment, all data is retained infinitely. This change allows users to specify a default_retention_hours (defaults to 0 meaning unlimited retention) in the influxdb section of monasca persister configuration so that new projects are automatically assigned this retention policy when the given hour is greater than 0 to preserve original behaviour. Story: 2006331 Task: 37234 Change-Id: I4136df1d43954eb026a104f3f85b3a58197f5435
This commit is contained in:
parent
df12bd8628
commit
97aab2f004
|
@ -23,6 +23,10 @@ influxdb_opts = [
|
||||||
cfg.BoolOpt('db_per_tenant',
|
cfg.BoolOpt('db_per_tenant',
|
||||||
help='Whether to use a separate database per tenant',
|
help='Whether to use a separate database per tenant',
|
||||||
default=False),
|
default=False),
|
||||||
|
cfg.IntOpt('default_retention_hours',
|
||||||
|
help='Default retention period in hours for new '
|
||||||
|
'databases automatically created by the persister',
|
||||||
|
default=0),
|
||||||
cfg.HostAddressOpt('ip_address',
|
cfg.HostAddressOpt('ip_address',
|
||||||
help='Valid IP address or hostname '
|
help='Valid IP address or hostname '
|
||||||
'to InfluxDB instance'),
|
'to InfluxDB instance'),
|
||||||
|
|
|
@ -61,5 +61,16 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
|
||||||
# to: {"error":"database not found: \"test\""}
|
# to: {"error":"database not found: \"test\""}
|
||||||
if DATABASE_NOT_FOUND_MSG in str(ex):
|
if DATABASE_NOT_FOUND_MSG in str(ex):
|
||||||
self._influxdb_client.create_database(database)
|
self._influxdb_client.create_database(database)
|
||||||
|
# NOTE (brtknr): Only apply default retention policy at
|
||||||
|
# database creation time so that existing policies are
|
||||||
|
# not overridden since administrators may want different
|
||||||
|
# retention policy per tenant.
|
||||||
|
hours = self.conf.influxdb.default_retention_hours
|
||||||
|
if hours > 0:
|
||||||
|
rp = '{}h'.format(hours)
|
||||||
|
default_rp = dict(database=database, default=True,
|
||||||
|
name=rp, duration=rp,
|
||||||
|
replication='1')
|
||||||
|
self._influxdb_client.create_retention_policy(**default_rp)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -47,9 +47,10 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
|
||||||
@patch.object(influxdb, 'InfluxDBClient')
|
@patch.object(influxdb, 'InfluxDBClient')
|
||||||
@patch.object(cfg, 'CONF', return_value=None)
|
@patch.object(cfg, 'CONF', return_value=None)
|
||||||
def _test_write_batch(self, mock_conf, mock_influxdb_client,
|
def _test_write_batch(self, mock_conf, mock_influxdb_client,
|
||||||
db_per_tenant, db_exists):
|
db_per_tenant, db_exists, hours=0):
|
||||||
mock_conf.influxdb.database_name = db_name = 'db'
|
mock_conf.influxdb.database_name = db_name = 'db'
|
||||||
mock_conf.influxdb.db_per_tenant = db_per_tenant
|
mock_conf.influxdb.db_per_tenant = db_per_tenant
|
||||||
|
mock_conf.influxdb.default_retention_hours = hours
|
||||||
t1 = u'fake_tenant_id_1'
|
t1 = u'fake_tenant_id_1'
|
||||||
t2 = u'fake_tenant_id_2'
|
t2 = u'fake_tenant_id_2'
|
||||||
m1 = self._get_metric(t1)
|
m1 = self._get_metric(t1)
|
||||||
|
@ -59,23 +60,38 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
|
||||||
self._test_process_message(metrics_repo, data_points, m1, t1)
|
self._test_process_message(metrics_repo, data_points, m1, t1)
|
||||||
self._test_process_message(metrics_repo, data_points, m2, t2)
|
self._test_process_message(metrics_repo, data_points, m2, t2)
|
||||||
metrics_repo._influxdb_client = mock_influxdb_client
|
metrics_repo._influxdb_client = mock_influxdb_client
|
||||||
|
|
||||||
if db_exists:
|
if db_exists:
|
||||||
metrics_repo._influxdb_client.write_points = Mock()
|
metrics_repo._influxdb_client.write_points = Mock()
|
||||||
else:
|
else:
|
||||||
metrics_repo._influxdb_client.write_points = Mock(
|
metrics_repo._influxdb_client.write_points = Mock(
|
||||||
side_effect=[db_not_found, None, db_not_found, None])
|
side_effect=[db_not_found, None, db_not_found, None])
|
||||||
|
rp = '{}h'.format(hours)
|
||||||
if db_per_tenant:
|
if db_per_tenant:
|
||||||
call1 = call('%s_%s' % (db_name, t1))
|
db1 = '%s_%s' % (db_name, t1)
|
||||||
call2 = call('%s_%s' % (db_name, t2))
|
db2 = '%s_%s' % (db_name, t2)
|
||||||
calls = [call1, call2]
|
rp1 = call(database=db1, default=True,
|
||||||
|
name=rp, duration=rp, replication='1')
|
||||||
|
rp2 = call(database=db2, default=True,
|
||||||
|
name=rp, duration=rp, replication='1')
|
||||||
|
calls = [call(db1), call(db2)]
|
||||||
|
rp_calls = [rp1, rp2]
|
||||||
else:
|
else:
|
||||||
calls = [call(db_name)]
|
calls = [call(db_name)]
|
||||||
|
rp_calls = [call(database=db_name, default=True,
|
||||||
|
name=rp, duration=rp, replication='1')]
|
||||||
metrics_repo.write_batch(data_points)
|
metrics_repo.write_batch(data_points)
|
||||||
if db_exists:
|
if db_exists:
|
||||||
mock_influxdb_client.create_database.assert_not_called()
|
mock_influxdb_client.create_database.assert_not_called()
|
||||||
|
mock_influxdb_client.create_retention_policy.assert_not_called()
|
||||||
else:
|
else:
|
||||||
mock_influxdb_client.create_database.assert_has_calls(
|
mock_influxdb_client.create_database.assert_has_calls(
|
||||||
calls, any_order=True)
|
calls, any_order=True)
|
||||||
|
if hours > 0:
|
||||||
|
mock_influxdb_client.create_retention_policy.assert_has_calls(
|
||||||
|
rp_calls, any_order=True)
|
||||||
|
else:
|
||||||
|
mock_influxdb_client.create_retention_policy.assert_not_called()
|
||||||
|
|
||||||
def _get_metric(self, tenant_id):
|
def _get_metric(self, tenant_id):
|
||||||
metric = '''
|
metric = '''
|
||||||
|
@ -110,8 +126,20 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
|
||||||
def test_write_batch(self):
|
def test_write_batch(self):
|
||||||
self._test_write_batch(db_per_tenant=False, db_exists=False)
|
self._test_write_batch(db_per_tenant=False, db_exists=False)
|
||||||
|
|
||||||
|
def test_write_batch_db_exists_with_rp(self):
|
||||||
|
self._test_write_batch(db_per_tenant=False, db_exists=True, hours=2016)
|
||||||
|
|
||||||
|
def test_write_batch_with_rp(self):
|
||||||
|
self._test_write_batch(db_per_tenant=False, db_exists=False, hours=2016)
|
||||||
|
|
||||||
def test_write_batch_db_per_tenant_db_exists(self):
|
def test_write_batch_db_per_tenant_db_exists(self):
|
||||||
self._test_write_batch(db_per_tenant=True, db_exists=True)
|
self._test_write_batch(db_per_tenant=True, db_exists=True)
|
||||||
|
|
||||||
def test_write_batch_db_per_tenant(self):
|
def test_write_batch_db_per_tenant(self):
|
||||||
self._test_write_batch(db_per_tenant=True, db_exists=False)
|
self._test_write_batch(db_per_tenant=True, db_exists=False)
|
||||||
|
|
||||||
|
def test_write_batch_db_per_tenant_db_exists_with_rp(self):
|
||||||
|
self._test_write_batch(db_per_tenant=True, db_exists=True, hours=2016)
|
||||||
|
|
||||||
|
def test_write_batch_db_per_tenant_with_rp(self):
|
||||||
|
self._test_write_batch(db_per_tenant=True, db_exists=False, hours=2016)
|
||||||
|
|
|
@ -125,12 +125,17 @@ class MigrationHelper(object):
|
||||||
def migrate(self,
|
def migrate(self,
|
||||||
tenant_defaults={},
|
tenant_defaults={},
|
||||||
default_start_time_offset=0, # Default: now
|
default_start_time_offset=0, # Default: now
|
||||||
default_end_time_offset=(520), # Default: 10 years
|
default_end_time_offset=520, # Default: 10 years
|
||||||
skip_regex=[],
|
skip_regex=[],
|
||||||
measurements_file=None, success_file=None, failure_file=None, **kwargs):
|
measurements_file=None, success_file=None, failure_file=None, **kwargs):
|
||||||
measurements = self.get_measurements(measurements_file)
|
measurements = self.get_measurements(measurements_file)
|
||||||
tenancy = self.get_tenancies(measurements)
|
tenancy = self.get_tenancies(measurements)
|
||||||
done = self.get_complete(success_file)
|
done = self.get_complete(success_file)
|
||||||
|
default_rp = {}
|
||||||
|
hours = self.conf.influxdb.default_retention_hours
|
||||||
|
if hours > 0:
|
||||||
|
rp = '{}h'.format(hours)
|
||||||
|
default_rp = dict(name=rp, duration=rp, replication='1', default=True)
|
||||||
skip = set()
|
skip = set()
|
||||||
fail = set()
|
fail = set()
|
||||||
if failure_file:
|
if failure_file:
|
||||||
|
@ -159,8 +164,15 @@ class MigrationHelper(object):
|
||||||
end_time_offset = tenant_defaults.get(
|
end_time_offset = tenant_defaults.get(
|
||||||
tenant_id, {}).get('end_time_offset_override',
|
tenant_id, {}).get('end_time_offset_override',
|
||||||
default_end_time_offset)
|
default_end_time_offset)
|
||||||
|
# NOTE (brtknr): Ensure that the default upper and lower
|
||||||
|
# time offsets are respected during migration by the
|
||||||
|
# projects with custom retention policies.
|
||||||
|
start_time_offset = max(default_start_time_offset,
|
||||||
|
start_time_offset)
|
||||||
|
end_time_offset = min(default_end_time_offset,
|
||||||
|
end_time_offset)
|
||||||
retention_policy = tenant_defaults.get(
|
retention_policy = tenant_defaults.get(
|
||||||
tenant_id, {}) .get('rp', {})
|
tenant_id, {}).get('rp', default_rp)
|
||||||
self._migrate(measurement, tenant_id,
|
self._migrate(measurement, tenant_id,
|
||||||
start_time_offset=start_time_offset,
|
start_time_offset=start_time_offset,
|
||||||
end_time_offset=end_time_offset,
|
end_time_offset=end_time_offset,
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Support default_retention_hours config for InfluxDB for new
|
||||||
|
database that are created. Falls back to infinite retention as
|
||||||
|
before when unspecified or if the value is 0. Retention policy for
|
||||||
|
existing databases are not altered. Must be an integer.
|
Loading…
Reference in New Issue