Migrate from ujson to simplejson
This change updates the imports to use the simplejson library and fixes three unit tests. In particular, TestUtils.test_parse_alarm_state_hist_message has been slightly changed to make the testing more robust. Also, the Python 2 specific implementation has been removed from influxdb.line_utils. Additionally, the standard library unittest.mock is used instead of the third party mock lib.

Depends-On: https://review.opendev.org/718014
Depends-On: https://review.opendev.org/720188
Change-Id: I64b1a60e8be929c25c005a0f429b1274cb8570e6
Story: 2007549
Task: 39390
This commit is contained in:
parent 26844c1dd0
commit dc8956d7b8
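Why the expected strings in the tests below changed at all: simplejson, like the standard library json module, defaults to separators=(', ', ': '), so dumps() puts a space after each colon, whereas ujson serializes compactly. A minimal sketch of the difference (assuming simplejson is installed):

    import simplejson

    # simplejson (and stdlib json) insert a space after the colon by default;
    # ujson.dumps() would emit the compact {"sub_alarm_state":"dummy_state"}.
    print(simplejson.dumps({'sub_alarm_state': 'dummy_state'}))
    # -> {"sub_alarm_state": "dummy_state"}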
@@ -54,6 +54,7 @@ reno==2.5.0
 requests==2.14.2
 requestsexceptions==1.2.0
 rfc3986==0.3.1
+simplejson==3.8.1
 six==1.10.0
 smmap==0.9.0
 Sphinx===1.6.2
@@ -14,11 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import ujson as json
-
 from cassandra.concurrent import execute_concurrent_with_args
 from oslo_config import cfg
 from oslo_log import log
+import simplejson as json
 
 from monasca_persister.repositories.cassandra import abstract_repository
 from monasca_persister.repositories.utils import parse_alarm_state_hist_message
@@ -19,10 +19,10 @@ from cachetools import LRUCache
 from collections import namedtuple
 import hashlib
 import threading
-import ujson as json
 
 from cassandra.concurrent import execute_concurrent
 from oslo_log import log
+import simplejson as json
 
 from monasca_persister.repositories.cassandra import abstract_repository
 from monasca_persister.repositories.cassandra import token_range_query_manager
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import ujson
-
 from datetime import datetime
 
 from elasticsearch import Elasticsearch
 from oslo_config import cfg
 from oslo_log import log
+import simplejson as json
 
 from monasca_persister.repositories import abstract_repository
 from monasca_persister.repositories import data_points
@@ -61,7 +61,7 @@ class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
         self.es.create(
             index=index,
             doc_type='event',
-            body=ujson.dumps(body)
+            body=json.dumps(body)
         )
 
     @staticmethod
@@ -12,9 +12,9 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import ujson as json
 
 from oslo_log import log
+import simplejson as json
 
 from monasca_persister.repositories.influxdb import abstract_repository
 from monasca_persister.repositories.influxdb import line_utils
@@ -38,26 +38,26 @@ class AlarmStateHistInfluxdbRepository(
          time_stamp) = parse_alarm_state_hist_message(
             message)
 
-        name = u'alarm_state_history'
+        name = 'alarm_state_history'
         fields = []
-        fields.append(u'tenant_id=' + line_utils.escape_value(tenant_id))
-        fields.append(u'alarm_id=' + line_utils.escape_value(alarm_id))
-        fields.append(u'metrics=' + line_utils.escape_value(
+        fields.append('tenant_id=' + line_utils.escape_value(tenant_id))
+        fields.append('alarm_id=' + line_utils.escape_value(alarm_id))
+        fields.append('metrics=' + line_utils.escape_value(
             json.dumps(metrics, ensure_ascii=False)))
-        fields.append(u'new_state=' + line_utils.escape_value(new_state))
-        fields.append(u'old_state=' + line_utils.escape_value(old_state))
-        fields.append(u'link=' + line_utils.escape_value(link))
-        fields.append(u'lifecycle_state=' + line_utils.escape_value(
+        fields.append('new_state=' + line_utils.escape_value(new_state))
+        fields.append('old_state=' + line_utils.escape_value(old_state))
+        fields.append('link=' + line_utils.escape_value(link))
+        fields.append('lifecycle_state=' + line_utils.escape_value(
             lifecycle_state))
-        fields.append(u'reason=' + line_utils.escape_value(
+        fields.append('reason=' + line_utils.escape_value(
             state_change_reason))
-        fields.append(u'reason_data=' + line_utils.escape_value("{}"))
-        fields.append(u'sub_alarms=' + line_utils.escape_value(
+        fields.append('reason_data=' + line_utils.escape_value("{}"))
+        fields.append('sub_alarms=' + line_utils.escape_value(
             sub_alarms_json_snake_case))
 
-        line = name + u',tenant_id=' + line_utils.escape_tag(tenant_id)
-        line += u' ' + u','.join(fields)
-        line += u' ' + str(int(time_stamp))
+        line = name + ',tenant_id=' + line_utils.escape_tag(tenant_id)
+        line += ' ' + ','.join(fields)
+        line += ' ' + str(int(time_stamp))
 
         LOG.debug(line)
 
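The dropped u'' prefixes above are cosmetic only: on Python 3 every string literal is already unicode, so the assembled line-protocol string is byte-for-byte unchanged. A one-line check:

    # Python 3: the u'' prefix is a no-op; only readability improves.
    assert u'alarm_state_history' == 'alarm_state_history'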
@@ -13,39 +13,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import PY2
-
 
 def escape_tag(tag):
-    tag = get_unicode(tag)
-    return tag.replace(
-        u"\\", u"\\\\"
+    return str(tag).replace(
+        "\\", "\\\\"
     ).replace(
-        u" ", u"\\ "
+        " ", "\\ "
     ).replace(
-        u",", u"\\,"
+        ",", "\\,"
     ).replace(
-        u"=", u"\\="
+        "=", "\\="
     )
 
 
-def get_unicode(data):
-    if PY2:
-        if isinstance(data, unicode):
-            return data
-        else:
-            return data.decode('utf-8')
-    else:
-        return str(data)
-
-
 def escape_value(value):
-    return u"\"{0}\"".format(
-        get_unicode(value).replace(
-            u"\\", u"\\\\"
+    return "\"{0}\"".format(
+        str(value).replace(
+            "\\", "\\\\"
         ).replace(
-            u"\"", u"\\\""
+            "\"", "\\\""
        ).replace(
-            u"\n", u"\\n"
+            "\n", "\\n"
         )
     )
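A quick sanity check of the simplified helpers, as a hedged sketch (it assumes the module is importable as monasca_persister.repositories.influxdb.line_utils and simply exercises the escaping shown above):

    from monasca_persister.repositories.influxdb import line_utils

    # Tags escape backslashes, spaces, commas and equals signs;
    # values are wrapped in double quotes with inner quotes escaped.
    print(line_utils.escape_tag('region one'))    # region\ one
    print(line_utils.escape_value('say "hi"'))    # "say \"hi\""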
@@ -12,9 +12,9 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import ujson as json
 
 from oslo_log import log
+import simplejson as json
 
 from monasca_persister.repositories.influxdb import abstract_repository
 from monasca_persister.repositories.influxdb import line_utils
@@ -13,7 +13,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import ujson as json
+import simplejson as json
 
 
 def parse_measurement_message(message):
@@ -13,8 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import Mock
-from mock import patch
+from unittest import mock
 
 from oslotest import base
 from oslo_config import cfg
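The mock-to-unittest.mock moves in the test files below are mechanical: Mock becomes mock.Mock and patch becomes mock.patch, with identical behavior. A self-contained sketch (Consumer is a hypothetical stand-in, not a class from this repo):

    from unittest import mock

    class Consumer:
        def commit(self):
            return 'real'

    # mock.patch.object behaves the same as the third-party mock's patch.object.
    with mock.patch.object(Consumer, 'commit', return_value='stubbed'):
        assert Consumer().commit() == 'stubbed'
    assert Consumer().commit() == 'real'  # the patch is undone on exit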
@@ -44,12 +43,15 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         self.patch_conn_util_cluster.stop()
 
     def _set_patchers(self):
-        self.patch_cfg = patch.object(alarm_state_history_repository.abstract_repository, 'conf')
-        self.patch_conn_util_cluster = patch.object(connection_util, 'create_cluster',
-                                                    return_value=None)
-        self.patch_conn_util_session = patch.object(connection_util, 'create_session',
-                                                    return_value=Mock(
-                                                        prepare=Mock(return_value=None)))
+        self.patch_cfg = mock.patch.object(
+            alarm_state_history_repository.abstract_repository, 'conf')
+        self.patch_conn_util_cluster = mock.patch.object(connection_util,
+                                                         'create_cluster',
+                                                         return_value=None)
+        self.patch_conn_util_session =\
+            mock.patch.object(connection_util, 'create_session',
+                              return_value=mock.Mock(prepare=mock.Mock(
+                                  return_value=None)))
 
     def _set_mocks(self):
         self.mock_cfg = self.patch_cfg.start()
@@ -57,7 +59,7 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         self.mock_conn_util_session = self.patch_conn_util_session.start()
 
     def test_process_message(self):
-        message = Mock()
+        message = mock.Mock()
         message.value.return_value = """{
             "alarm-transitioned": {
                 "alarmId": "dummyid",
@@ -79,10 +81,10 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         }"""
         self.alarm_state_hist_repo._retention = 0
 
-        expected_output = [b'"sub_alarm_expression":"dummy_sub_alarm"',
-                           b'"current_values":"dummy_values"',
-                           b'"metric_definition":"dummy_definition"',
-                           b'"sub_alarm_state":"dummy_state"']
+        expected_output = [b'"sub_alarm_expression": "dummy_sub_alarm"',
+                           b'"current_values": "dummy_values"',
+                           b'"metric_definition": "dummy_definition"',
+                           b'"sub_alarm_state": "dummy_state"']
 
         output, tenant_id = self.alarm_state_hist_repo.process_message(message)
 
@@ -100,11 +102,12 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
         self.assertEqual(output[9], 'dummytimestamp')
 
     def test_write_batch(self):
-        with patch.object(alarm_state_history_repository, 'execute_concurrent_with_args',
-                          return_value=0):
-            cfg.CONF = Mock(kafka_alarm_history=Mock(batch_size=1))
+        with mock.patch.object(alarm_state_history_repository,
+                               'execute_concurrent_with_args',
+                               return_value=0):
+            cfg.CONF = mock.Mock(kafka_alarm_history=mock.Mock(batch_size=1))
 
-            self._session, self._upsert_stmt = Mock(), Mock()
+            self._session, self._upsert_stmt = mock.Mock(), mock.Mock()
             alarm_state_hists_by_tenant = data_points.DataPointsAsList()
             alarm_state_hists_by_tenant.append('fake_tenant', 'elem')
             self.alarm_state_hist_repo.write_batch(alarm_state_hists_by_tenant)
@@ -13,8 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import Mock
-from mock import patch
+from unittest import mock
 
 from cassandra.query import BatchStatement
 from cassandra.query import SimpleStatement
@@ -31,15 +30,16 @@ class TestMetricBatch(base.BaseTestCase):
     def setUp(self):
         super(TestMetricBatch, self).setUp()
         self.bound_stmt = SimpleStatement("whatever")
-        self.metric_batch = metric_batch.MetricBatch(Mock(), Mock(), 1)
+        self.metric_batch = metric_batch.MetricBatch(mock.Mock(),
+                                                     mock.Mock(), 1)
         self._set_patchers()
         self._set_mocks()
 
     def _set_patchers(self):
-        self.patch_from_key = patch.object(self.metric_batch.metadata.token_map.token_class,
-                                           'from_key')
-        self.patch_lb_make_query_plan = patch.object(self.metric_batch.lb_policy,
-                                                     'make_query_plan')
+        self.patch_from_key = mock.patch.object(
+            self.metric_batch.metadata.token_map.token_class, 'from_key')
+        self.patch_lb_make_query_plan = mock.patch.object(
+            self.metric_batch.lb_policy, 'make_query_plan')
 
     def _set_mocks(self):
         self.mock_key = self.patch_from_key.start()
@@ -144,21 +144,21 @@ class TestMetricBatch(base.BaseTestCase):
     def test_log_token_batch_map(self):
         query_map = {}
         self.metric_batch.batch_query_by_token(self.bound_stmt, query_map)
-        with patch.object(metric_batch.LOG, 'info') as mock_log_info:
+        with mock.patch.object(metric_batch.LOG, 'info') as mock_log_info:
             self.metric_batch.log_token_batch_map('name', query_map)
             mock_log_info.assert_called_with('name : Size: 1; Tokens: |token: 1|')
 
     def test_log_replica_batch_map(self):
         query_map = {}
-        with patch.object(self.metric_batch.lb_policy, 'make_query_plan'):
-            with patch.object(metric_batch.LOG, 'info') as mock_log_info:
+        with mock.patch.object(self.metric_batch.lb_policy, 'make_query_plan'):
+            with mock.patch.object(metric_batch.LOG, 'info') as mock_log_info:
                 self.metric_batch.batch_query_by_replicas(self.bound_stmt, query_map)
                 self.metric_batch.log_replica_batch_map('name', query_map)
                 mock_log_info.assert_called_with('name : Size: 1; Replicas: |: 1|')
 
     def test_get_all_batches(self):
-        with patch.object(self.metric_batch, 'log_token_batch_map'):
-            with(patch.object(self.metric_batch, 'log_replica_batch_map')):
+        with mock.patch.object(self.metric_batch, 'log_token_batch_map'):
+            with(mock.patch.object(self.metric_batch, 'log_replica_batch_map')):
                 sample_elements = ['measurement_query', 'metric_query', 'dimension_query',
                                    'dimension_metric_query', 'metric_dimension_query']
                 self.metric_batch.measurement_queries.update({'some': sample_elements[0]})
@@ -14,8 +14,8 @@
 from datetime import datetime
 import json
 import os
+from unittest import mock
 
-from mock import patch
 from oslotest import base
 from testtools import matchers
 
@@ -60,7 +60,7 @@ class TestEvents(base.BaseTestCase):
 
         self.assertEqual('2017-08-07', normalize_timestamp('2017-08-07 11:22:43'))
 
-    @patch('monasca_common.kafka.legacy_kafka_message')
+    @mock.patch('monasca_common.kafka.legacy_kafka_message')
     def _load_event(self, event_name, mock_kafka_message):
         if self.events is None:
             filepath = os.path.join(os.path.dirname(__file__), 'events.json')
@@ -13,8 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import Mock
-from mock import patch
+from unittest import mock
 
 from oslotest import base
 
@@ -26,14 +25,15 @@ from monasca_persister.repositories.influxdb import abstract_repository
 class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase):
     def setUp(self):
         super(TestInfluxdbAlarmStateHistoryRepo, self).setUp()
-        with patch.object(abstract_repository.cfg, 'CONF', return_value=Mock()):
+        with mock.patch.object(abstract_repository.cfg, 'CONF',
+                               return_value=mock.Mock()):
            self.alarm_state_repo = AlarmStateHistInfluxdbRepository()
 
     def tearDown(self):
         super(TestInfluxdbAlarmStateHistoryRepo, self).tearDown()
 
     def test_process_message(self):
-        message = Mock()
+        message = mock.Mock()
 
         message.value.return_value = """{
             "alarm-transitioned": {
@@ -54,16 +54,16 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase):
                 }
             }
         }"""
-        expected_output = u'alarm_state_history,tenant_id=dummytenantId ' \
-                          u'tenant_id="dummytenantId",alarm_id="dummyid",' \
-                          u'metrics="\\"dummymetrics\\"",new_state="dummynewState"' \
-                          u',old_state="dummyoldState",link="dummylink",' \
-                          u'lifecycle_state="dummylifecycleState",' \
-                          u'reason="dummystateChangeReason",reason_data="{}"'
-        expected_dict = ['\\"sub_alarm_expression\\":\\"dummy_sub_alarm\\"',
-                         '\\"metric_definition\\":\\"dummy_definition\\"',
-                         '\\"sub_alarm_state\\":\\"dummy_state\\"',
-                         '\\"current_values\\":\\"dummy_values\\"']
+        expected_output = 'alarm_state_history,tenant_id=dummytenantId ' \
+                          'tenant_id="dummytenantId",alarm_id="dummyid",' \
+                          'metrics="\\"dummymetrics\\"",new_state="dummynewState"' \
+                          ',old_state="dummyoldState",link="dummylink",' \
+                          'lifecycle_state="dummylifecycleState",' \
+                          'reason="dummystateChangeReason",reason_data="{}"'
+        expected_dict = ['\\"sub_alarm_expression\\": \\"dummy_sub_alarm\\"',
+                         '\\"metric_definition\\": \\"dummy_definition\\"',
+                         '\\"sub_alarm_state\\": \\"dummy_state\\"',
+                         '\\"current_values\\": \\"dummy_values\\"']
 
         actual_output, tenant_id = self.alarm_state_repo.process_message(message)
 
@@ -13,19 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from unittest import mock
+
 import influxdb
 from influxdb.exceptions import InfluxDBClientError
 
-from mock import Mock
-from mock import patch
-from mock import call
+from oslotest import base
+from oslo_config import cfg
+import six
 
 from monasca_persister.repositories.influxdb.metrics_repository import MetricInfluxdbRepository
 
-from oslotest import base
-from oslo_config import cfg
-
-import six
-
 db_not_found = InfluxDBClientError(
     content='{"error": "database not found: db"}', code=404)
@@ -44,8 +41,8 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
             self.assertEqual(_tenant, tenant)
             data_points.append(_tenant, _dp)
 
-    @patch.object(influxdb, 'InfluxDBClient')
-    @patch.object(cfg, 'CONF', return_value=None)
+    @mock.patch.object(influxdb, 'InfluxDBClient')
+    @mock.patch.object(cfg, 'CONF', return_value=None)
     def _test_write_batch(self, mock_conf, mock_influxdb_client,
                           db_per_tenant, db_exists, hours=0):
         mock_conf.influxdb.database_name = db_name = 'db'
@@ -62,24 +59,24 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
         metrics_repo._influxdb_client = mock_influxdb_client
 
         if db_exists:
-            metrics_repo._influxdb_client.write_points = Mock()
+            metrics_repo._influxdb_client.write_points = mock.Mock()
         else:
-            metrics_repo._influxdb_client.write_points = Mock(
+            metrics_repo._influxdb_client.write_points = mock.Mock(
                 side_effect=[db_not_found, None, db_not_found, None])
         rp = '{}h'.format(hours)
         if db_per_tenant:
             db1 = '%s_%s' % (db_name, t1)
             db2 = '%s_%s' % (db_name, t2)
-            rp1 = call(database=db1, default=True,
-                       name=rp, duration=rp, replication='1')
-            rp2 = call(database=db2, default=True,
-                       name=rp, duration=rp, replication='1')
-            calls = [call(db1), call(db2)]
+            rp1 = mock.call(database=db1, default=True, name=rp,
+                            duration=rp, replication='1')
+            rp2 = mock.call(database=db2, default=True, name=rp,
+                            duration=rp, replication='1')
+            calls = [mock.call(db1), mock.call(db2)]
             rp_calls = [rp1, rp2]
         else:
-            calls = [call(db_name)]
-            rp_calls = [call(database=db_name, default=True,
-                             name=rp, duration=rp, replication='1')]
+            calls = [mock.call(db_name)]
+            rp_calls = [mock.call(database=db_name, default=True,
+                                  name=rp, duration=rp, replication='1')]
         metrics_repo.write_batch(data_points)
         if db_exists:
             mock_influxdb_client.create_database.assert_not_called()
@@ -116,7 +113,7 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
             "creation_time":1554725988
             }
         '''
-        message = Mock()
+        message = mock.Mock()
         message.value.return_value = metric
         return message
 
@@ -10,10 +10,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import patch
-from mock import call
-from mock import Mock
 import signal
+from unittest import mock
 
 from oslo_config import cfg
 from oslotest import base
@@ -41,12 +39,13 @@ class TestPersister(base.BaseTestCase):
         self._set_mocks()
 
     def _set_patchers(self):
-        self.sys_exit_patcher = patch.object(self.persister.sys, 'exit')
-        self.log_patcher = patch.object(self.persister, 'log')
-        self.simport_patcher = patch.object(self.persister, 'simport')
-        self.cfg_patcher = patch.object(self.persister, 'cfg')
-        self.sleep_patcher = patch.object(self.persister.time, 'sleep')
-        self.process_patcher = patch.object(self.persister.multiprocessing, 'Process')
+        self.sys_exit_patcher = mock.patch.object(self.persister.sys, 'exit')
+        self.log_patcher = mock.patch.object(self.persister, 'log')
+        self.simport_patcher = mock.patch.object(self.persister, 'simport')
+        self.cfg_patcher = mock.patch.object(self.persister, 'cfg')
+        self.sleep_patcher = mock.patch.object(self.persister.time, 'sleep')
+        self.process_patcher = mock.patch.object(
+            self.persister.multiprocessing, 'Process')
 
     def _set_mocks(self):
         self.mock_sys_exit = self.sys_exit_patcher.start()
@@ -118,23 +117,25 @@ class TestPersister(base.BaseTestCase):
 
     def test_active_children_are_killed_during_exit(self):
 
-        with patch.object(self.persister.multiprocessing, 'active_children') as active_children,\
-                patch.object(self.persister.os, 'kill') as mock_kill:
+        with mock.patch.object(self.persister.multiprocessing,
+                               'active_children') as active_children,\
+                mock.patch.object(self.persister.os, 'kill') as mock_kill:
 
-            active_children.return_value = [Mock(name='child-1', pid=1),
-                                            Mock(name='child-2', pid=2)]
+            active_children.return_value = [mock.Mock(name='child-1', pid=1),
+                                            mock.Mock(name='child-2', pid=2)]
 
             self.persister.clean_exit(0)
 
-            mock_kill.assert_has_calls([call(1, signal.SIGKILL), call(2, signal.SIGKILL)])
+            mock_kill.assert_has_calls([mock.call(1, signal.SIGKILL),
+                                        mock.call(2, signal.SIGKILL)])
 
     def test_active_children_kill_exception_is_ignored(self):
 
-        with patch.object(self.persister.multiprocessing,
-                          'active_children') as active_children, \
-                patch.object(self.persister.os, 'kill') as mock_kill:
+        with mock.patch.object(self.persister.multiprocessing,
+                               'active_children') as active_children, \
+                mock.patch.object(self.persister.os, 'kill') as mock_kill:
 
-            active_children.return_value = [Mock()]
+            active_children.return_value = [mock.Mock()]
             mock_kill.side_effect = FakeException
 
             self.persister.clean_exit(0)
@@ -185,11 +186,11 @@ class TestPersister(base.BaseTestCase):
         self._assert_process_terminate_called()
 
     def test_start_process_handler_creates_and_runs_persister(self):
-        fake_kafka_config = Mock()
-        fake_repository = Mock()
+        fake_kafka_config = mock.Mock()
+        fake_repository = mock.Mock()
 
-        with patch('monasca_persister.kafka.legacy_kafka_persister'
-                   '.LegacyKafkaPersister') as mock_persister_class:
+        with mock.patch('monasca_persister.kafka.legacy_kafka_persister.'
+                        'LegacyKafkaPersister') as mock_persister_class:
             self.persister.start_process(fake_repository, fake_kafka_config)
 
             mock_persister_class.assert_called_once_with(
@@ -197,7 +198,7 @@ class TestPersister(base.BaseTestCase):
 
 
 def _get_process(name, is_alive=True):
-    return Mock(name=name, is_alive=Mock(return_value=is_alive))
+    return mock.Mock(name=name, is_alive=mock.Mock(return_value=is_alive))
 
 
 def _get_process_list(number_of_processes):
@@ -14,8 +14,7 @@
 # limitations under the License.
 
 import os
-from mock import patch
-from mock import Mock
+from unittest import mock
 
 from oslotest import base
 from oslo_config import cfg
@@ -37,16 +36,17 @@ class TestPersisterRepo(base.BaseTestCase):
         self._set_patchers()
         self._set_mocks()
 
-        self.persister = LegacyKafkaPersister(self.mock_kafka, self.mock_zookeeper, Mock())
+        self.persister = LegacyKafkaPersister(self.mock_kafka,
+                                              self.mock_zookeeper, mock.Mock())
 
     def _set_mocks(self):
-        self.mock_kafka = Mock()
+        self.mock_kafka = mock.Mock()
         self.mock_kafka.topic = 'topic'
         self.mock_kafka.batch_size = 1
         self.mock_kafka.zookeeper_path = ''
         self.mock_kafka.group_id = 0
         self.mock_kafka.max_wait_time_seconds = 0
-        self.mock_zookeeper = Mock(uri='')
+        self.mock_zookeeper = mock.Mock(uri='')
 
         self.mock_consumer_init = self.patch_kafka_init.start()
         self.mock_client_init = self.patch_kafka_client_init.start()
@@ -55,14 +55,17 @@ class TestPersisterRepo(base.BaseTestCase):
         self.mock_log_exception = self.patch_log_exception.start()
 
     def _set_patchers(self):
-        self.patch_kafka_init = patch.object(consumer.KafkaConsumer, '__init__',
-                                             return_value=None)
-        self.patch_kafka_commit = patch.object(consumer.KafkaConsumer, 'commit',
-                                               return_value=FakeException())
-        self.patch_kafka_client_init = patch.object(consumer.kafka_client.KafkaClient, '__init__',
-                                                    return_value=None)
-        self.patch_log_warning = patch.object(LOG, 'warning')
-        self.patch_log_exception = patch.object(LOG, 'exception')
+        self.patch_kafka_init = mock.patch.object(consumer.KafkaConsumer,
+                                                  '__init__',
+                                                  return_value=None)
+        self.patch_kafka_commit = \
+            mock.patch.object(consumer.KafkaConsumer, 'commit',
+                              return_value=FakeException())
+        self.patch_kafka_client_init = \
+            mock.patch.object(consumer.kafka_client.KafkaClient, '__init__',
+                              return_value=None)
+        self.patch_log_warning = mock.patch.object(LOG, 'warning')
+        self.patch_log_exception = mock.patch.object(LOG, 'exception')
 
     def tearDown(self):
         super(TestPersisterRepo, self).tearDown()
@@ -84,24 +87,25 @@ class TestPersisterRepo(base.BaseTestCase):
         self.assertIsNone(self.persister._flush())
 
     def test_run_if_consumer_is_faulty(self):
-        with patch.object(os, '_exit', return_value=None) as mock_exit:
+        with mock.patch.object(os, '_exit', return_value=None) as mock_exit:
             self.persister._data_points = data_points.DataPointsAsDict()
-            self.persister._consumer = Mock(side_effect=FakeException)
+            self.persister._consumer = mock.Mock(side_effect=FakeException)
             self.persister.run()
             mock_exit.assert_called_once_with(1)
 
     def test_run_logs_exception_from_consumer(self):
-        with patch.object(self.persister.repository, 'process_message',
-                          side_effect=FakeException):
+        with mock.patch.object(self.persister.repository, 'process_message',
+                               side_effect=FakeException):
             self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._consumer = ['aa']
             self.persister.run()
             self.mock_log_exception.assert_called()
 
     def test_run_commit_is_called_and_data_points_is_emptied(self):
-        with patch.object(self.persister.repository, 'process_message',
-                          return_value=('message', 'tenant_id')):
-            with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
+        with mock.patch.object(self.persister.repository, 'process_message',
+                               return_value=('message', 'tenant_id')):
+            with mock.patch.object(self.persister, '_consumer',
+                                   return_value=mock.Mock()) as mock_consumer:
                 self.persister._data_points = data_points.DataPointsAsDict()
                 self.persister._data_points.append('fake_tenant_id', 'some')
                 self.persister._consumer.__iter__.return_value = ('aa', 'bb')
@@ -113,19 +117,21 @@ class TestPersisterRepo(base.BaseTestCase):
     def test_flush_logs_warning_and_exception(self):
         exception_msgs = ['partial write: points beyond retention policy dropped',
                           'unable to parse']
-        with(patch.object(cfg.CONF.repositories, 'ignore_parse_point_error',
-                          return_value=True)):
+        with(mock.patch.object(cfg.CONF.repositories,
+                               'ignore_parse_point_error', return_value=True)):
             for elem in exception_msgs:
-                with patch.object(LOG, 'info', side_effect=FakeException(elem)):
+                with mock.patch.object(LOG, 'info', side_effect=FakeException(
+                        elem)):
                     self.persister._data_points = data_points.DataPointsAsDict()
                     self.persister._data_points.append('fake_tenant_id', 'some')
                     self.persister._flush()
                     self.mock_log_warning.assert_called()
 
-    @patch.object(LOG, 'info', side_effect=FakeException())
+    @mock.patch.object(LOG, 'info', side_effect=FakeException())
     def test_flush_logs_exception(self, mock_log_info):
-        with(patch.object(cfg.CONF.repositories,
-                          'ignore_parse_point_error', return_value=False)):
+        with(mock.patch.object(cfg.CONF.repositories,
+                               'ignore_parse_point_error',
+                               return_value=False)):
             mock_log_info.side_effect.message = 'some msg'
             self.persister._data_points = data_points.DataPointsAsDict()
             self.persister._data_points.append('fake_tenant_id', 'some')
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import Mock
+from unittest import mock
 
 from cassandra.policies import RetryPolicy
 from oslotest import base
@@ -32,7 +32,10 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
     def test_on_read_timeout_when_retry_num_gt_read_attempts(self):
         self.monasca_retry_policy.read_attempts = 0
         retry_num = 1
-        rethrow, none = self.monasca_retry_policy.on_read_timeout(Mock(), Mock(), Mock(), 0, 0,
+        rethrow, none = self.monasca_retry_policy.on_read_timeout(mock.Mock(),
+                                                                  mock.Mock(),
+                                                                  mock.Mock(),
+                                                                  0, 0,
                                                                   retry_num)
         self.assertEqual(rethrow, RetryPolicy.RETHROW)
         self.assertEqual(none, None)
@@ -45,7 +48,7 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
         retry_num = 0
         self.monasca_retry_policy.read_attempts = 1
         returned_retry, returned_consistency = self.monasca_retry_policy. \
-            on_read_timeout(Mock(), consistency, required_responses,
+            on_read_timeout(mock.Mock(), consistency, required_responses,
                             received_responses, data_retrieved, retry_num)
         self.assertEqual(returned_retry, RetryPolicy.RETRY)
         self.assertEqual(consistency, returned_consistency)
@@ -58,7 +61,7 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
         retry_num = 0
         self.monasca_retry_policy.read_attempts = 1
         returned_rethrow, returned_none = self.monasca_retry_policy. \
-            on_read_timeout(Mock(), consistency, required_reponses,
+            on_read_timeout(mock.Mock(), consistency, required_reponses,
                             received_responses, data_retrieved, retry_num)
         self.assertEqual(returned_rethrow, RetryPolicy.RETHROW)
         self.assertEqual(returned_none, None)
@@ -67,7 +70,8 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
         retry_num = 1
         self.monasca_retry_policy.write_attempts = 0
         returned_rethrow, returned_none = self.monasca_retry_policy. \
-            on_write_timeout(Mock(), Mock(), Mock(), 0, 0, retry_num)
+            on_write_timeout(mock.Mock(), mock.Mock(), mock.Mock(), 0, 0,
+                             retry_num)
         self.assertEqual(returned_rethrow, RetryPolicy.RETHROW)
         self.assertEqual(returned_none, None)
 
@@ -76,7 +80,8 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
         consistency = 0
         self.monasca_retry_policy.write_attempts = 1
         returned_retry, returned_consistency = self.monasca_retry_policy. \
-            on_write_timeout(Mock(), consistency, Mock(), 0, 0, retry_num)
+            on_write_timeout(mock.Mock(), consistency, mock.Mock(), 0, 0,
+                             retry_num)
         self.assertEqual(returned_retry, RetryPolicy.RETRY)
         self.assertEqual(returned_consistency, consistency)
 
@@ -86,7 +91,8 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
         self.monasca_retry_policy.unavailable_attempts = 1
 
         returned_retry_next_host, returned_consistency = \
-            self.monasca_retry_policy.on_unavailable(Mock(), consistency, 0, 0, retry_num)
+            self.monasca_retry_policy.on_unavailable(mock.Mock(),
+                                                     consistency, 0, 0, retry_num)
         self.assertEqual(returned_consistency, consistency)
         self.assertEqual(returned_retry_next_host, RetryPolicy.RETRY_NEXT_HOST)
 
@@ -96,6 +102,7 @@ class TestMonascaRetryPolicy(base.BaseTestCase):
         self.monasca_retry_policy.unavailable_attempts = 1
 
         returned_rethrow, returned_none = \
-            self.monasca_retry_policy.on_unavailable(Mock(), consistency, 0, 0, retry_num)
+            self.monasca_retry_policy.on_unavailable(mock.Mock(),
+                                                     consistency, 0, 0, retry_num)
         self.assertEqual(returned_none, None)
         self.assertEqual(returned_rethrow, RetryPolicy.RETHROW)
@@ -13,8 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import Mock
-from mock import patch
+from unittest import mock
 
 from oslotest import base
 
@@ -33,11 +32,13 @@ class TestTokenRangeQueryManager(base.BaseTestCase):
         self._set_patchers()
         self._set_mocks()
 
-        cql, result_handler = Mock(), Mock()
+        cql, result_handler = mock.Mock, mock.Mock()
         self.token_range_query_mgr = TokenRangeQueryManager(cql, result_handler, process_count=1)
 
     def _set_patchers(self):
-        self.patcher_setup = patch.object(TokenRangeQueryManager, '_setup', return_value=None)
+        self.patcher_setup = mock.patch.object(TokenRangeQueryManager,
+                                               '_setup',
+                                               return_value=None)
 
     def _set_mocks(self):
         self.mock_setup = self.patcher_setup.start()
@@ -48,12 +49,14 @@ class TestTokenRangeQueryManager(base.BaseTestCase):
         self.patcher_setup.stop()
 
     def test_close_pool(self):
-        with patch.object(self.token_range_query_mgr._pool, 'join', side_effect=None):
+        with mock.patch.object(self.token_range_query_mgr._pool, 'join',
+                               side_effect=None):
             self.assertIsNone(self.token_range_query_mgr.close_pool())
 
     def test_query(self):
-        with patch.object(self.token_range_query_mgr._pool, 'map', side_effect=FakeException):
-            sample_element = Mock()
+        with mock.patch.object(self.token_range_query_mgr._pool, 'map',
+                               side_effect=FakeException):
+            sample_element = mock.Mock()
             sample_element.value = 1
             token_ring = [sample_element, sample_element]
             self.assertRaises(FakeException, self.token_range_query_mgr.query, token_ring)
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from mock import Mock
+from unittest import mock
 
 from oslotest import base
+import simplejson as json
 
 from monasca_persister.repositories import utils
 
@@ -26,7 +28,7 @@ class TestUtils(base.BaseTestCase):
         super(TestUtils, self).tearDown()
 
     def test_parse_measurement_message(self):
-        message = Mock()
+        message = mock.Mock()
         message.value.return_value = """{
             "metric": {
                 "name": "metric_name",
@@ -53,7 +55,7 @@ class TestUtils(base.BaseTestCase):
         self.assertEqual(data[6], {})
 
     def test_parse_alarm_state_hist_message(self):
-        message = Mock()
+        message = mock.Mock()
         message.value.return_value = """{
             "alarm-transitioned": {
                 "alarmId": "dummyid",
@@ -73,10 +75,10 @@ class TestUtils(base.BaseTestCase):
             }
             }
         }"""
-        output = ['"sub_alarm_expression":"dummy_sub_alarm"',
-                  '"current_values":"dummy_values"',
-                  '"metric_definition":"dummy_definition"',
-                  '"sub_alarm_state":"dummy_state"']
+        output = {'sub_alarm_expression': 'dummy_sub_alarm',
+                  'current_values': 'dummy_values',
+                  'metric_definition': 'dummy_definition',
+                  'sub_alarm_state': 'dummy_state'}
         data = utils.parse_alarm_state_hist_message(message)
         self.assertEqual(data[0], 'dummyid')
         self.assertEqual(data[1], 'dummymetrics')
@@ -84,14 +86,15 @@ class TestUtils(base.BaseTestCase):
         self.assertEqual(data[3], 'dummyoldState')
         self.assertEqual(data[4], 'dummylink')
         self.assertEqual(data[5], 'dummylifecycleState')
-        self.assertEqual(data[6], "dummystateChangeReason")
-        for elem in output:
-            self.assertIn(elem, data[7])
+        self.assertEqual(data[6], 'dummystateChangeReason')
+        sub_alarms_data = json.loads(data[7])
+        for elemKey, elemValue in output.items():
+            self.assertIn(elemValue, sub_alarms_data[elemKey])
         self.assertEqual(data[8], 'dummytenantId')
         self.assertEqual(data[9], 'dummytimestamp')
 
     def test_parse_events_message(self):
-        message = Mock()
+        message = mock.Mock()
         message.value.return_value = """{
             "event": {
                 "event_type": "dummy_event_type",
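The reworked assertion above is what makes the test robust: instead of matching serializer-specific substrings such as '"current_values":"dummy_values"', it parses data[7] and compares keys and values, so whatever whitespace convention the JSON library uses is irrelevant. A minimal illustration (the payload literal is made up for the example):

    import simplejson as json

    # Both spellings decode to the same structure, so the assertion
    # no longer depends on which serializer produced the string.
    compact = json.loads('{"current_values":"dummy_values"}')
    spaced = json.loads('{"current_values": "dummy_values"}')
    assert compact == spaced == {'current_values': 'dummy_values'}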
@@ -7,3 +7,4 @@ oslo.log>=3.36.0 # Apache-2.0
 six>=1.10.0 # MIT
 monasca-common>=2.16.0 # Apache-2.0
 cassandra-driver>=3.11.0
+simplejson>=3.8.1
@@ -5,7 +5,6 @@ bandit>=1.1.0 # Apache-2.0
 flake8<2.6.0,>=2.5.4 # MIT
 hacking>=1.1.0,<1.2.0 # Apache-2.0
 coverage!=4.4,>=4.0 # Apache-2.0
-mock>=2.0.0 # BSD
 oslotest>=3.2.0 # Apache-2.0
 stestr>=1.0.0 # Apache-2.0
 docutils>=0.11 # OSI-Approved Open Source, Public Domain