From 97aab2f0044d53b965605b9986ad0f5133480aa0 Mon Sep 17 00:00:00 2001 From: Bharat Kunwar Date: Mon, 21 Oct 2019 11:53:53 +0100 Subject: [PATCH] Support default_retention_hours for influxdb At the moment, all data is retained infinitely. This change allows users to specify a default_retention_hours (defaults to 0 meaning unlimited retention) in the influxdb section of monasca persister configuration so that new projects are automatically assigned this retention policy when the given hour is greater than 0 to preserve original behaviour. Story: 2006331 Task: 37234 Change-Id: I4136df1d43954eb026a104f3f85b3a58197f5435 --- monasca_persister/conf/influxdb.py | 4 +++ .../influxdb/abstract_repository.py | 11 ++++++ .../tests/test_influxdb_metrics_repository.py | 36 ++++++++++++++++--- .../db-per-tenant/migrate-to-db-per-tenant.py | 16 +++++++-- ...ult-retention-policy-aaa446ebe6fc3cb5.yaml | 7 ++++ 5 files changed, 68 insertions(+), 6 deletions(-) create mode 100644 releasenotes/notes/influxdb-default-retention-policy-aaa446ebe6fc3cb5.yaml diff --git a/monasca_persister/conf/influxdb.py b/monasca_persister/conf/influxdb.py index 0d09794f..f0fef3a0 100644 --- a/monasca_persister/conf/influxdb.py +++ b/monasca_persister/conf/influxdb.py @@ -23,6 +23,10 @@ influxdb_opts = [ cfg.BoolOpt('db_per_tenant', help='Whether to use a separate database per tenant', default=False), + cfg.IntOpt('default_retention_hours', + help='Default retention period in hours for new ' + 'databases automatically created by the persister', + default=0), cfg.HostAddressOpt('ip_address', help='Valid IP address or hostname ' 'to InfluxDB instance'), diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py index 2e851420..ccc69a66 100644 --- a/monasca_persister/repositories/influxdb/abstract_repository.py +++ b/monasca_persister/repositories/influxdb/abstract_repository.py @@ -61,5 +61,16 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository): # to: {"error":"database not found: \"test\""} if DATABASE_NOT_FOUND_MSG in str(ex): self._influxdb_client.create_database(database) + # NOTE (brtknr): Only apply default retention policy at + # database creation time so that existing policies are + # not overridden since administrators may want different + # retention policy per tenant. + hours = self.conf.influxdb.default_retention_hours + if hours > 0: + rp = '{}h'.format(hours) + default_rp = dict(database=database, default=True, + name=rp, duration=rp, + replication='1') + self._influxdb_client.create_retention_policy(**default_rp) else: raise diff --git a/monasca_persister/tests/test_influxdb_metrics_repository.py b/monasca_persister/tests/test_influxdb_metrics_repository.py index a7fedd44..ec4c8a3b 100644 --- a/monasca_persister/tests/test_influxdb_metrics_repository.py +++ b/monasca_persister/tests/test_influxdb_metrics_repository.py @@ -47,9 +47,10 @@ class TestMetricInfluxdbRepository(base.BaseTestCase): @patch.object(influxdb, 'InfluxDBClient') @patch.object(cfg, 'CONF', return_value=None) def _test_write_batch(self, mock_conf, mock_influxdb_client, - db_per_tenant, db_exists): + db_per_tenant, db_exists, hours=0): mock_conf.influxdb.database_name = db_name = 'db' mock_conf.influxdb.db_per_tenant = db_per_tenant + mock_conf.influxdb.default_retention_hours = hours t1 = u'fake_tenant_id_1' t2 = u'fake_tenant_id_2' m1 = self._get_metric(t1) @@ -59,23 +60,38 @@ class TestMetricInfluxdbRepository(base.BaseTestCase): self._test_process_message(metrics_repo, data_points, m1, t1) self._test_process_message(metrics_repo, data_points, m2, t2) metrics_repo._influxdb_client = mock_influxdb_client + if db_exists: metrics_repo._influxdb_client.write_points = Mock() else: metrics_repo._influxdb_client.write_points = Mock( side_effect=[db_not_found, None, db_not_found, None]) + rp = '{}h'.format(hours) if db_per_tenant: - call1 = call('%s_%s' % (db_name, t1)) - call2 = call('%s_%s' % (db_name, t2)) - calls = [call1, call2] + db1 = '%s_%s' % (db_name, t1) + db2 = '%s_%s' % (db_name, t2) + rp1 = call(database=db1, default=True, + name=rp, duration=rp, replication='1') + rp2 = call(database=db2, default=True, + name=rp, duration=rp, replication='1') + calls = [call(db1), call(db2)] + rp_calls = [rp1, rp2] else: calls = [call(db_name)] + rp_calls = [call(database=db_name, default=True, + name=rp, duration=rp, replication='1')] metrics_repo.write_batch(data_points) if db_exists: mock_influxdb_client.create_database.assert_not_called() + mock_influxdb_client.create_retention_policy.assert_not_called() else: mock_influxdb_client.create_database.assert_has_calls( calls, any_order=True) + if hours > 0: + mock_influxdb_client.create_retention_policy.assert_has_calls( + rp_calls, any_order=True) + else: + mock_influxdb_client.create_retention_policy.assert_not_called() def _get_metric(self, tenant_id): metric = ''' @@ -110,8 +126,20 @@ class TestMetricInfluxdbRepository(base.BaseTestCase): def test_write_batch(self): self._test_write_batch(db_per_tenant=False, db_exists=False) + def test_write_batch_db_exists_with_rp(self): + self._test_write_batch(db_per_tenant=False, db_exists=True, hours=2016) + + def test_write_batch_with_rp(self): + self._test_write_batch(db_per_tenant=False, db_exists=False, hours=2016) + def test_write_batch_db_per_tenant_db_exists(self): self._test_write_batch(db_per_tenant=True, db_exists=True) def test_write_batch_db_per_tenant(self): self._test_write_batch(db_per_tenant=True, db_exists=False) + + def test_write_batch_db_per_tenant_db_exists_with_rp(self): + self._test_write_batch(db_per_tenant=True, db_exists=True, hours=2016) + + def test_write_batch_db_per_tenant_with_rp(self): + self._test_write_batch(db_per_tenant=True, db_exists=False, hours=2016) diff --git a/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py b/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py index 3e1cb4b0..0167efd1 100755 --- a/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py +++ b/monasca_persister/tools/influxdb/db-per-tenant/migrate-to-db-per-tenant.py @@ -125,12 +125,17 @@ class MigrationHelper(object): def migrate(self, tenant_defaults={}, default_start_time_offset=0, # Default: now - default_end_time_offset=(520), # Default: 10 years + default_end_time_offset=520, # Default: 10 years skip_regex=[], measurements_file=None, success_file=None, failure_file=None, **kwargs): measurements = self.get_measurements(measurements_file) tenancy = self.get_tenancies(measurements) done = self.get_complete(success_file) + default_rp = {} + hours = self.conf.influxdb.default_retention_hours + if hours > 0: + rp = '{}h'.format(hours) + default_rp = dict(name=rp, duration=rp, replication='1', default=True) skip = set() fail = set() if failure_file: @@ -159,8 +164,15 @@ class MigrationHelper(object): end_time_offset = tenant_defaults.get( tenant_id, {}).get('end_time_offset_override', default_end_time_offset) + # NOTE (brtknr): Ensure that the default upper and lower + # time offsets are respected during migration by the + # projects with custom retention policies. + start_time_offset = max(default_start_time_offset, + start_time_offset) + end_time_offset = min(default_end_time_offset, + end_time_offset) retention_policy = tenant_defaults.get( - tenant_id, {}) .get('rp', {}) + tenant_id, {}).get('rp', default_rp) self._migrate(measurement, tenant_id, start_time_offset=start_time_offset, end_time_offset=end_time_offset, diff --git a/releasenotes/notes/influxdb-default-retention-policy-aaa446ebe6fc3cb5.yaml b/releasenotes/notes/influxdb-default-retention-policy-aaa446ebe6fc3cb5.yaml new file mode 100644 index 00000000..0c72a99f --- /dev/null +++ b/releasenotes/notes/influxdb-default-retention-policy-aaa446ebe6fc3cb5.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Support default_retention_hours config for InfluxDB for new + database that are created. Falls back to infinite retention as + before when unspecified or if the value is 0. Retention policy for + existing databases are not altered. Must be an integer.