Merge "Add tests for repositories/persister.py"

This commit is contained in:
Zuul 2019-05-30 15:09:03 +00:00 committed by Gerrit Code Review
commit 3841353945
1 changed files with 130 additions and 0 deletions

View File

@ -0,0 +1,130 @@
# (C) Copyright 2019 Fujitsu Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from mock import patch
from mock import Mock
from oslotest import base
from oslo_config import cfg
from monasca_common.kafka import consumer
from monasca_persister.repositories.persister import Persister
from monasca_persister.repositories.persister import LOG
class FakeException(Exception):
pass
class TestPersisterRepo(base.BaseTestCase):
def setUp(self):
super(TestPersisterRepo, self).setUp()
self._set_patchers()
self._set_mocks()
self.persister = Persister(self.mock_kafka, self.mock_zookeeper, Mock())
def _set_mocks(self):
self.mock_kafka = Mock()
self.mock_kafka.topic = 'topic'
self.mock_kafka.batch_size = 1
self.mock_kafka.zookeeper_path = ''
self.mock_kafka.group_id = 0
self.mock_kafka.max_wait_time_seconds = 0
self.mock_zookeeper = Mock(uri='')
self.mock_consumer_init = self.patch_kafka_init.start()
self.mock_client_init = self.patch_kafka_client_init.start()
self.mock_consumer_commit = self.patch_kafka_commit.start()
self.mock_log_warning = self.patch_log_warning.start()
self.mock_log_exception = self.patch_log_exception.start()
def _set_patchers(self):
self.patch_kafka_init = patch.object(consumer.KafkaConsumer, '__init__',
return_value=None)
self.patch_kafka_commit = patch.object(consumer.KafkaConsumer, 'commit',
return_value=FakeException())
self.patch_kafka_client_init = patch.object(consumer.kafka_client.KafkaClient, '__init__',
return_value=None)
self.patch_log_warning = patch.object(LOG, 'warning')
self.patch_log_exception = patch.object(LOG, 'exception')
def tearDown(self):
super(TestPersisterRepo, self).tearDown()
self.mock_consumer_init.reset_mock()
self.mock_client_init.reset_mock()
self.mock_consumer_commit.reset_mock()
self.mock_log_warning.reset_mock()
self.mock_log_exception.reset_mock()
self.patch_kafka_init.stop()
self.patch_kafka_commit.stop()
self.patch_kafka_client_init.stop()
self.patch_log_warning.stop()
self.patch_log_exception.stop()
def test_flush_if_data_points_is_none(self):
self.persister._data_points = None
self.assertIsNone(self.persister._flush())
def test_run_if_consumer_is_faulty(self):
with patch.object(os, '_exit', return_value=None) as mock_exit:
self.persister._data_points = []
self.persister._consumer = Mock(side_effect=FakeException)
self.persister.run()
mock_exit.assert_called_once_with(1)
def test_run_logs_exception_from_consumer(self):
with patch.object(self.persister.repository, 'process_message',
side_effect=FakeException):
self.persister._data_points = ()
self.persister._consumer = ['aa']
self.persister.run()
self.mock_log_exception.assert_called()
def test_run_commit_is_called_and_data_points_is_emptied(self):
with patch.object(self.persister.repository, 'process_message',
return_value='message'):
with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
self.persister._data_points = ['a']
self.persister._consumer.__iter__.return_value = ['aa', 'bb']
self.persister._batch_size = 1
self.persister.run()
mock_consumer.commit.assert_called()
self.assertEqual([], self.persister._data_points)
@patch.object(LOG, 'info', side_effect=FakeException())
def test_flush_logs_warning_and_exception(self, mock_log_info):
exception_msgs = ['partial write: points beyond retention policy dropped',
'unable to parse points']
with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
return_value=True)):
for elem in exception_msgs:
mock_log_info.side_effect.message = elem
self.persister._data_points = ['some']
self.persister._flush()
self.mock_log_warning.assert_called()
@patch.object(LOG, 'info', side_effect=FakeException())
def test_flush_logs_exception(self, mock_log_info):
with(patch.object(cfg.CONF.repositories,
'ignore_parse_point_error', return_value=False)):
mock_log_info.side_effect.message = 'some msg'
self.persister._data_points = ['some']
self.assertRaises(FakeException, self.persister._flush)
self.mock_log_exception.assert_called()