diff --git a/monasca_persister/conf/influxdb.py b/monasca_persister/conf/influxdb.py
index e0404632..0d09794f 100644
--- a/monasca_persister/conf/influxdb.py
+++ b/monasca_persister/conf/influxdb.py
@@ -20,6 +20,9 @@ influxdb_opts = [
     cfg.StrOpt('database_name',
                help='database name where metrics are stored',
                default='mon'),
+    cfg.BoolOpt('db_per_tenant',
+                help='Whether to use a separate database per tenant',
+                default=False),
     cfg.HostAddressOpt('ip_address',
                        help='Valid IP address or hostname '
                             'to InfluxDB instance'),
diff --git a/monasca_persister/repositories/abstract_repository.py b/monasca_persister/repositories/abstract_repository.py
index 6e9301cd..9f909552 100644
--- a/monasca_persister/repositories/abstract_repository.py
+++ b/monasca_persister/repositories/abstract_repository.py
@@ -27,5 +27,5 @@ class AbstractRepository(object):
         pass
 
     @abc.abstractmethod
-    def write_batch(self, data_points):
+    def write_batch(self, data_points_by_tenant):
         pass
diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
index 548b71c2..064be743 100644
--- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
+++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
@@ -53,9 +53,12 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRepository):
             alarm_id.encode('utf8'),
             time_stamp)
 
-        return alarm_state_hist
+        return alarm_state_hist, tenant_id
 
-    def write_batch(self, alarm_state_hists):
+    def write_batch(self, alarm_state_hists_by_tenant):
+        # TODO(brtknr): At the moment, Cassandra does not have database per
+        # tenant implemented, so use chained list of values.
+        alarm_state_hists = alarm_state_hists_by_tenant.chained()
         while alarm_state_hists:
             num_rows = min(len(alarm_state_hists),
                            cfg.CONF.kafka_alarm_history.batch_size)
             batch = alarm_state_hists[:num_rows]
diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py
index d3e70151..5c1b4a60 100644
--- a/monasca_persister/repositories/cassandra/metrics_repository.py
+++ b/monasca_persister/repositories/cassandra/metrics_repository.py
@@ -136,6 +136,8 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
         hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name,
                                           '\0'.join(dim_list))
         metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
 
+        # TODO(brtknr): If database per tenant becomes the default and the
+        # only option, recording tenant_id will be redundant.
         metric = Metric(id=metric_id,
                         region=region,
                         tenant_id=tenant_id,
@@ -165,7 +167,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
                     metric.dimension_names))
 
             self._metric_batch.add_metric_query(metric_update_bound_stmt)
-            return metric
+            return metric, tenant_id
 
         self._metric_id_cache[metric.id] = metric.id
 
@@ -216,19 +218,17 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
                 metric.time_stamp))
 
         self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
 
-        return metric
+        return metric, tenant_id
 
-    def write_batch(self, metrics):
+    def write_batch(self, metrics_by_tenant):
+        # TODO(brtknr): At the moment, Cassandra does not have database per
+        # tenant implemented, so join the list of values.
+        metrics = metrics_by_tenant.chained()
         with self._lock:
             batch_list = self._metric_batch.get_all_batches()
             results = execute_concurrent(self._session, batch_list,
                                          raise_on_first_error=True)
             self._handle_results(results)
             self._metric_batch.clear()
             LOG.info("flushed %s metrics", len(metrics))
 
     @staticmethod
diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py
index 49354266..5f63a8da 100644
--- a/monasca_persister/repositories/influxdb/abstract_repository.py
+++ b/monasca_persister/repositories/influxdb/abstract_repository.py
@@ -19,6 +19,8 @@ import six
 
 from monasca_persister.repositories import abstract_repository
 
+DATABASE_NOT_FOUND_MSG = "database not found"
+
 
 @six.add_metaclass(abc.ABCMeta)
 class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
@@ -30,8 +32,30 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
             self.conf.influxdb.ip_address,
             self.conf.influxdb.port,
             self.conf.influxdb.user,
-            self.conf.influxdb.password,
-            self.conf.influxdb.database_name)
+            self.conf.influxdb.password)
 
-    def write_batch(self, data_points):
-        self._influxdb_client.write_points(data_points, 'ms', protocol='line')
+    def write_batch(self, data_points_by_tenant):
+        if self.conf.influxdb.db_per_tenant:
+            for tenant_id, data_points in data_points_by_tenant.items():
+                database = '%s_%s' % (self.conf.influxdb.database_name,
+                                      tenant_id)
+                self._write_batch(data_points, database)
+        else:
+            # NOTE (brtknr): Chain list of values to avoid multiple calls to
+            # database API endpoint (when db_per_tenant is False).
+            data_points = data_points_by_tenant.chained()
+            self._write_batch(data_points, self.conf.influxdb.database_name)
+
+    def _write_batch(self, data_points, database):
+        # NOTE (brtknr): Loop twice to ensure database is created if missing.
+        for retry in range(2):
+            try:
+                self._influxdb_client.write_points(data_points, 'ms',
+                                                   protocol='line',
+                                                   database=database)
+                break
+            except influxdb.exceptions.InfluxDBClientError as ex:
+                if (str(ex).startswith(DATABASE_NOT_FOUND_MSG) and
+                        self.conf.influxdb.db_per_tenant):
+                    self._influxdb_client.create_database(database)
+                else:
+                    raise
diff --git a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py
index 9e1da344..4bf30835 100644
--- a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py
+++ b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py
@@ -61,4 +61,4 @@ class AlarmStateHistInfluxdbRepository(
 
         LOG.debug(line)
 
-        return line
+        return line, tenant_id
diff --git a/monasca_persister/repositories/influxdb/metrics_repository.py b/monasca_persister/repositories/influxdb/metrics_repository.py
index 508c045d..3917564a 100644
--- a/monasca_persister/repositories/influxdb/metrics_repository.py
+++ b/monasca_persister/repositories/influxdb/metrics_repository.py
@@ -36,6 +36,9 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
          value_meta) = parse_measurement_message(message)
 
         tags = dimensions
+
+        # TODO(brtknr): If database per tenant becomes the default and the only
+        # option, recording tenant_id will be redundant.
         tags[u'_tenant_id'] = tenant_id
         tags[u'_region'] = region
@@ -61,4 +64,4 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
 
         LOG.debug(data)
 
-        return data
+        return data, tenant_id
diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py
index ca996317..10b9e7b4 100644
--- a/monasca_persister/repositories/persister.py
+++ b/monasca_persister/repositories/persister.py
@@ -24,12 +24,44 @@ from monasca_persister.repositories import singleton
 
 LOG = log.getLogger(__name__)
 
+
+class DataPoints(dict):
+
+    def __init__(self):
+        self.counter = 0
+
+    def __setitem__(self, key, value):
+        raise NotImplementedError('Use append(key, value) instead.')
+
+    def __delitem__(self, key):
+        raise NotImplementedError('Use clear() instead.')
+
+    def pop(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def popitem(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def update(self):
+        raise NotImplementedError('Use append(key, value) instead.')
+
+    def chained(self):
+        return [vi for vo in super(DataPoints, self).values() for vi in vo]
+
+    def append(self, key, value):
+        super(DataPoints, self).setdefault(key, []).append(value)
+        self.counter += 1
+
+    def clear(self):
+        super(DataPoints, self).clear()
+        self.counter = 0
+
+
 @six.add_metaclass(singleton.Singleton)
 class Persister(six.with_metaclass(ABCMeta, object)):
 
     def __init__(self, kafka_conf, repository):
 
-        self._data_points = []
+        self._data_points = DataPoints()
+
         self._kafka_topic = kafka_conf.topic
 
         self._batch_size = kafka_conf.batch_size
         self.repository = repository()
@@ -42,20 +74,20 @@ class Persister(six.with_metaclass(ABCMeta, object)):
 
             self.repository.write_batch(self._data_points)
 
             LOG.info("Processed {} messages from topic '{}'".format(
-                len(self._data_points), self._kafka_topic))
+                self._data_points.counter, self._kafka_topic))
 
-            self._data_points = []
+            self._data_points.clear()
             self._consumer.commit()
         except Exception as ex:
             if "partial write: points beyond retention policy dropped" in str(ex):
                 LOG.warning("Some points older than retention policy were dropped")
-                self._data_points = []
+                self._data_points.clear()
                 self._consumer.commit()
             elif cfg.CONF.repositories.ignore_parse_point_error \
                     and "unable to parse" in str(ex):
                 LOG.warning("Some points were unable to be parsed and were dropped")
-                self._data_points = []
+                self._data_points.clear()
                 self._consumer.commit()
             else:
@@ -67,13 +99,13 @@ class Persister(six.with_metaclass(ABCMeta, object)):
         try:
             for message in self._consumer:
                 try:
-                    data_point = self.repository.process_message(message)
-                    self._data_points.append(data_point)
+                    data_point, tenant_id = self.repository.process_message(message)
+                    self._data_points.append(tenant_id, data_point)
                 except Exception:
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
 
-                if len(self._data_points) >= self._batch_size:
+                if self._data_points.counter >= self._batch_size:
                     self._flush()
         except Exception:
             LOG.exception(
diff --git a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
index c96c1e08..ddbea998 100644
--- a/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
+++ b/monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
@@ -22,6 +22,8 @@ from oslo_config import cfg
 from monasca_persister.repositories.cassandra import alarm_state_history_repository
 from monasca_persister.repositories.cassandra import connection_util
 
+from monasca_persister.repositories.persister import DataPoints
+
 
 class TestAlarmStateHistoryRepo(base.BaseTestCase):
     def setUp(self):
@@ -83,8 +85,9 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
             b'"metric_definition":"dummy_definition"',
             b'"sub_alarm_state":"dummy_state"']
 
-        output = self.alarm_state_hist_repo.process_message(message)
+        output, tenant_id = self.alarm_state_hist_repo.process_message(message)
 
+        self.assertEqual(tenant_id, 'dummytenantId')
         self.assertEqual(output[0], self.alarm_state_hist_repo._retention)
         self.assertEqual(output[1], b'"dummymetrics"')
         self.assertEqual(output[2], b'dummyoldState')
@@ -103,5 +106,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
         self._session, self._upsert_stmt = Mock(), Mock()
 
-        alarm_state_hists = ['elem']
-        self.alarm_state_hist_repo.write_batch(alarm_state_hists)
+        alarm_state_hists_by_tenant = DataPoints()
+        alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
+        self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)
diff --git a/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py b/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py
index 6f55e481..be603015 100644
--- a/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py
+++ b/monasca_persister/tests/test_influxdb_alarm_state_history_repository.py
@@ -64,7 +64,10 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase):
             '\\"metric_definition\\":\\"dummy_definition\\"',
             '\\"sub_alarm_state\\":\\"dummy_state\\"',
             '\\"current_values\\":\\"dummy_values\\"']
-        actual_output = self.alarm_state_repo.process_message(message)
+
+        actual_output, tenant_id = self.alarm_state_repo.process_message(message)
+
+        self.assertEqual(tenant_id, 'dummytenantId')
         self.assertIn(expected_output, actual_output)
         for elem in expected_dict:
             self.assertIn(elem, actual_output)
diff --git a/monasca_persister/tests/test_influxdb_metrics_repository.py b/monasca_persister/tests/test_influxdb_metrics_repository.py
index 0aab456d..d1f36063 100644
--- a/monasca_persister/tests/test_influxdb_metrics_repository.py
+++ b/monasca_persister/tests/test_influxdb_metrics_repository.py
@@ -15,7 +15,6 @@
 
 from mock import Mock
 from mock import patch
-from six import string_types
 
 from oslotest import base
 from oslo_config import cfg
@@ -35,7 +34,7 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
         metric = self._get_metric()
         with patch.object(cfg, 'CONF', return_value=None):
             metric_repo = MetricInfluxdbRepository()
-            self.assertIsInstance(metric_repo.process_message(metric), string_types)
+            self.assertIsInstance(metric_repo.process_message(metric), tuple)
 
     def _get_metric(self):
         metric = '''
diff --git a/monasca_persister/tests/test_persister_repo.py b/monasca_persister/tests/test_persister_repo.py
index a3b77baa..7050bb24 100644
--- a/monasca_persister/tests/test_persister_repo.py
+++ b/monasca_persister/tests/test_persister_repo.py
@@ -23,6 +23,7 @@ from oslo_config import cfg
 from monasca_common.kafka import consumer
 from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
 from monasca_persister.repositories.persister import LOG
+from monasca_persister.repositories.persister import DataPoints
 
 
 class FakeException(Exception):
@@ -84,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase):
 
     def test_run_if_consumer_is_faulty(self):
         with patch.object(os, '_exit', return_value=None) as mock_exit:
-            self.persister._data_points = []
+            self.persister._data_points = DataPoints()
             self.persister._consumer = Mock(side_effect=FakeException)
             self.persister.run()
             mock_exit.assert_called_once_with(1)
@@ -92,21 +93,22 @@ class TestPersisterRepo(base.BaseTestCase):
     def test_run_logs_exception_from_consumer(self):
         with patch.object(self.persister.repository, 'process_message',
                           side_effect=FakeException):
-            self.persister._data_points = ()
+            self.persister._data_points = DataPoints()
             self.persister._consumer = ['aa']
             self.persister.run()
             self.mock_log_exception.assert_called()
 
     def test_run_commit_is_called_and_data_points_is_emptied(self):
         with patch.object(self.persister.repository, 'process_message',
-                          return_value='message'):
+                          return_value=('message', 'tenant_id')):
             with patch.object(self.persister, '_consumer',
                               return_value=Mock()) as mock_consumer:
-                self.persister._data_points = ['a']
+                self.persister._data_points = DataPoints()
+                self.persister._data_points.append('fake_tenant_id', 'some')
                 self.persister._consumer.__iter__.return_value = ('aa', 'bb')
                 self.persister._batch_size = 1
                 self.persister.run()
                 mock_consumer.commit.assert_called()
-                self.assertEqual([], self.persister._data_points)
+                self.assertEqual(0, self.persister._data_points.counter)
 
     def test_flush_logs_warning_and_exception(self):
         exception_msgs = ['partial write: points beyond retention policy dropped',
                           'unable to parse']
@@ -115,7 +117,8 @@ class TestPersisterRepo(base.BaseTestCase):
         with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
                           return_value=True)):
             for elem in exception_msgs:
                 with patch.object(LOG, 'info', side_effect=FakeException(elem)):
-                    self.persister._data_points = ['some']
+                    self.persister._data_points = DataPoints()
+                    self.persister._data_points.append('fake_tenant_id', 'some')
                     self.persister._flush()
                     self.mock_log_warning.assert_called()
@@ -124,6 +127,7 @@ class TestPersisterRepo(base.BaseTestCase):
         with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
                           return_value=False)):
             mock_log_info.side_effect.message = 'some msg'
-            self.persister._data_points = ['some']
+            self.persister._data_points = DataPoints()
+            self.persister._data_points.append('fake_tenant_id', 'some')
             self.assertRaises(FakeException, self.persister._flush)
             self.mock_log_exception.assert_called()
diff --git a/releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml b/releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml
new file mode 100644
index 00000000..62ae5132
--- /dev/null
+++ b/releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Add `db_per_tenant` configuration option for InfluxDB, allowing data
+    points to be written to a dedicated database per tenant, where
+    `database_name` prefixes the tenant ID, e.g. `monasca_tenantid`.
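
To try the feature out, set `db_per_tenant` under the `[influxdb]` section of
the persister configuration. A minimal sketch with placeholder connection
values (only `database_name` and `db_per_tenant` matter for the naming
scheme; the other keys are the existing options shown in the diff above):

    [influxdb]
    database_name = monasca
    db_per_tenant = true
    ip_address = influxdb.example.org
    port = 8086
    user = mon_persister
    password = secret

With this configuration, points for a tenant `abc123` are written to the
`monasca_abc123` database, which `_write_batch` creates on the first write
if InfluxDB reports it as missing.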
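The batching semantics now live in the `DataPoints` container added to
`persister.py`. A short sketch of how it behaves (the tenant IDs and
line-protocol strings are made up for illustration; assumes the patched
monasca_persister package is importable):

    from monasca_persister.repositories.persister import DataPoints

    points = DataPoints()
    points.append('tenant_a', 'cpu.idle_perc,region=r1 value=99 1554376533000')
    points.append('tenant_a', 'cpu.user_perc,region=r1 value=1 1554376533000')
    points.append('tenant_b', 'mem.free_mb,region=r1 value=512 1554376533000')

    # counter tracks the total across all tenants, so the persister's
    # batch-size check (self._data_points.counter >= self._batch_size)
    # stays a single integer comparison.
    assert points.counter == 3

    # chained() flattens the per-tenant lists for backends without
    # db_per_tenant support (Cassandra, or InfluxDB when the option is off).
    assert len(points.chained()) == 3

    # Direct dict mutation is blocked; only append() and clear() mutate it.
    try:
        points['tenant_c'] = []
    except NotImplementedError:
        pass

    points.clear()
    assert points.counter == 0

When `db_per_tenant` is set, `write_batch` iterates `items()` instead of
chaining, so each tenant's points are sent to its own database in a separate
`write_points` call.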