
1) The urllib2 module is missing in Python 3, so it should be replaced by another HTTP library. "requests" is a good library for this purpose; it supports both Python 2.* and Python 3.*. 2) The function 'rally.common.utils.parse_docstring' uses the built-in function 'filter', which returns different types in Python 2.* and Python 3.*. Since the results are used as a list, 'filter' should be replaced by a list comprehension. 3) Extend the H330 hacking rule to check 'dict.iterkeys()', 'dict.itervalues()' and 'dict.iterlist()'. Also fix all places that fail this rule. 4) Function objects do not have a 'func_code' attribute in Python 3, but the '__code__' attribute exists in both Python 2 and Python 3. Also, 3 tests are broken by the new release (1.6.0) of oslo.config, so we need to skip them to unblock our gates until a new version of oslo.config is released with an appropriate fix. Closes-Bug: #1405919 Change-Id: Icc42e220ac3f15ae6c838a4698b3c9578177513c
452 lines
15 KiB
Python
452 lines
15 KiB
Python
# Copyright 2013: Mirantis Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Test for Rally utils."""
|
|
|
|
from __future__ import print_function
|
|
import string
|
|
import sys
|
|
import time
|
|
|
|
import mock
|
|
|
|
from rally.common.i18n import _
|
|
from rally.common import utils
|
|
from rally import exceptions
|
|
from tests.unit import test
|
|
|
|
|
|
class ImmutableMixinTestCase(test.TestCase):
    """Checks that ImmutableMixin instances reject attribute assignment."""

    def test_without_base_values(self):
        """Assigning to a bare mixin instance raises ImmutableException."""
        mixin = utils.ImmutableMixin()
        setter = mixin.__setattr__
        self.assertRaises(exceptions.ImmutableException,
                          setter, "test", "test")

    def test_with_base_values(self):
        """A value stored before the mixin's __init__ stays readable."""

        class Frozen(utils.ImmutableMixin):
            def __init__(self, test):
                # Store the value first, then run the mixin initializer.
                self.test = test
                super(Frozen, self).__init__()

        frozen = Frozen("test")
        setter = frozen.__setattr__
        self.assertRaises(exceptions.ImmutableException,
                          setter, "abc", "test")
        self.assertEqual(frozen.test, "test")
|
|
|
|
|
|
class EnumMixinTestCase(test.TestCase):
    """Checks iteration over an EnumMixin subclass instance."""

    def test_enum_mix_in(self):
        """Iterating an instance yields the three class attribute values."""

        class Foo(utils.EnumMixin):
            a = 10
            b = 20
            CC = "2000"

        listed_values = list(Foo())
        expected_values = set([10, 20, "2000"])
        # Compare as sets: the test does not depend on iteration order.
        self.assertEqual(set(listed_values), expected_values)
|
|
|
|
|
|
class StdIOCaptureTestCase(test.TestCase):
    """Checks the StdOutCapture and StdErrCapture context managers."""

    def test_stdout_capture(self):
        """Printed lines are captured and sys.stdout is restored on exit."""
        original_stream = sys.stdout
        expected_lines = ["abcdef", "defgaga"]
        with utils.StdOutCapture() as captured:
            for line in expected_lines:
                print(line)

        captured_lines = captured.getvalue().rstrip("\n").split("\n")
        self.assertEqual(captured_lines, expected_lines)
        self.assertEqual(original_stream, sys.stdout)

    def test_stderr_capture(self):
        """Lines printed to stderr are captured and sys.stderr is restored."""
        original_stream = sys.stderr
        expected_lines = ["abcdef", "defgaga"]
        with utils.StdErrCapture() as captured:
            for line in expected_lines:
                print(line, file=sys.stderr)

        captured_lines = captured.getvalue().rstrip("\n").split("\n")
        self.assertEqual(captured_lines, expected_lines)
        self.assertEqual(original_stream, sys.stderr)
|
|
|
|
|
|
class TimerTestCase(test.TestCase):
    """Checks the utils.Timer context manager."""

    def test_timer_duration(self):
        """duration() equals the difference of the two mocked time values."""
        began_at = time.time()
        finished_at = time.time()
        expected_duration = finished_at - began_at

        with mock.patch("rally.common.utils.time") as mock_time:
            # First reading is taken while entering the timer ...
            mock_time.time = mock.MagicMock(return_value=began_at)
            with utils.Timer() as timer:
                # ... and the second one is swapped in before it exits.
                mock_time.time = mock.MagicMock(return_value=finished_at)

        self.assertIsNone(timer.error)
        self.assertEqual(expected_duration, timer.duration())

    def test_timer_exception(self):
        """An exception raised inside the timer is recorded in timer.error."""
        try:
            with utils.Timer() as timer:
                raise Exception()
        except Exception:
            # The exception is expected; only timer.error is inspected below.
            pass
        recorded = timer.error
        self.assertEqual(3, len(recorded))
        self.assertEqual(recorded[0], Exception)
|
|
|
|
|
|
class IterSubclassesTestCase(test.TestCase):
    """Checks utils.itersubclasses on a small class hierarchy."""

    def test_itersubclasses(self):
        """All direct and indirect subclasses are yielded in order."""
        class Root(object):
            pass

        class FirstChild(Root):
            pass

        class SecondChild(Root):
            pass

        class GrandChild(SecondChild):
            pass

        found = list(utils.itersubclasses(Root))
        self.assertEqual([FirstChild, SecondChild, GrandChild], found)
|
|
|
|
|
|
class ImportModulesTestCase(test.TestCase):
    """Checks the module import helpers in rally.common.utils."""

    def test_try_append_module_into_sys_modules(self):
        """An importable module name ends up as a key of the given dict."""
        module_name = "rally.common.version"
        registry = {}
        utils.try_append_module(module_name, registry)
        self.assertIn(module_name, registry)

    def test_try_append_broken_module(self):
        """Appending the broken fixture module raises ImportError."""
        registry = {}
        broken_name = "tests.unit.fixtures.import.broken"
        self.assertRaises(ImportError, utils.try_append_module,
                          broken_name, registry)

    def test_import_modules_from_package(self):
        """Both modules of the fixture package appear in sys.modules."""
        package = "tests.unit.fixtures.import.package"
        utils.import_modules_from_package(package)
        self.assertIn("tests.unit.fixtures.import.package.a", sys.modules)
        self.assertIn("tests.unit.fixtures.import.package.b", sys.modules)
|
|
|
|
|
|
class LogTestCase(test.TestCase):
    """Checks the utils.log_task_wrapper decorator."""

    def test_log_task_wrapper(self):
        """The wrapped method keeps its name and result and logs twice."""
        mock_log = mock.MagicMock()
        msg = "test %(a)s %(b)s"

        class TaskLog(object):

            def __init__(self):
                self.task = {"uuid": "some_uuid"}

            @utils.log_task_wrapper(mock_log, msg, a=10, b=20)
            def some_method(self, x, y):
                return x + y

        task_log = TaskLog()
        self.assertEqual(task_log.some_method.__name__, "some_method")
        self.assertEqual(task_log.some_method(2, 2), 4)

        rendered_msg = msg % {"a": 10, "b": 20}
        params = {"msg": rendered_msg, "uuid": task_log.task["uuid"]}
        starting_call = mock.call(
            _("Task %(uuid)s | Starting: %(msg)s") % params)
        completed_call = mock.call(
            _("Task %(uuid)s | Completed: %(msg)s") % params)
        self.assertEqual(mock_log.mock_calls,
                         [starting_call, completed_call])
|
|
|
|
|
|
class LoadExtraModulesTestCase(test.TestCase):
    """Tests for utils.load_plugins with os and imp calls mocked out."""

    @mock.patch("rally.common.utils.imp.load_module")
    @mock.patch("rally.common.utils.imp.find_module",
                return_value=(mock.MagicMock(), None, None))
    @mock.patch("rally.common.utils.os.walk", return_value=[
        ('/somewhere', ('/subdir', ), ('plugin1.py', )),
        ('/somewhere/subdir', ('/subsubdir', ), ('plugin2.py',
                                                 'withoutextension')),
        ('/somewhere/subdir/subsubdir', [], ('plugin3.py', ))])
    @mock.patch("rally.common.utils.os.path.exists", return_value=True)
    def test_load_plugins_successfull(self, mock_exists,
                                      mock_oswalk, mock_find_module,
                                      mock_load_module):
        """Each *.py file found by the mocked os.walk is looked up and loaded.

        The file without a ".py" extension is expected to be skipped, so
        exactly three find_module/load_module calls are asserted.
        """
        test_path = "/somewhere"
        utils.load_plugins(test_path)
        expected = [
            mock.call("plugin1", ["/somewhere"]),
            mock.call("plugin2", ["/somewhere/subdir"]),
            mock.call("plugin3", ["/somewhere/subdir/subsubdir"])
        ]
        self.assertEqual(mock_find_module.mock_calls, expected)
        self.assertEqual(len(mock_load_module.mock_calls), 3)

    @mock.patch("rally.common.utils.os")
    def test_load_plugins_from_nonexisting_and_empty_dir(self, mock_os):
        """load_plugins must not raise for a missing or an empty directory."""
        # test no fails for nonexisting directory
        mock_os.path.exists.return_value = False
        utils.load_plugins("/somewhere")
        # test no fails for empty directory
        mock_os.path.exists.return_value = True
        mock_os.walk.return_value = []
        utils.load_plugins("/somewhere")

    @mock.patch("rally.common.utils.imp.load_module", side_effect=Exception())
    @mock.patch("rally.common.utils.imp.find_module")
    @mock.patch("rally.common.utils.os.path", return_value=True)
    @mock.patch("rally.common.utils.os.walk",
                return_value=[('/etc/.rally/plugins', [], ('load_it.py', ))])
    def test_load_plugins_fails(self, mock_oswalk, mock_ospath,
                                mock_find_module, mock_load_module):
        """load_plugins must not propagate an error from a broken module.

        Stacked patch decorators inject their mocks bottom-up, so the
        parameters are ordered os.walk, os.path, imp.find_module,
        imp.load_module. The last two were previously named the wrong way
        round.
        """
        # test no fails if module is broken
        # TODO(olkonami): check exception is handled correct
        utils.load_plugins("/somwhere")
|
|
|
|
|
|
def module_level_method():
    """Do nothing; a plain module-level function used as a test fixture."""
    return None
|
|
|
|
|
|
class MethodClassTestCase(test.TestCase):
    """Checks utils.get_method_class for methods and plain functions."""

    def test_method_class_for_class_level_method(self):
        """A method accessed through its class resolves to that class."""
        # Deliberately declared without a base class, as in the original.
        class A:
            def m(self):
                pass

        owner = utils.get_method_class(A.m)
        self.assertEqual(A, owner)

    def test_method_class_for_module_level_method(self):
        """A module-level function has no owning class, so None is expected."""
        owner = utils.get_method_class(module_level_method)
        self.assertIsNone(owner)
|
|
|
|
|
|
class FirstIndexTestCase(test.TestCase):
    """Checks utils.first_index with matching and non-matching predicates."""

    def test_list_with_existing_matching_element(self):
        """The index of the matching element is returned."""
        numbers = [1, 3, 5, 7]
        # (value searched for, index expected back)
        cases = ((1, 0), (5, 2), (7, 3))
        for wanted, position in cases:
            found = utils.first_index(numbers, lambda e: e == wanted)
            self.assertEqual(found, position)

    def test_list_with_non_existing_matching_element(self):
        """None is returned when no element satisfies the predicate."""
        numbers = [1, 3, 5, 7]
        found = utils.first_index(numbers, lambda e: e == 2)
        self.assertIsNone(found)
|
|
|
|
|
|
class DocstringTestCase(test.TestCase):
    """Checks utils.parse_docstring on complete and partial docstrings."""

    def test_parse_complete_docstring(self):
        """All four sections are extracted from a full docstring."""
        text = """One-line description.

Multi-
line-
description.

:param p1: Param 1 description.
:param p2: Param 2
description.
:returns: Return value
description.
"""

        parsed = utils.parse_docstring(text)
        param_docs = [{"name": "p1", "doc": "Param 1 description."},
                      {"name": "p2", "doc": "Param 2 description."}]
        self.assertEqual(parsed, {
            "short_description": "One-line description.",
            "long_description": "Multi-\nline-\ndescription.",
            "params": param_docs,
            "returns": "Return value description."
        })

    def test_parse_incomplete_docstring(self):
        """Missing long description and returns are reported as None."""
        text = """One-line description.

:param p1: Param 1 description.
:param p2: Param 2
description.
"""

        parsed = utils.parse_docstring(text)
        param_docs = [{"name": "p1", "doc": "Param 1 description."},
                      {"name": "p2", "doc": "Param 2 description."}]
        self.assertEqual(parsed, {
            "short_description": "One-line description.",
            "long_description": None,
            "params": param_docs,
            "returns": None
        })

    def test_parse_docstring_with_no_params(self):
        """A docstring without :param entries yields an empty params list."""
        text = """One-line description.

Multi-
line-
description.

:returns: Return value
description.
"""

        parsed = utils.parse_docstring(text)
        self.assertEqual(parsed, {
            "short_description": "One-line description.",
            "long_description": "Multi-\nline-\ndescription.",
            "params": [],
            "returns": "Return value description."
        })
|
|
|
|
|
|
class EditDistanceTestCase(test.TestCase):
    """Checks utils.distance for a handful of string pairs."""

    def test_distance_empty_strings(self):
        """Two empty strings are at distance 0."""
        self.assertEqual(0, utils.distance("", ""))

    def test_distance_equal_strings(self):
        """Identical strings are at distance 0."""
        self.assertEqual(0, utils.distance("abcde", "abcde"))

    def test_distance_replacement(self):
        """Two replaced characters give distance 2."""
        self.assertEqual(2, utils.distance("abcde", "__cde"))

    def test_distance_insertion(self):
        """Two inserted characters give distance 2."""
        self.assertEqual(2, utils.distance("abcde", "ab__cde"))

    def test_distance_deletion(self):
        """Two deleted characters give distance 2."""
        self.assertEqual(2, utils.distance("abcde", "abc"))
|
|
|
|
|
|
class TenantIteratorTestCase(test.TestCase):
    """Checks utils.iterate_per_tenants."""

    def test_iterate_per_tenant(self):
        """One (user, tenant_id) pair is produced per tenant.

        The expected user for every tenant is the one with id "0", i.e. the
        first user appended for that tenant.
        """
        tenants_count = 2
        users_per_tenant = 5
        # Tenants vary in the outer loop, users in the inner one.
        users = [{"id": str(user_no), "tenant_id": str(tenant_no)}
                 for tenant_no in range(tenants_count)
                 for user_no in range(users_per_tenant)]

        expected_result = []
        for tenant_no in range(tenants_count):
            tenant_id = str(tenant_no)
            expected_result.append(
                ({"id": "0", "tenant_id": tenant_id}, tenant_id))

        real_result = list(utils.iterate_per_tenants(users))

        self.assertEqual(expected_result, real_result)
|
|
|
|
|
|
class RAMIntTestCase(test.TestCase):
    """Checks utils.RAMInt with the multiprocessing module mocked out."""

    @mock.patch("rally.common.utils.multiprocessing")
    def test__init__(self, mock_multi):
        """Construction creates one Lock and one Value("I", 0)."""
        utils.RAMInt()
        mock_multi.Lock.assert_called_once_with()
        mock_multi.Value.assert_called_once_with("I", 0)

    @mock.patch("rally.common.utils.multiprocessing")
    def test__int__(self, mock_multi):
        """int() returns the value held by the mocked Value object."""
        mock_multi.Value.return_value = mock.Mock(value=42)
        counter = utils.RAMInt()
        self.assertEqual(int(counter), 42)

    @mock.patch("rally.common.utils.multiprocessing")
    def test__str__(self, mock_multi):
        """str() returns the string form of the held value."""
        mock_multi.Value.return_value = mock.Mock(value=42)
        counter = utils.RAMInt()
        self.assertEqual(str(counter), "42")

    @mock.patch("rally.common.utils.multiprocessing")
    def test__iter__(self, mock_multi):
        """iter() on a RAMInt gives back the same object."""
        counter = utils.RAMInt()
        self.assertEqual(iter(counter), counter)

    @mock.patch("rally.common.utils.multiprocessing")
    def test__next__(self, mock_multi):
        """__next__ yields successive values, then raises StopIteration."""
        class MemInt(int):
            THRESHOLD = 5

            def __iadd__(self, i):
                # Wrap around at THRESHOLD instead of growing without bound.
                return MemInt((int(self) + i) % self.THRESHOLD)

        lock = mock.MagicMock()
        mock_multi.Lock.return_value = lock
        mock_multi.Value.return_value = mock.Mock(value=MemInt(0))

        counter = utils.RAMInt()
        self.assertEqual(int(counter), 0)
        for expected in range(MemInt.THRESHOLD - 1):
            self.assertEqual(counter.__next__(), expected)
        self.assertRaises(StopIteration, counter.__next__)

        # The lock is entered and exited once per __next__ call above.
        enter_calls = [mock.call()] * MemInt.THRESHOLD
        self.assertEqual(lock.__enter__.mock_calls, enter_calls)
        self.assertEqual(len(lock.__exit__.mock_calls), MemInt.THRESHOLD)

    @mock.patch("rally.common.utils.RAMInt.__next__",
                return_value="next_value")
    @mock.patch("rally.common.utils.multiprocessing")
    def test_next(self, mock_multi, mock_next):
        """The builtin next() returns what the patched __next__ returns."""
        counter = utils.RAMInt()
        self.assertEqual(next(counter), "next_value")
        mock_next.assert_called_once_with()

    @mock.patch("rally.common.utils.multiprocessing")
    def test_reset(self, mock_multi):
        """After reset() the integer value is 0."""
        counter = utils.RAMInt()
        # Before reset() the mocked value makes int() raise TypeError.
        self.assertRaises(TypeError, int, counter)
        counter.reset()
        self.assertEqual(int(counter), 0)
|
|
|
|
|
|
class GenerateRandomTestCase(test.TestCase):
    """Checks utils.generate_random_name with the random module mocked."""

    @mock.patch("rally.common.utils.random")
    def test_generate_random_name(self, mock_random):
        """Names are built from the requested prefix, alphabet and length.

        The mocked random.choice picks the characters of its argument in
        order, so each expected name is a prefix of the alphabet in use.
        """
        choice = "foobarspamchoicestring"

        def restart_choice():
            # A fresh index iterator so each call starts from position 0.
            positions = iter(range(100))
            mock_random.choice.side_effect = (
                lambda seq: seq[next(positions)])

        restart_choice()
        self.assertEqual(utils.generate_random_name(),
                         string.ascii_lowercase[:16])

        restart_choice()
        self.assertEqual(utils.generate_random_name(length=10),
                         string.ascii_lowercase[:10])

        restart_choice()
        self.assertEqual(utils.generate_random_name(choice=choice),
                         choice[:16])

        restart_choice()
        self.assertEqual(utils.generate_random_name(choice=choice, length=5),
                         choice[:5])

        restart_choice()
        self.assertEqual(
            utils.generate_random_name(prefix="foo_", length=10),
            "foo_" + string.ascii_lowercase[:10])

        restart_choice()
        self.assertEqual(
            utils.generate_random_name(prefix="foo_",
                                       choice=choice, length=10),
            "foo_" + choice[:10])
|