Merge "Move data_points to repository class"
This commit is contained in:
commit
a1f487188b
|
@ -27,5 +27,5 @@ class AbstractRepository(object):
|
|||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def write_batch(self, data_points_by_tenant):
|
||||
def write_batch(self, data_points):
|
||||
pass
|
||||
|
|
|
@ -20,9 +20,11 @@ import six
|
|||
|
||||
from monasca_persister.repositories import abstract_repository
|
||||
from monasca_persister.repositories.cassandra import connection_util
|
||||
from monasca_persister.repositories import data_points
|
||||
|
||||
conf = cfg.CONF
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AbstractCassandraRepository(abstract_repository.AbstractRepository):
|
||||
def __init__(self):
|
||||
|
@ -33,3 +35,4 @@ class AbstractCassandraRepository(abstract_repository.AbstractRepository):
|
|||
self._retention = conf.cassandra.retention_policy * 24 * 3600
|
||||
self._cache_size = conf.cassandra.max_definition_cache_size
|
||||
self._max_batches = conf.cassandra.max_batches
|
||||
self.data_points_class = data_points.DataPointsAsList
|
||||
|
|
|
@ -55,10 +55,7 @@ class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRep
|
|||
|
||||
return alarm_state_hist, tenant_id
|
||||
|
||||
def write_batch(self, alarm_state_hists_by_tenant):
|
||||
# TODO(brtknr): At the moment, Cassandra does not have database per
|
||||
# tenant implemented, so use chained list of values.
|
||||
alarm_state_hists = alarm_state_hists_by_tenant.chained()
|
||||
def write_batch(self, alarm_state_hists):
|
||||
while alarm_state_hists:
|
||||
num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
|
||||
batch = alarm_state_hists[:num_rows]
|
||||
|
|
|
@ -220,10 +220,7 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository)
|
|||
|
||||
return metric, tenant_id
|
||||
|
||||
def write_batch(self, metrics_by_tenant):
|
||||
# TODO(brtknr): At the moment, Cassandra does not have database per
|
||||
# tenant implemented, so join the list of values.
|
||||
metrics = metrics_by_tenant.chained()
|
||||
def write_batch(self, metrics):
|
||||
with self._lock:
|
||||
batch_list = self._metric_batch.get_all_batches()
|
||||
results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class DataPointsAsList(list):
|
||||
|
||||
def append(self, key, value):
|
||||
super(DataPointsAsList, self).append(value)
|
||||
|
||||
def get_count(self):
|
||||
return len(self)
|
||||
|
||||
def clear(self):
|
||||
del self[:]
|
||||
|
||||
|
||||
class DataPointsAsDict(dict):
|
||||
|
||||
def __init__(self):
|
||||
self.counter = 0
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
raise NotImplementedError('Use append(key, value) instead.')
|
||||
|
||||
def __delitem__(self, key):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def pop(self):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def popitem(self):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def update(self):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def append(self, key, value):
|
||||
super(DataPointsAsDict, self).setdefault(key, []).append(value)
|
||||
self.counter += 1
|
||||
|
||||
def clear(self):
|
||||
super(DataPointsAsDict, self).clear()
|
||||
self.counter = 0
|
||||
|
||||
def get_count(self):
|
||||
return self.counter
|
|
@ -21,6 +21,7 @@ from oslo_config import cfg
|
|||
from oslo_log import log
|
||||
|
||||
from monasca_persister.repositories import abstract_repository
|
||||
from monasca_persister.repositories import data_points
|
||||
from monasca_persister.repositories import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -37,6 +38,7 @@ class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
|
|||
sniffer_timeout=self.conf.sniffer_timeout,
|
||||
max_retries=self.conf.max_retries
|
||||
)
|
||||
self.data_points_class = data_points.DataPointsAsList
|
||||
|
||||
def process_message(self, message):
|
||||
return utils.parse_events_message(message)
|
||||
|
|
|
@ -18,6 +18,7 @@ from oslo_config import cfg
|
|||
import six
|
||||
|
||||
from monasca_persister.repositories import abstract_repository
|
||||
from monasca_persister.repositories import data_points
|
||||
|
||||
DATABASE_NOT_FOUND_MSG = "database not found"
|
||||
|
||||
|
@ -33,16 +34,18 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
|
|||
self.conf.influxdb.port,
|
||||
self.conf.influxdb.user,
|
||||
self.conf.influxdb.password)
|
||||
|
||||
def write_batch(self, data_points_by_tenant):
|
||||
if self.conf.influxdb.db_per_tenant:
|
||||
for tenant_id, data_points in data_points_by_tenant.items():
|
||||
database = '%s_%s' % (self.conf.influxdb.database_name, tenant_id)
|
||||
self._write_batch(data_points, database)
|
||||
self.data_points_class = data_points.DataPointsAsDict
|
||||
else:
|
||||
self.data_points_class = data_points.DataPointsAsList
|
||||
|
||||
def write_batch(self, data_points):
|
||||
if self.conf.influxdb.db_per_tenant:
|
||||
for tenant_id, tenant_data_points in data_points.items():
|
||||
database = '%s_%s' % (self.conf.influxdb.database_name,
|
||||
tenant_id)
|
||||
self._write_batch(tenant_data_points, database)
|
||||
else:
|
||||
# NOTE (brtknr): Chain list of values to avoid multiple calls to
|
||||
# database API endpoint (when db_per_tenant is False).
|
||||
data_points = data_points_by_tenant.chained()
|
||||
self._write_batch(data_points, self.conf.influxdb.database_name)
|
||||
|
||||
def _write_batch(self, data_points, database):
|
||||
|
|
|
@ -24,47 +24,16 @@ from monasca_persister.repositories import singleton
|
|||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
class DataPoints(dict):
|
||||
|
||||
def __init__(self):
|
||||
self.counter = 0
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
raise NotImplementedError('Use append(key, value) instead.')
|
||||
|
||||
def __delitem__(self, key):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def pop(self):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def popitem(self):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def update(self):
|
||||
raise NotImplementedError('Use clear() instead.')
|
||||
|
||||
def chained(self):
|
||||
return [vi for vo in super(DataPoints, self).values() for vi in vo]
|
||||
|
||||
def append(self, key, value):
|
||||
super(DataPoints, self).setdefault(key, []).append(value)
|
||||
self.counter += 1
|
||||
|
||||
def clear(self):
|
||||
super(DataPoints, self).clear()
|
||||
self.counter = 0
|
||||
|
||||
|
||||
@six.add_metaclass(singleton.Singleton)
|
||||
class Persister(six.with_metaclass(ABCMeta, object)):
|
||||
|
||||
def __init__(self, kafka_conf, repository):
|
||||
self._data_points = DataPoints()
|
||||
|
||||
self._kafka_topic = kafka_conf.topic
|
||||
self._batch_size = kafka_conf.batch_size
|
||||
self.repository = repository()
|
||||
self._data_points = self.repository.data_points_class()
|
||||
|
||||
def _flush(self):
|
||||
if not self._data_points:
|
||||
|
@ -74,7 +43,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
|
|||
self.repository.write_batch(self._data_points)
|
||||
|
||||
LOG.info("Processed {} messages from topic '{}'".format(
|
||||
self._data_points.counter, self._kafka_topic))
|
||||
self._data_points.get_count(), self._kafka_topic))
|
||||
|
||||
self._data_points.clear()
|
||||
self._consumer.commit()
|
||||
|
@ -105,7 +74,7 @@ class Persister(six.with_metaclass(ABCMeta, object)):
|
|||
LOG.exception('Error processing message. Message is '
|
||||
'being dropped. {}'.format(message))
|
||||
|
||||
if self._data_points.counter >= self._batch_size:
|
||||
if self._data_points.get_count() >= self._batch_size:
|
||||
self._flush()
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
|
|
|
@ -104,4 +104,4 @@ def parse_events_message(message):
|
|||
project_id = decoded_message['meta']['project_id']
|
||||
dimensions = decoded_message['event']['dimensions']
|
||||
|
||||
return project_id, timestamp, event_type, payload, dimensions
|
||||
return (project_id, timestamp, event_type, payload, dimensions), project_id
|
||||
|
|
|
@ -21,8 +21,7 @@ from oslo_config import cfg
|
|||
|
||||
from monasca_persister.repositories.cassandra import alarm_state_history_repository
|
||||
from monasca_persister.repositories.cassandra import connection_util
|
||||
|
||||
from monasca_persister.repositories.persister import DataPoints
|
||||
from monasca_persister.repositories import data_points
|
||||
|
||||
|
||||
class TestAlarmStateHistoryRepo(base.BaseTestCase):
|
||||
|
@ -106,6 +105,6 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
|
|||
cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
|
||||
|
||||
self._session, self._upsert_stmt = Mock(), Mock()
|
||||
alarm_state_hists_by_tenant = DataPoints()
|
||||
alarm_state_hists_by_tenant = data_points.DataPointsAsList()
|
||||
alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
|
||||
self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)
|
||||
|
|
|
@ -37,7 +37,8 @@ class TestEvents(base.BaseTestCase):
|
|||
|
||||
def test_parse_event(self):
|
||||
event = self._load_event('event_1')
|
||||
project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(event)
|
||||
(project_id, timestamp, event_type, payload,
|
||||
dimensions), _ = utils.parse_events_message(event)
|
||||
self.assertEqual('de98fbff448f4f278a56e9929db70b03', project_id)
|
||||
self.assertEqual('2017-06-01 09:15:11.494606', timestamp)
|
||||
self.assertEqual('compute.instance.create.start', event_type)
|
||||
|
|
|
@ -22,8 +22,8 @@ from oslo_config import cfg
|
|||
|
||||
from monasca_common.kafka import consumer
|
||||
from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
|
||||
from monasca_persister.repositories import data_points
|
||||
from monasca_persister.repositories.persister import LOG
|
||||
from monasca_persister.repositories.persister import DataPoints
|
||||
|
||||
|
||||
class FakeException(Exception):
|
||||
|
@ -85,7 +85,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
|||
|
||||
def test_run_if_consumer_is_faulty(self):
|
||||
with patch.object(os, '_exit', return_value=None) as mock_exit:
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points = data_points.DataPointsAsDict()
|
||||
self.persister._consumer = Mock(side_effect=FakeException)
|
||||
self.persister.run()
|
||||
mock_exit.assert_called_once_with(1)
|
||||
|
@ -93,7 +93,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
|||
def test_run_logs_exception_from_consumer(self):
|
||||
with patch.object(self.persister.repository, 'process_message',
|
||||
side_effect=FakeException):
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points = data_points.DataPointsAsDict()
|
||||
self.persister._consumer = ['aa']
|
||||
self.persister.run()
|
||||
self.mock_log_exception.assert_called()
|
||||
|
@ -102,7 +102,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
|||
with patch.object(self.persister.repository, 'process_message',
|
||||
return_value=('message', 'tenant_id')):
|
||||
with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points = data_points.DataPointsAsDict()
|
||||
self.persister._data_points.append('fake_tenant_id', 'some')
|
||||
self.persister._consumer.__iter__.return_value = ('aa', 'bb')
|
||||
self.persister._batch_size = 1
|
||||
|
@ -117,7 +117,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
|||
return_value=True)):
|
||||
for elem in exception_msgs:
|
||||
with patch.object(LOG, 'info', side_effect=FakeException(elem)):
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points = data_points.DataPointsAsDict()
|
||||
self.persister._data_points.append('fake_tenant_id', 'some')
|
||||
self.persister._flush()
|
||||
self.mock_log_warning.assert_called()
|
||||
|
@ -127,7 +127,7 @@ class TestPersisterRepo(base.BaseTestCase):
|
|||
with(patch.object(cfg.CONF.repositories,
|
||||
'ignore_parse_point_error', return_value=False)):
|
||||
mock_log_info.side_effect.message = 'some msg'
|
||||
self.persister._data_points = DataPoints()
|
||||
self.persister._data_points = data_points.DataPointsAsDict()
|
||||
self.persister._data_points.append('fake_tenant_id', 'some')
|
||||
self.assertRaises(FakeException, self.persister._flush)
|
||||
self.mock_log_exception.assert_called()
|
||||
|
|
|
@ -104,7 +104,8 @@ class TestUtils(base.BaseTestCase):
|
|||
}
|
||||
}"""
|
||||
|
||||
project_id, timestamp, event_type, payload, dimensions = utils.parse_events_message(message)
|
||||
(project_id, timestamp, event_type, payload,
|
||||
dimensions), _ = utils.parse_events_message(message)
|
||||
|
||||
self.assertEqual(project_id, "dummy_project_id")
|
||||
self.assertEqual(timestamp, "dummy_timestamp")
|
||||
|
|
Loading…
Reference in New Issue