Move data_points to repository class

data_points_class can now be declared as a List or a Dictionary. The
latter is used when writing measurements per tenant. This refactoring
avoids creating the dictionary and then chaining its values again for
the ElasticSearch and Cassandra databases, or when the db_per_tenant
option is disabled. The change also fixes the events pipeline.

Change-Id: I25ea80eea714acb167e70f188ed229cc90531596
Story: 2006331
Task: 37211
Author: Witek Bedyk, 2019-10-18 16:55:40 +02:00
Parent: 9354132284
Commit: e883b5bcda
13 changed files with 90 additions and 63 deletions

View File

@@ -27,5 +27,5 @@ class AbstractRepository(object):
         pass
 
     @abc.abstractmethod
-    def write_batch(self, data_points_by_tenant):
+    def write_batch(self, data_points):
         pass

View File

@@ -20,9 +20,11 @@ import six
 from monasca_persister.repositories import abstract_repository
 from monasca_persister.repositories.cassandra import connection_util
+from monasca_persister.repositories import data_points
 conf = cfg.CONF
 @six.add_metaclass(abc.ABCMeta)
 class AbstractCassandraRepository(abstract_repository.AbstractRepository):
     def __init__(self):
@@ -33,3 +35,4 @@ class AbstractCassandraRepository(abstract_repository.AbstractRepository):
         self._retention = conf.cassandra.retention_policy * 24 * 3600
         self._cache_size = conf.cassandra.max_definition_cache_size
         self._max_batches = conf.cassandra.max_batches
+        self.data_points_class = data_points.DataPointsAsList

View File

@@ -55,10 +55,7 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep
         return alarm_state_hist, tenant_id
 
-    def write_batch(self, alarm_state_hists_by_tenant):
-        # TODO(brtknr): At the moment, Cassandra does not have database per
-        # tenant implemented, so use chained list of values.
-        alarm_state_hists = alarm_state_hists_by_tenant.chained()
+    def write_batch(self, alarm_state_hists):
         while alarm_state_hists:
             num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
             batch = alarm_state_hists[:num_rows]

View File

@@ -220,10 +220,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
         return metric, tenant_id
 
-    def write_batch(self, metrics_by_tenant):
-        # TODO(brtknr): At the moment, Cassandra does not have database per
-        # tenant implemented, so join the list of values.
-        metrics = metrics_by_tenant.chained()
+    def write_batch(self, metrics):
         with self._lock:
             batch_list = self._metric_batch.get_all_batches()
             results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)

View File

@@ -0,0 +1,55 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class DataPointsAsList(list):
+
+    def append(self, key, value):
+        super(DataPointsAsList, self).append(value)
+
+    def get_count(self):
+        return len(self)
+
+    def clear(self):
+        del self[:]
+
+
+class DataPointsAsDict(dict):
+
+    def __init__(self):
+        self.counter = 0
+
+    def __setitem__(self, key, value):
+        raise NotImplementedError('Use append(key, value) instead.')
+
+    def __delitem__(self, key):
+        raise NotImplementedError('Use clear() instead.')
+
+    def pop(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def popitem(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def update(self):
+        raise NotImplementedError('Use clear() instead.')
+
+    def append(self, key, value):
+        super(DataPointsAsDict, self).setdefault(key, []).append(value)
+        self.counter += 1
+
+    def clear(self):
+        super(DataPointsAsDict, self).clear()
+        self.counter = 0
+
+    def get_count(self):
+        return self.counter
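
Side by side (illustrative only, tenant names made up), both containers
expose the same append(key, value), get_count() and clear() interface,
but only the dict variant keeps the key:

# DataPointsAsList ignores the key and keeps one flat batch.
from monasca_persister.repositories import data_points

as_list = data_points.DataPointsAsList()
as_list.append('tenant-a', 'point-1')
as_list.append('tenant-b', 'point-2')
# as_list == ['point-1', 'point-2'], as_list.get_count() == 2

# DataPointsAsDict groups values per tenant, for db_per_tenant writes.
as_dict = data_points.DataPointsAsDict()
as_dict.append('tenant-a', 'point-1')
as_dict.append('tenant-a', 'point-2')
as_dict.append('tenant-b', 'point-3')
# as_dict == {'tenant-a': ['point-1', 'point-2'], 'tenant-b': ['point-3']}
# as_dict.get_count() == 3 even though len(as_dict) == 2

as_dict.clear()
assert as_dict.get_count() == 0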

View File

@@ -21,6 +21,7 @@ from oslo_config import cfg
 from oslo_log import log
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories import data_points
 from monasca_persister.repositories import utils
 LOG = log.getLogger(__name__)
@@ -37,6 +38,7 @@ class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
             sniffer_timeout=self.conf.sniffer_timeout,
             max_retries=self.conf.max_retries
         )
+        self.data_points_class = data_points.DataPointsAsList
 
     def process_message(self, message):
         return utils.parse_events_message(message)

View File

@@ -18,6 +18,7 @@ from oslo_config import cfg
 import six
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories import data_points
 DATABASE_NOT_FOUND_MSG = "database not found"
@@ -33,16 +34,18 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
                                              self.conf.influxdb.port,
                                              self.conf.influxdb.user,
                                              self.conf.influxdb.password)
-
-    def write_batch(self, data_points_by_tenant):
         if self.conf.influxdb.db_per_tenant:
-            for tenant_id, data_points in data_points_by_tenant.items():
-                database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id)
-                self._write_batch(data_points, database)
+            self.data_points_class = data_points.DataPointsAsDict
         else:
-            # NOTE (brtknr): Chain list of values to avoid multiple calls to
-            # database API endpoint (when db_per_tenant is False).
-            data_points = data_points_by_tenant.chained()
+            self.data_points_class = data_points.DataPointsAsList
+
+    def write_batch(self, data_points):
+        if self.conf.influxdb.db_per_tenant:
+            for tenant_id, tenant_data_points in data_points.items():
+                database = '%s_%s' % (self.conf.influxdb.database_name,
+                                      tenant_id)
+                self._write_batch(tenant_data_points, database)
+        else:
             self._write_batch(data_points, self.conf.influxdb.database_name)
 
     def _write_batch(self, data_points, database):
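
To make the per-tenant branch concrete, here is a stand-alone sketch
(not from the commit) of the fan-out it performs; record() stands in
for _write_batch() and the database and tenant names are made up:

# One write per tenant database when db_per_tenant is enabled.
calls = []


def record(points, database):
    calls.append((database, list(points)))


database_name = 'monasca'
batch = {'tenant-a': ['p1', 'p2'], 'tenant-b': ['p3']}  # DataPointsAsDict content

for tenant_id, tenant_data_points in batch.items():
    database = '%s_%s' % (database_name, tenant_id)
    record(tenant_data_points, database)

# calls == [('monasca_tenant-a', ['p1', 'p2']), ('monasca_tenant-b', ['p3'])]
# With db_per_tenant disabled the repository keeps a DataPointsAsList
# instead and issues a single record(batch, database_name) call.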

View File

@@ -24,47 +24,16 @@ from monasca_persister.repositories import singleton
 LOG = log.getLogger(__name__)
-class DataPoints(dict):
-    def __init__(self):
-        self.counter = 0
-    def __setitem__(self, key, value):
-        raise NotImplementedError('Use append(key, value) instead.')
-    def __delitem__(self, key):
-        raise NotImplementedError('Use clear() instead.')
-    def pop(self):
-        raise NotImplementedError('Use clear() instead.')
-    def popitem(self):
-        raise NotImplementedError('Use clear() instead.')
-    def update(self):
-        raise NotImplementedError('Use clear() instead.')
-    def chained(self):
-        return [vi for vo in super(DataPoints, self).values() for vi in vo]
-    def append(self, key, value):
-        super(DataPoints, self).setdefault(key, []).append(value)
-        self.counter += 1
-    def clear(self):
-        super(DataPoints, self).clear()
-        self.counter = 0
 @six.add_metaclass(singleton.Singleton)
 class Persister(six.with_metaclass(ABCMeta, object)):
     def __init__(self, kafka_conf, repository):
-        self._data_points = DataPoints()
         self._kafka_topic = kafka_conf.topic
         self._batch_size = kafka_conf.batch_size
         self.repository = repository()
+        self._data_points = self.repository.data_points_class()
 
     def _flush(self):
         if not self._data_points:
@@ -74,7 +43,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
             self.repository.write_batch(self._data_points)
 
             LOG.info("Processed {} messages from topic '{}'".format(
-                self._data_points.counter, self._kafka_topic))
+                self._data_points.get_count(), self._kafka_topic))
 
             self._data_points.clear()
             self._consumer.commit()
@@ -105,7 +74,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
 
-            if self._data_points.counter >= self._batch_size:
-                self._flush()
+            if self._data_points.get_count() >= self._batch_size:
+                self._flush()
         except Exception:
             LOG.exception(

View File

@@ -104,4 +104,4 @@ def parse_events_message(message):
     project_id = decoded_message['meta']['project_id']
     dimensions = decoded_message['event']['dimensions']
 
-    return project_id, timestamp, event_type, payload, dimensions
+    return (project_id, timestamp, event_type, payload, dimensions), project_id
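
For reference, the new return value packs the event fields into one
tuple and repeats the project id as the tenant key, matching the
(data, tenant_id) contract of process_message(); a tiny illustration
with made-up values:

# Shape of the new parse_events_message() result (values are placeholders).
result = (('dummy_project_id', 'dummy_timestamp', 'dummy_event_type',
           {'payload': 'dummy'}, {'dim': 'dummy'}), 'dummy_project_id')

(project_id, timestamp, event_type, payload, dimensions), tenant_id = result
assert tenant_id == project_id  # the persister appends the event under this key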

View File

@@ -21,8 +21,7 @@ from oslo_config import cfg
 from monasca_persister.repositories.cassandra import alarm_state_history_repository
 from monasca_persister.repositories.cassandra import connection_util
-from monasca_persister.repositories.persister import DataPoints
+from monasca_persister.repositories import data_points
 
 
 class TestAlarmStateHistoryRepo(base.BaseTestCase):
@@ -106,6 +105,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
         self._session, self._upsert_stmt = Mock(), Mock()
 
-        alarm_state_hists_by_tenant = DataPoints()
+        alarm_state_hists_by_tenant = data_points.DataPointsAsList()
         alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
         self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)

View File

@@ -37,7 +37,8 @@ class TestEvents(base.BaseTestCase):
     def test_parse_event(self):
         event = self._load_event('event_1')
-        project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(event)
+        (project_id, timestamp, event_type, payload,
+         dimensions), _ = utils.parse_events_message(event)
         self.assertEqual('de98fbff448f4f278a56e9929db70b03', project_id)
         self.assertEqual('2017-06-01 09:15:11.494606', timestamp)
         self.assertEqual('compute.instance.create.start', event_type)

View File

@@ -22,8 +22,8 @@ from oslo_config import cfg
 from monasca_common.kafka import consumer
 from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
+from monasca_persister.repositories import data_points
 from monasca_persister.repositories.persister import LOG
-from monasca_persister.repositories.persister import DataPoints
 
 
 class FakeException(Exception):
@@ -85,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase):
     def test_run_if_consumer_is_faulty(self):
         with patch.object(os, '_exit', return_value=None) as mock_exit:
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = Mock(side_effect=FakeException)
             self.persister.run()
             mock_exit.assert_called_once_with(1)
@@ -93,7 +93,7 @@ class TestPersisterRepo(base.BaseTestCase):
     def test_run_logs_exception_from_consumer(self):
         with patch.object(self.persister.repository, 'process_message',
                           side_effect=FakeException):
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = ['aa']
             self.persister.run()
             self.mock_log_exception.assert_called()
@@ -102,7 +102,7 @@ class TestPersisterRepo(base.BaseTestCase):
         with patch.object(self.persister.repository, 'process_message',
                           return_value=('message', 'tenant_id')):
             with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
-                self.persister._data_points = DataPoints()
+                self.persister._data_points = data_points.DataPointsAsDict()
                 self.persister._data_points.append('fake_tenant_id', 'some')
                 self.persister._consumer.__iter__.return_value = ('aa', 'bb')
                 self.persister._batch_size = 1
@@ -117,7 +117,7 @@ class TestPersisterRepo(base.BaseTestCase):
                                return_value=True)):
             for elem in exception_msgs:
                 with patch.object(LOG, 'info', side_effect=FakeException(elem)):
-                    self.persister._data_points = DataPoints()
+                    self.persister._data_points = data_points.DataPointsAsDict()
                     self.persister._data_points.append('fake_tenant_id', 'some')
                     self.persister._flush()
                     self.mock_log_warning.assert_called()
@@ -127,7 +127,7 @@ class TestPersisterRepo(base.BaseTestCase):
         with(patch.object(cfg.CONF.repositories,
                           'ignore_parse_point_error', return_value=False)):
             mock_log_info.side_effect.message = 'some msg'
-            self.persister._data_points = DataPoints()
+            self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._data_points.append('fake_tenant_id', 'some')
             self.assertRaises(FakeException, self.persister._flush)
             self.mock_log_exception.assert_called()

View File

@@ -104,7 +104,8 @@ class TestUtils(base.BaseTestCase):
             }
         }"""
 
-        project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(message)
+        (project_id, timestamp, event_type, payload,
+         dimensions), _ = utils.parse_events_message(message)
         self.assertEqual(project_id, "dummy_project_id")
         self.assertEqual(timestamp, "dummy_timestamp")