Implement separate db per tenancy

At present, all time series are accumulated in the same database in
InfluxDB. This makes queries slow for tenants that have less data. This
patch enables the option to use separate database per tenancy.

This task implements the changes on monasca-persister which handles
write requests to InfluxDB database.

Change-Id: I7de6e0faf069b889d3953583b29a876c3d82c62c
Story: 2006331
Task: 36072
changes/84/674484/23
Bharat Kunwar 3 years ago
parent 2a76a04bcc
commit 774e981f87
  1. 3
      monasca_persister/conf/influxdb.py
  2. 2
      monasca_persister/repositories/abstract_repository.py
  3. 7
      monasca_persister/repositories/cassandra/alarm_state_history_repository.py
  4. 16
      monasca_persister/repositories/cassandra/metrics_repository.py
  5. 32
      monasca_persister/repositories/influxdb/abstract_repository.py
  6. 2
      monasca_persister/repositories/influxdb/alarm_state_history_repository.py
  7. 5
      monasca_persister/repositories/influxdb/metrics_repository.py
  8. 48
      monasca_persister/repositories/persister.py
  9. 10
      monasca_persister/tests/test_cassandra_alarm_state_history_repository.py
  10. 5
      monasca_persister/tests/test_influxdb_alarm_state_history_repository.py
  11. 3
      monasca_persister/tests/test_influxdb_metrics_repository.py
  12. 18
      monasca_persister/tests/test_persister_repo.py
  13. 6
      releasenotes/notes/influxdb-support-for-db-per-tenant-6ada0c3979de6df8.yaml

@ -20,6 +20,9 @@ influxdb_opts = [
cfg.StrOpt('database_name',
help='database name where metrics are stored',
default='mon'),
cfg.BoolOpt('db_per_tenant',
help='Whether to use a separate database per tenant',
default=False),
cfg.HostAddressOpt('ip_address',
help='Valid IP address or hostname '
'to InfluxDB instance'),

@ -27,5 +27,5 @@ class AbstractRepository(object):
pass
@abc.abstractmethod
def write_batch(self, data_points):
def write_batch(self, data_points_by_tenant):
pass

@ -53,9 +53,12 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep
alarm_id.encode('utf8'),
time_stamp)
return alarm_state_hist
return alarm_state_hist, tenant_id
def write_batch(self, alarm_state_hists):
def write_batch(self, alarm_state_hists_by_tenant):
# TODO(brtknr): At the moment, Cassandra does not have database per
# tenant implemented, so use chained list of values.
alarm_state_hists = alarm_state_hists_by_tenant.chained()
while alarm_state_hists:
num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
batch = alarm_state_hists[:num_rows]

@ -136,6 +136,8 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, '\0'.join(dim_list))
metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
# TODO(brtknr): If database per tenant becomes the default and the
# only option, recording tenant_id will be redundant.
metric = Metric(id=metric_id,
region=region,
tenant_id=tenant_id,
@ -165,7 +167,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
metric.dimension_names))
self._metric_batch.add_metric_query(metric_update_bound_stmt)
return metric
return metric, tenant_id
self._metric_id_cache[metric.id] = metric.id
@ -216,19 +218,17 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
metric.time_stamp))
self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
return metric
def write_batch(self, metrics):
return metric, tenant_id
def write_batch(self, metrics_by_tenant):
# TODO(brtknr): At the moment, Cassandra does not have database per
# tenant implemented, so join the list of values.
metrics = metrics_by_tenant.chained()
with self._lock:
batch_list = self._metric_batch.get_all_batches()
results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
self._handle_results(results)
self._metric_batch.clear()
LOG.info("flushed %s metrics", len(metrics))
@staticmethod

@ -19,6 +19,8 @@ import six
from monasca_persister.repositories import abstract_repository
DATABASE_NOT_FOUND_MSG = "database not found"
@six.add_metaclass(abc.ABCMeta)
class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
@ -30,8 +32,30 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
self.conf.influxdb.ip_address,
self.conf.influxdb.port,
self.conf.influxdb.user,
self.conf.influxdb.password,
self.conf.influxdb.database_name)
self.conf.influxdb.password)
def write_batch(self, data_points_by_tenant):
if self.conf.influxdb.db_per_tenant:
for tenant_id, data_points in data_points_by_tenant.items():
database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id)
self._write_batch(data_points, database)
else:
# NOTE (brtknr): Chain list of values to avoid multiple calls to
# database API endpoint (when db_per_tenant is False).
data_points = data_points_by_tenant.chained()
self._write_batch(data_points, self.conf.influxdb.database_name)
def write_batch(self, data_points):
self._influxdb_client.write_points(data_points, 'ms', protocol='line')
def _write_batch(self, data_points, database):
# NOTE (brtknr): Loop twice to ensure database is created if missing.
for retry in range(2):
try:
self._influxdb_client.write_points(data_points, 'ms',
protocol='line',
database=database)
break
except influxdb.exceptions.InfluxDBClientError as ex:
if (str(ex).startswith(DATABASE_NOT_FOUND_MSG) and
self.conf.influxdb.db_per_tenant):
self._influxdb_client.create_database(database)
else:
raise

@ -61,4 +61,4 @@ class AlarmStateHistInfluxdbRepository(
LOG.debug(line)
return line
return line, tenant_id

@ -36,6 +36,9 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
value_meta) = parse_measurement_message(message)
tags = dimensions
# TODO(brtknr): If database per tenant becomes the default and the only
# option, recording tenant_id will be redundant.
tags[u'_tenant_id'] = tenant_id
tags[u'_region'] = region
@ -61,4 +64,4 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
LOG.debug(data)
return data
return data, tenant_id

@ -24,12 +24,44 @@ from monasca_persister.repositories import singleton
LOG = log.getLogger(__name__)
class DataPoints(dict):
def __init__(self):
self.counter = 0
def __setitem__(self, key, value):
raise NotImplementedError('Use append(key, value) instead.')
def __delitem__(self, key):
raise NotImplementedError('Use clear() instead.')
def pop(self):
raise NotImplementedError('Use clear() instead.')
def popitem(self):
raise NotImplementedError('Use clear() instead.')
def update(self):
raise NotImplementedError('Use clear() instead.')
def chained(self):
return [vi for vo in super(DataPoints, self).values() for vi in vo]
def append(self, key, value):
super(DataPoints, self).setdefault(key, []).append(value)
self.counter += 1
def clear(self):
super(DataPoints, self).clear()
self.counter = 0
@six.add_metaclass(singleton.Singleton)
class Persister(six.with_metaclass(ABCMeta, object)):
def __init__(self, kafka_conf, repository):
self._data_points = []
self._data_points = DataPoints()
self._kafka_topic = kafka_conf.topic
self._batch_size = kafka_conf.batch_size
self.repository = repository()
@ -42,20 +74,20 @@ class Persister(six.with_metaclass(ABCMeta, object)):
self.repository.write_batch(self._data_points)
LOG.info("Processed {} messages from topic '{}'".format(
len(self._data_points), self._kafka_topic))
self._data_points.counter, self._kafka_topic))
self._data_points = []
self._data_points.clear()
self._consumer.commit()
except Exception as ex:
if "partial write: points beyond retention policy dropped" in str(ex):
LOG.warning("Some points older than retention policy were dropped")
self._data_points = []
self._data_points.clear()
self._consumer.commit()
elif cfg.CONF.repositories.ignore_parse_point_error \
and "unable to parse" in str(ex):
LOG.warning("Some points were unable to be parsed and were dropped")
self._data_points = []
self._data_points.clear()
self._consumer.commit()
else:
@ -67,13 +99,13 @@ class Persister(six.with_metaclass(ABCMeta, object)):
try:
for message in self._consumer:
try:
data_point = self.repository.process_message(message)
self._data_points.append(data_point)
data_point, tenant_id = self.repository.process_message(message)
self._data_points.append(tenant_id, data_point)
except Exception:
LOG.exception('Error processing message. Message is '
'being dropped. {}'.format(message))
if len(self._data_points) >= self._batch_size:
if self._data_points.counter >= self._batch_size:
self._flush()
except Exception:
LOG.exception(

@ -22,6 +22,8 @@ from oslo_config import cfg
from monasca_persister.repositories.cassandra import alarm_state_history_repository
from monasca_persister.repositories.cassandra import connection_util
from monasca_persister.repositories.persister import DataPoints
class TestAlarmStateHistoryRepo(base.BaseTestCase):
def setUp(self):
@ -83,8 +85,9 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
b'"metric_definition":"dummy_definition"',
b'"sub_alarm_state":"dummy_state"']
output = self.alarm_state_hist_repo.process_message(message)
output, tenant_id = self.alarm_state_hist_repo.process_message(message)
self.assertEqual(tenant_id, 'dummytenantId')
self.assertEqual(output[0], self.alarm_state_hist_repo._retention)
self.assertEqual(output[1], b'"dummymetrics"')
self.assertEqual(output[2], b'dummyoldState')
@ -103,5 +106,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
self._session, self._upsert_stmt = Mock(), Mock()
alarm_state_hists = ['elem']
self.alarm_state_hist_repo.write_batch(alarm_state_hists)
alarm_state_hists_by_tenant = DataPoints()
alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)

@ -64,7 +64,10 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase):
'\\"metric_definition\\":\\"dummy_definition\\"',
'\\"sub_alarm_state\\":\\"dummy_state\\"',
'\\"current_values\\":\\"dummy_values\\"']
actual_output = self.alarm_state_repo.process_message(message)
actual_output, tenant_id = self.alarm_state_repo.process_message(message)
self.assertEqual(tenant_id, 'dummytenantId')
self.assertIn(expected_output, actual_output)
for elem in expected_dict:
self.assertIn(elem, actual_output)

@ -15,7 +15,6 @@
from mock import Mock
from mock import patch
from six import string_types
from oslotest import base
from oslo_config import cfg
@ -35,7 +34,7 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
metric = self._get_metric()
with patch.object(cfg, 'CONF', return_value=None):
metric_repo = MetricInfluxdbRepository()
self.assertIsInstance(metric_repo.process_message(metric), string_types)
self.assertIsInstance(metric_repo.process_message(metric), tuple)
def _get_metric(self):
metric = '''

@ -23,6 +23,7 @@ from oslo_config import cfg
from monasca_common.kafka import consumer
from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
from monasca_persister.repositories.persister import LOG
from monasca_persister.repositories.persister import DataPoints
class FakeException(Exception):
@ -84,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase):
def test_run_if_consumer_is_faulty(self):
with patch.object(os, '_exit', return_value=None) as mock_exit:
self.persister._data_points = []
self.persister._data_points = DataPoints()
self.persister._consumer = Mock(side_effect=FakeException)
self.persister.run()
mock_exit.assert_called_once_with(1)
@ -92,21 +93,22 @@ class TestPersisterRepo(base.BaseTestCase):
def test_run_logs_exception_from_consumer(self):
with patch.object(self.persister.repository, 'process_message',
side_effect=FakeException):
self.persister._data_points = ()
self.persister._data_points = DataPoints()
self.persister._consumer = ['aa']
self.persister.run()
self.mock_log_exception.assert_called()
def test_run_commit_is_called_and_data_points_is_emptied(self):
with patch.object(self.persister.repository, 'process_message',
return_value='message'):
return_value=('message', 'tenant_id')):
with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
self.persister._data_points = ['a']
self.persister._data_points = DataPoints()
self.persister._data_points.append('fake_tenant_id', 'some')
self.persister._consumer.__iter__.return_value = ('aa', 'bb')
self.persister._batch_size = 1
self.persister.run()
mock_consumer.commit.assert_called()
self.assertEqual([], self.persister._data_points)
self.assertEqual(0, self.persister._data_points.counter)
def test_flush_logs_warning_and_exception(self):
exception_msgs = ['partial write: points beyond retention policy dropped',
@ -115,7 +117,8 @@ class TestPersisterRepo(base.BaseTestCase):
return_value=True)):
for elem in exception_msgs:
with patch.object(LOG, 'info', side_effect=FakeException(elem)):
self.persister._data_points = ['some']
self.persister._data_points = DataPoints()
self.persister._data_points.append('fake_tenant_id', 'some')
self.persister._flush()
self.mock_log_warning.assert_called()
@ -124,6 +127,7 @@ class TestPersisterRepo(base.BaseTestCase):
with(patch.object(cfg.CONF.repositories,
'ignore_parse_point_error', return_value=False)):
mock_log_info.side_effect.message = 'some msg'
self.persister._data_points = ['some']
self.persister._data_points = DataPoints()
self.persister._data_points.append('fake_tenant_id', 'some')
self.assertRaises(FakeException, self.persister._flush)
self.mock_log_exception.assert_called()

@ -0,0 +1,6 @@
---
features:
- |
Configuration option `db_per_tenant` added for InfluxDB to allow
data points to be written to dedicated tenant database where the
`database_name` prefixes the tenant ID, e.g. monasca_tenantid.
Loading…
Cancel
Save