Merge "Support default_retention_hours for influxdb"

This commit is contained in:
Zuul 2019-11-15 18:03:38 +00:00 committed by Gerrit Code Review
commit 9c736d3579
5 changed files with 68 additions and 6 deletions

View File

@ -23,6 +23,10 @@ influxdb_opts = [
cfg.BoolOpt('db_per_tenant',
help='Whether to use a separate database per tenant',
default=False),
cfg.IntOpt('default_retention_hours',
help='Default retention period in hours for new '
'databases automatically created by the persister',
default=0),
cfg.HostAddressOpt('ip_address',
help='Valid IP address or hostname '
'to InfluxDB instance'),

View File

@ -61,5 +61,16 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
# to: {"error":"database not found: \"test\""}
if DATABASE_NOT_FOUND_MSG in str(ex):
self._influxdb_client.create_database(database)
# NOTE (brtknr): Only apply default retention policy at
# database creation time so that existing policies are
# not overridden since administrators may want different
# retention policy per tenant.
hours = self.conf.influxdb.default_retention_hours
if hours > 0:
rp = '{}h'.format(hours)
default_rp = dict(database=database, default=True,
name=rp, duration=rp,
replication='1')
self._influxdb_client.create_retention_policy(**default_rp)
else:
raise

View File

@ -47,9 +47,10 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
@patch.object(influxdb, 'InfluxDBClient')
@patch.object(cfg, 'CONF', return_value=None)
def _test_write_batch(self, mock_conf, mock_influxdb_client,
db_per_tenant, db_exists):
db_per_tenant, db_exists, hours=0):
mock_conf.influxdb.database_name = db_name = 'db'
mock_conf.influxdb.db_per_tenant = db_per_tenant
mock_conf.influxdb.default_retention_hours = hours
t1 = u'fake_tenant_id_1'
t2 = u'fake_tenant_id_2'
m1 = self._get_metric(t1)
@ -59,23 +60,38 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
self._test_process_message(metrics_repo, data_points, m1, t1)
self._test_process_message(metrics_repo, data_points, m2, t2)
metrics_repo._influxdb_client = mock_influxdb_client
if db_exists:
metrics_repo._influxdb_client.write_points = Mock()
else:
metrics_repo._influxdb_client.write_points = Mock(
side_effect=[db_not_found, None, db_not_found, None])
rp = '{}h'.format(hours)
if db_per_tenant:
call1 = call('%s_%s' % (db_name, t1))
call2 = call('%s_%s' % (db_name, t2))
calls = [call1, call2]
db1 = '%s_%s' % (db_name, t1)
db2 = '%s_%s' % (db_name, t2)
rp1 = call(database=db1, default=True,
name=rp, duration=rp, replication='1')
rp2 = call(database=db2, default=True,
name=rp, duration=rp, replication='1')
calls = [call(db1), call(db2)]
rp_calls = [rp1, rp2]
else:
calls = [call(db_name)]
rp_calls = [call(database=db_name, default=True,
name=rp, duration=rp, replication='1')]
metrics_repo.write_batch(data_points)
if db_exists:
mock_influxdb_client.create_database.assert_not_called()
mock_influxdb_client.create_retention_policy.assert_not_called()
else:
mock_influxdb_client.create_database.assert_has_calls(
calls, any_order=True)
if hours > 0:
mock_influxdb_client.create_retention_policy.assert_has_calls(
rp_calls, any_order=True)
else:
mock_influxdb_client.create_retention_policy.assert_not_called()
def _get_metric(self, tenant_id):
metric = '''
@ -110,8 +126,20 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
def test_write_batch(self):
self._test_write_batch(db_per_tenant=False, db_exists=False)
def test_write_batch_db_exists_with_rp(self):
self._test_write_batch(db_per_tenant=False, db_exists=True, hours=2016)
def test_write_batch_with_rp(self):
self._test_write_batch(db_per_tenant=False, db_exists=False, hours=2016)
def test_write_batch_db_per_tenant_db_exists(self):
self._test_write_batch(db_per_tenant=True, db_exists=True)
def test_write_batch_db_per_tenant(self):
self._test_write_batch(db_per_tenant=True, db_exists=False)
def test_write_batch_db_per_tenant_db_exists_with_rp(self):
self._test_write_batch(db_per_tenant=True, db_exists=True, hours=2016)
def test_write_batch_db_per_tenant_with_rp(self):
self._test_write_batch(db_per_tenant=True, db_exists=False, hours=2016)

View File

@ -125,12 +125,17 @@ class MigrationHelper(object):
def migrate(self,
tenant_defaults={},
default_start_time_offset=0, # Default: now
default_end_time_offset=(520), # Default: 10 years
default_end_time_offset=520, # Default: 10 years
skip_regex=[],
measurements_file=None, success_file=None, failure_file=None, **kwargs):
measurements = self.get_measurements(measurements_file)
tenancy = self.get_tenancies(measurements)
done = self.get_complete(success_file)
default_rp = {}
hours = self.conf.influxdb.default_retention_hours
if hours > 0:
rp = '{}h'.format(hours)
default_rp = dict(name=rp, duration=rp, replication='1', default=True)
skip = set()
fail = set()
if failure_file:
@ -159,8 +164,15 @@ class MigrationHelper(object):
end_time_offset = tenant_defaults.get(
tenant_id, {}).get('end_time_offset_override',
default_end_time_offset)
# NOTE (brtknr): Ensure that the default upper and lower
# time offsets are respected during migration by the
# projects with custom retention policies.
start_time_offset = max(default_start_time_offset,
start_time_offset)
end_time_offset = min(default_end_time_offset,
end_time_offset)
retention_policy = tenant_defaults.get(
tenant_id, {}) .get('rp', {})
tenant_id, {}).get('rp', default_rp)
self._migrate(measurement, tenant_id,
start_time_offset=start_time_offset,
end_time_offset=end_time_offset,

View File

@ -0,0 +1,7 @@
---
features:
- |
Support default_retention_hours config for InfluxDB for new
database that are created. Falls back to infinite retention as
before when unspecified or if the value is 0. Retention policy for
existing databases are not altered. Must be an integer.