mistral-lib/mistral_lib/tests/test_utils.py

341 lines
9.8 KiB
Python

# Copyright 2013 - Mirantis, Inc.
# Copyright 2017 - Nokia Networks.
# Copyright 2017 - Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from yaql.language import utils as yaql_utils
from mistral_lib.tests import base as tests_base
from mistral_lib import utils
import testtools.matchers as ttm
LEFT = {
'key1': {
'key11': "val11"
},
'key2': 'val2'
}
RIGHT = {
'key1': {
'key11': "val111111",
'key12': "val12",
'key13': {
'key131': 'val131'
}
},
'key2': 'val2222',
'key3': 'val3'
}
class TestUtils(tests_base.TestCase):
def test_merge_dicts(self):
left = copy.deepcopy(LEFT)
right = copy.deepcopy(RIGHT)
expected = {
'key1': {
'key11': "val111111",
'key12': "val12",
'key13': {
'key131': 'val131'
}
},
'key2': 'val2222',
'key3': 'val3'
}
utils.merge_dicts(left, right)
self.assertDictEqual(left, expected)
def test_merge_dicts_overwrite_false(self):
left = copy.deepcopy(LEFT)
right = copy.deepcopy(RIGHT)
expected = {
'key1': {
'key11': "val11",
'key12': "val12",
'key13': {
'key131': 'val131'
}
},
'key2': 'val2',
'key3': 'val3'
}
utils.merge_dicts(left, right, overwrite=False)
self.assertDictEqual(left, expected)
def test_itersubclasses(self):
class A(object):
pass
class B(A):
pass
class C(A):
pass
class D(C):
pass
self.assertEqual([B, C, D], list(utils.iter_subclasses(A)))
def test_get_dict_from_entries(self):
input = ['param1', {'param2': 2}]
input_dict = utils.get_dict_from_entries(input)
self.assertIn('param1', input_dict)
self.assertIn('param2', input_dict)
self.assertEqual(2, input_dict.get('param2'))
self.assertIs(input_dict.get('param1'), utils.NotDefined)
def test_get_input_dict_from_string(self):
self.assertDictEqual(
{
'param1': utils.NotDefined,
'param2': 2,
'param3': 'var3'
},
utils.get_dict_from_string('param1, param2=2, param3="var3"')
)
self.assertDictEqual({}, utils.get_dict_from_string(''))
def test_cut_string(self):
s = 'Hello, Mistral!'
self.assertEqual('Hello...', utils.cut_string(s, length=5))
self.assertEqual(s, utils.cut_string(s, length=100))
self.assertEqual(s, utils.cut_string(s, length=-1))
def test_cut_list(self):
list_ = ['Hello, Mistral!', 'Hello, OpenStack!']
self.assertEqual("['Hello, M...", utils.cut_list(list_, 13))
self.assertEqual("['Hello, Mistr...", utils.cut_list(list_, 17))
self.assertEqual(
"['Hello, Mistral!', 'He...",
utils.cut_list(list_, 26))
self.assertEqual(
"['Hello, Mistral!', 'Hello, OpenStack!']",
utils.cut_list(list_, 100)
)
self.assertEqual(
"['Hello, Mistral!', 'Hello, OpenStack!']",
utils.cut_list(list_, -1)
)
self.assertEqual("[1, 2...", utils.cut_list([1, 2, 3, 4, 5], 8))
self.assertEqual("[1, 2,...", utils.cut_list([1, 2, 3, 4, 5], 9))
self.assertEqual("[1, 2, 3...", utils.cut_list([1, 2, 3, 4, 5], 11))
self.assertRaises(ValueError, utils.cut_list, (1, 2))
def test_cut_list_with_large_dict_of_str(self):
d = [str(i) for i in range(65535)]
s = utils.cut(d, 65535)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65535)))
def test_cut_list_with_large_dict_of_int(self):
d = [i for i in range(65535)]
s = utils.cut(d, 65535)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65535)))
def test_cut_list_with_large_dict_of_dict(self):
d = [{'value': str(i)} for i in range(65535)]
s = utils.cut(d, 65535)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65535)))
def test_cut_list_for_state_info(self):
d = [{'value': 'This is a string that exceeds 35 characters'}
for i in range(2000)]
s = utils.cut(d, 65500)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65500)))
def test_cut_dict_with_strings(self):
d = {'key1': 'value1', 'key2': 'value2'}
s = utils.cut_dict(d, 13)
self.assertIn(s, ["{'key1': '...", "{'key2': '..."])
s = utils.cut_dict(d, 15)
self.assertIn(s, ["{'key1': 'va...", "{'key2': 'va..."])
s = utils.cut_dict(d, 22)
self.assertIn(
s,
["{'key1': 'value1', ...", "{'key2': 'value2', ..."]
)
self.assertIn(
utils.cut_dict(d, 100),
[
"{'key1': 'value1', 'key2': 'value2'}",
"{'key2': 'value2', 'key1': 'value1'}"
]
)
self.assertIn(
utils.cut_dict(d, -1),
[
"{'key1': 'value1', 'key2': 'value2'}",
"{'key2': 'value2', 'key1': 'value1'}"
]
)
self.assertRaises(ValueError, utils.cut_dict, (1, 2))
def test_cut_dict_with_digits(self):
d = {1: 2, 3: 4}
s = utils.cut_dict(d, 10)
self.assertIn(s, ["{1: 2, ...", "{3: 4, ..."])
s = utils.cut_dict(d, 11)
self.assertIn(s, ["{1: 2, 3...", "{3: 4, 1..."])
s = utils.cut_dict(d, 100)
self.assertIn(s, ["{1: 2, 3: 4}", "{3: 4, 1: 2}"])
def test_cut_dict_with_large_dict_of_str(self):
d = {}
for i in range(65535):
d[str(i)] = str(i)
s = utils.cut(d, 65535)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65535)))
def test_cut_dict_with_large_dict_of_int(self):
d = {}
for i in range(65535):
d[i] = i
s = utils.cut(d, 65535)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65535)))
def test_cut_dict_with_large_dict_of_dict(self):
d = {}
for i in range(65535):
d[i] = {'value': str(i)}
s = utils.cut(d, 65535)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65535)))
def test_cut_dict_for_state_info(self):
d = {}
for i in range(2000):
d[i] = {'value': 'This is a string that exceeds 35 characters'}
s = utils.cut(d, 65500)
self.assertThat(len(s), ttm.Not(ttm.GreaterThan(65500)))
def test_mask_data(self):
payload = {'adminPass': 'fooBarBaz'}
expected = {'adminPass': '***'}
self.assertEqual(expected, utils.mask_data(payload))
payload = """adminPass='fooBarBaz'"""
expected = """adminPass='***'"""
self.assertEqual(expected, utils.mask_data(payload))
payload = [{'adminPass': 'fooBarBaz'}, {"new_pass": "blah"}]
expected = [{'adminPass': '***'}, {"new_pass": "***"}]
self.assertEqual(expected, utils.mask_data(payload))
payload = ["adminPass", 'fooBarBaz']
expected = ["adminPass", 'fooBarBaz']
self.assertEqual(expected, utils.mask_data(payload))
def test_json_serialize_frozen_dict(self):
data = yaql_utils.FrozenDict(a=1, b=2, c=iter([1, 2, 3]))
json_str = utils.to_json_str(data)
self.assertIsNotNone(json_str)
self.assertIn('"a": 1', json_str)
self.assertIn('"b": 2', json_str)
self.assertIn('"c": [1, 2, 3]', json_str)
def test_json_serialize_generator(self):
def _list_stream(_list):
for i in _list:
yield i
gen = _list_stream(
[1, yaql_utils.FrozenDict(a=1), _list_stream([12, 15])]
)
self.assertEqual('[1, {"a": 1}, [12, 15]]', utils.to_json_str(gen))
def test_json_serialize_dict_of_generators(self):
def _f(cnt):
for i in range(1, cnt + 1):
yield i
data = {'numbers': _f(3)}
self.assertEqual('{"numbers": [1, 2, 3]}', utils.to_json_str(data))
def test_json_serialize_range(self):
self.assertEqual("[1, 2, 3, 4]", utils.to_json_str(range(1, 5)))
def test_json_serialize_iterator_of_frozen_dicts(self):
data = iter(
[
yaql_utils.FrozenDict(a=1, b=2, c=iter([1, 2, 3])),
yaql_utils.FrozenDict(
a=11,
b=yaql_utils.FrozenDict(b='222'),
c=iter(
[
1,
yaql_utils.FrozenDict(
a=iter([4, yaql_utils.FrozenDict(a=99)])
)
]
)
)
]
)
json_str = utils.to_json_str(data)
self.assertIsNotNone(json_str)
# Checking the first item.
self.assertIn('"a": 1', json_str)
self.assertIn('"b": 2', json_str)
self.assertIn('"c": [1, 2, 3]', json_str)
# Checking the first item.
self.assertIn('"a": 11', json_str)
self.assertIn('"b": {"b": "222"}', json_str)
self.assertIn('"c": [1, {"a": [4, {"a": 99}]}]', json_str)