From 774e981f8780b1584050767ea22c17c85aa92727 Mon Sep 17 00:00:00 2001 From: Bharat Kunwar Date: Sun, 4 Aug 2019 12:40:47 +0100 Subject: [PATCH] Implement separate db per tenancy At present, all time series are accumulated in the same database in InfluxDB. This makes queries slow for tenants that have less data. This patch enables the option to use separate database per tenancy. This task implements the changes on monasca-persister which handles write requests to InfluxDB database. Change-Id: I7de6e0faf069b889d3953583b29a876c3d82c62c Story: 2006331 Task: 36072 --- monasca_persister/conf/influxdb.py | 3 ++ .../repositories/abstract_repository.py | 2 +- .../alarm_state_history_repository.py | 7 ++- .../cassandra/metrics_repository.py | 16 +++---- .../influxdb/abstract_repository.py | 32 +++++++++++-- .../alarm_state_history_repository.py | 2 +- .../influxdb/metrics_repository.py | 5 +- monasca_persister/repositories/persister.py | 48 +++++++++++++++---- ...assandra_alarm_state_history_repository.py | 10 ++-- ...influxdb_alarm_state_history_repository.py | 5 +- .../tests/test_influxdb_metrics_repository.py | 3 +- .../tests/test_persister_repo.py | 18 ++++--- ...rt-for-db-per-tenant-6ada0c3979de6df8.yaml | 6 +++ 13 files changed, 119 insertions(+), 38 deletions(-) create mode 100644 releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml diff --git a/monasca_persister/conf/influxdb.py b/monasca_persister/conf/influxdb.py index e0404632..0d09794f 100644 --- a/monasca_persister/conf/influxdb.py +++ b/monasca_persister/conf/influxdb.py @@ -20,6 +20,9 @@ influxdb_opts = [ cfg.StrOpt('database_name', help='database name where metrics are stored', default='mon'), + cfg.BoolOpt('db_per_tenant', + help='Whether to use a separate database per tenant', + default=False), cfg.HostAddressOpt('ip_address', help='Valid IP address or hostname ' 'to InfluxDB instance'), diff --git a/monasca_persister/repositories/abstract_repository.py b/monasca_persister/repositories/abstract_repository.py index 6e9301cd..9f909552 100644 --- a/monasca_persister/repositories/abstract_repository.py +++ b/monasca_persister/repositories/abstract_repository.py @@ -27,5 +27,5 @@ class AbstractRepository(object): pass @abc.abstractmethod - def write_batch(self, data_points): + def write_batch(self, data_points_by_tenant): pass diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py index 548b71c2..064be743 100644 --- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py +++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py @@ -53,9 +53,12 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep alarm_id.encode('utf8'), time_stamp) - return alarm_state_hist + return alarm_state_hist, tenant_id - def write_batch(self, alarm_state_hists): + def write_batch(self, alarm_state_hists_by_tenant): + # TODO(brtknr): At the moment, Cassandra does not have database per + # tenant implemented, so use chained list of values. + alarm_state_hists = alarm_state_hists_by_tenant.chained() while alarm_state_hists: num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size) batch = alarm_state_hists[:num_rows] diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py index d3e70151..5c1b4a60 100644 --- a/monasca_persister/repositories/cassandra/metrics_repository.py +++ b/monasca_persister/repositories/cassandra/metrics_repository.py @@ -136,6 +136,8 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, '\0'.join(dim_list)) metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest() + # TODO(brtknr): If database per tenant becomes the default and the + # only option, recording tenant_id will be redundant. metric = Metric(id=metric_id, region=region, tenant_id=tenant_id, @@ -165,7 +167,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) metric.dimension_names)) self._metric_batch.add_metric_query(metric_update_bound_stmt) - return metric + return metric, tenant_id self._metric_id_cache[metric.id] = metric.id @@ -216,19 +218,17 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) metric.time_stamp)) self._metric_batch.add_measurement_query(measurement_insert_bound_stmt) - return metric - - def write_batch(self, metrics): + return metric, tenant_id + def write_batch(self, metrics_by_tenant): + # TODO(brtknr): At the moment, Cassandra does not have database per + # tenant implemented, so join the list of values. + metrics = metrics_by_tenant.chained() with self._lock: batch_list = self._metric_batch.get_all_batches() - results = execute_concurrent(self._session, batch_list, raise_on_first_error=True) - self._handle_results(results) - self._metric_batch.clear() - LOG.info("flushed %s metrics", len(metrics)) @staticmethod diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py index 49354266..5f63a8da 100644 --- a/monasca_persister/repositories/influxdb/abstract_repository.py +++ b/monasca_persister/repositories/influxdb/abstract_repository.py @@ -19,6 +19,8 @@ import six from monasca_persister.repositories import abstract_repository +DATABASE_NOT_FOUND_MSG = "database not found" + @six.add_metaclass(abc.ABCMeta) class AbstractInfluxdbRepository(abstract_repository.AbstractRepository): @@ -30,8 +32,30 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository): self.conf.influxdb.ip_address, self.conf.influxdb.port, self.conf.influxdb.user, - self.conf.influxdb.password, - self.conf.influxdb.database_name) + self.conf.influxdb.password) - def write_batch(self, data_points): - self._influxdb_client.write_points(data_points, 'ms', protocol='line') + def write_batch(self, data_points_by_tenant): + if self.conf.influxdb.db_per_tenant: + for tenant_id, data_points in data_points_by_tenant.items(): + database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id) + self._write_batch(data_points, database) + else: + # NOTE (brtknr): Chain list of values to avoid multiple calls to + # database API endpoint (when db_per_tenant is False). + data_points = data_points_by_tenant.chained() + self._write_batch(data_points, self.conf.influxdb.database_name) + + def _write_batch(self, data_points, database): + # NOTE (brtknr): Loop twice to ensure database is created if missing. + for retry in range(2): + try: + self._influxdb_client.write_points(data_points, 'ms', + protocol='line', + database=database) + break + except influxdb.exceptions.InfluxDBClientError as ex: + if (str(ex).startswith(DATABASE_NOT_FOUND_MSG) and + self.conf.influxdb.db_per_tenant): + self._influxdb_client.create_database(database) + else: + raise diff --git a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py index 9e1da344..4bf30835 100644 --- a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py +++ b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py @@ -61,4 +61,4 @@ class AlarmStateHistInfluxdbRepository( LOG.debug(line) - return line + return line, tenant_id diff --git a/monasca_persister/repositories/influxdb/metrics_repository.py b/monasca_persister/repositories/influxdb/metrics_repository.py index 508c045d..3917564a 100644 --- a/monasca_persister/repositories/influxdb/metrics_repository.py +++ b/monasca_persister/repositories/influxdb/metrics_repository.py @@ -36,6 +36,9 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository): value_meta) = parse_measurement_message(message) tags = dimensions + + # TODO(brtknr): If database per tenant becomes the default and the only + # option, recording tenant_id will be redundant. tags[u'_tenant_id'] = tenant_id tags[u'_region'] = region @@ -61,4 +64,4 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository): LOG.debug(data) - return data + return data, tenant_id diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py index ca996317..10b9e7b4 100644 --- a/monasca_persister/repositories/persister.py +++ b/monasca_persister/repositories/persister.py @@ -24,12 +24,44 @@ from monasca_persister.repositories import singleton LOG = log.getLogger(__name__) +class DataPoints(dict): + + def __init__(self): + self.counter = 0 + + def __setitem__(self, key, value): + raise NotImplementedError('Use append(key, value) instead.') + + def __delitem__(self, key): + raise NotImplementedError('Use clear() instead.') + + def pop(self): + raise NotImplementedError('Use clear() instead.') + + def popitem(self): + raise NotImplementedError('Use clear() instead.') + + def update(self): + raise NotImplementedError('Use clear() instead.') + + def chained(self): + return [vi for vo in super(DataPoints, self).values() for vi in vo] + + def append(self, key, value): + super(DataPoints, self).setdefault(key, []).append(value) + self.counter += 1 + + def clear(self): + super(DataPoints, self).clear() + self.counter = 0 + @six.add_metaclass(singleton.Singleton) class Persister(six.with_metaclass(ABCMeta, object)): def __init__(self, kafka_conf, repository): - self._data_points = [] + self._data_points = DataPoints() + self._kafka_topic = kafka_conf.topic self._batch_size = kafka_conf.batch_size self.repository = repository() @@ -42,20 +74,20 @@ class Persister(six.with_metaclass(ABCMeta, object)): self.repository.write_batch(self._data_points) LOG.info("Processed {} messages from topic '{}'".format( - len(self._data_points), self._kafka_topic)) + self._data_points.counter, self._kafka_topic)) - self._data_points = [] + self._data_points.clear() self._consumer.commit() except Exception as ex: if "partial write: points beyond retention policy dropped" in str(ex): LOG.warning("Some points older than retention policy were dropped") - self._data_points = [] + self._data_points.clear() self._consumer.commit() elif cfg.CONF.repositories.ignore_parse_point_error \ and "unable to parse" in str(ex): LOG.warning("Some points were unable to be parsed and were dropped") - self._data_points = [] + self._data_points.clear() self._consumer.commit() else: @@ -67,13 +99,13 @@ class Persister(six.with_metaclass(ABCMeta, object)): try: for message in self._consumer: try: - data_point = self.repository.process_message(message) - self._data_points.append(data_point) + data_point, tenant_id = self.repository.process_message(message) + self._data_points.append(tenant_id, data_point) except Exception: LOG.exception('Error processing message. Message is ' 'being dropped. {}'.format(message)) - if len(self._data_points) >= self._batch_size: + if self._data_points.counter >= self._batch_size: self._flush() except Exception: LOG.exception( diff --git a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py index c96c1e08..ddbea998 100644 --- a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py +++ b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py @@ -22,6 +22,8 @@ from oslo_config import cfg from monasca_persister.repositories.cassandra import alarm_state_history_repository from monasca_persister.repositories.cassandra import connection_util +from monasca_persister.repositories.persister import DataPoints + class TestAlarmStateHistoryRepo(base.BaseTestCase): def setUp(self): @@ -83,8 +85,9 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase): b'"metric_definition":"dummy_definition"', b'"sub_alarm_state":"dummy_state"'] - output = self.alarm_state_hist_repo.process_message(message) + output, tenant_id = self.alarm_state_hist_repo.process_message(message) + self.assertEqual(tenant_id, 'dummytenantId') self.assertEqual(output[0], self.alarm_state_hist_repo._retention) self.assertEqual(output[1], b'"dummymetrics"') self.assertEqual(output[2], b'dummyoldState') @@ -103,5 +106,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase): cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1)) self._session, self._upsert_stmt = Mock(), Mock() - alarm_state_hists = ['elem'] - self.alarm_state_hist_repo.write_batch(alarm_state_hists) + alarm_state_hists_by_tenant = DataPoints() + alarm_state_hists_by_tenant.append('fake_tenant', 'elem') + self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant) diff --git a/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py b/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py index 6f55e481..be603015 100644 --- a/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py +++ b/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py @@ -64,7 +64,10 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase): '\\"metric_definition\\":\\"dummy_definition\\"', '\\"sub_alarm_state\\":\\"dummy_state\\"', '\\"current_values\\":\\"dummy_values\\"'] - actual_output = self.alarm_state_repo.process_message(message) + + actual_output, tenant_id = self.alarm_state_repo.process_message(message) + + self.assertEqual(tenant_id, 'dummytenantId') self.assertIn(expected_output, actual_output) for elem in expected_dict: self.assertIn(elem, actual_output) diff --git a/monasca_persister/tests/test_influxdb_metrics_repository.py b/monasca_persister/tests/test_influxdb_metrics_repository.py index 0aab456d..d1f36063 100644 --- a/monasca_persister/tests/test_influxdb_metrics_repository.py +++ b/monasca_persister/tests/test_influxdb_metrics_repository.py @@ -15,7 +15,6 @@ from mock import Mock from mock import patch -from six import string_types from oslotest import base from oslo_config import cfg @@ -35,7 +34,7 @@ class TestMetricInfluxdbRepository(base.BaseTestCase): metric = self._get_metric() with patch.object(cfg, 'CONF', return_value=None): metric_repo = MetricInfluxdbRepository() - self.assertIsInstance(metric_repo.process_message(metric), string_types) + self.assertIsInstance(metric_repo.process_message(metric), tuple) def _get_metric(self): metric = ''' diff --git a/monasca_persister/tests/test_persister_repo.py b/monasca_persister/tests/test_persister_repo.py index a3b77baa..7050bb24 100644 --- a/monasca_persister/tests/test_persister_repo.py +++ b/monasca_persister/tests/test_persister_repo.py @@ -23,6 +23,7 @@ from oslo_config import cfg from monasca_common.kafka import consumer from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister from monasca_persister.repositories.persister import LOG +from monasca_persister.repositories.persister import DataPoints class FakeException(Exception): @@ -84,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase): def test_run_if_consumer_is_faulty(self): with patch.object(os, '_exit', return_value=None) as mock_exit: - self.persister._data_points = [] + self.persister._data_points = DataPoints() self.persister._consumer = Mock(side_effect=FakeException) self.persister.run() mock_exit.assert_called_once_with(1) @@ -92,21 +93,22 @@ class TestPersisterRepo(base.BaseTestCase): def test_run_logs_exception_from_consumer(self): with patch.object(self.persister.repository, 'process_message', side_effect=FakeException): - self.persister._data_points = () + self.persister._data_points = DataPoints() self.persister._consumer = ['aa'] self.persister.run() self.mock_log_exception.assert_called() def test_run_commit_is_called_and_data_points_is_emptied(self): with patch.object(self.persister.repository, 'process_message', - return_value='message'): + return_value=('message', 'tenant_id')): with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer: - self.persister._data_points = ['a'] + self.persister._data_points = DataPoints() + self.persister._data_points.append('fake_tenant_id', 'some') self.persister._consumer.__iter__.return_value = ('aa', 'bb') self.persister._batch_size = 1 self.persister.run() mock_consumer.commit.assert_called() - self.assertEqual([], self.persister._data_points) + self.assertEqual(0, self.persister._data_points.counter) def test_flush_logs_warning_and_exception(self): exception_msgs = ['partial write: points beyond retention policy dropped', @@ -115,7 +117,8 @@ class TestPersisterRepo(base.BaseTestCase): return_value=True)): for elem in exception_msgs: with patch.object(LOG, 'info', side_effect=FakeException(elem)): - self.persister._data_points = ['some'] + self.persister._data_points = DataPoints() + self.persister._data_points.append('fake_tenant_id', 'some') self.persister._flush() self.mock_log_warning.assert_called() @@ -124,6 +127,7 @@ class TestPersisterRepo(base.BaseTestCase): with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error', return_value=False)): mock_log_info.side_effect.message = 'some msg' - self.persister._data_points = ['some'] + self.persister._data_points = DataPoints() + self.persister._data_points.append('fake_tenant_id', 'some') self.assertRaises(FakeException, self.persister._flush) self.mock_log_exception.assert_called() diff --git a/releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml b/releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml new file mode 100644 index 00000000..62ae5132 --- /dev/null +++ b/releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Configuration option `db_per_tenant` added for InfluxDB to allow + data points to be written to dedicated tenant database where the + `database_name` prefixes the tenant ID, e.g. monasca_tenantid.