Merge "Remove unused code in utils and improved utils testing"

This commit is contained in:
Zuul 2019-07-02 04:25:51 +00:00 committed by Gerrit Code Review
commit 973cafd79b
5 changed files with 495 additions and 292 deletions

View File

@ -1,194 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import functools
import tempfile
import six
import testtools
from mock import Mock
from jinja2 import Template
from designate.tests import TestCase
from designate import exceptions
from designate import utils
class TestUtils(TestCase):
def test_resource_string(self):
name = ['templates', 'bind9-zone.jinja2']
resource_string = utils.resource_string(*name)
self.assertIsNotNone(resource_string)
def test_resource_string_missing(self):
name = 'invalid.jinja2'
with testtools.ExpectedException(exceptions.ResourceNotFound):
utils.resource_string(name)
def test_resource_string_empty_args(self):
with testtools.ExpectedException(ValueError):
utils.resource_string()
def test_load_schema_missing(self):
with testtools.ExpectedException(exceptions.ResourceNotFound):
utils.load_schema('v1', 'missing')
def test_load_template(self):
name = 'bind9-zone.jinja2'
template = utils.load_template(name)
self.assertIsInstance(template, Template)
def test_load_template_keep_trailing_newline(self):
name = 'bind9-zone.jinja2'
template = utils.load_template(name)
self.assertTrue(template.environment.keep_trailing_newline)
def test_load_template_missing(self):
name = 'invalid.jinja2'
with testtools.ExpectedException(exceptions.ResourceNotFound):
utils.load_template(name)
def test_render_template(self):
template = Template("Hello {{name}}")
result = utils.render_template(template, name="World")
self.assertEqual('Hello World', result)
def test_render_template_to_file(self):
output_path = tempfile.mktemp()
template = Template("Hello {{name}}")
utils.render_template_to_file(template, output_path=output_path,
name="World")
self.assertTrue(os.path.exists(output_path))
try:
with open(output_path, 'r') as fh:
self.assertEqual('Hello World', fh.read())
finally:
os.unlink(output_path)
def test_increment_serial(self):
ret_serial = utils.increment_serial(serial=20)
self.assertGreater(ret_serial, 20)
def test_is_uuid_like(self):
uuid_str = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
self.assertTrue(utils.is_uuid_like(uuid_str))
uuid_str = '678'
self.assertFalse(utils.is_uuid_like(uuid_str))
def test_split_host_port(self):
host_port = "abc:abc"
host, port = utils.split_host_port(host_port)
self.assertEqual((host, port), ("abc:abc", 53))
host_port = "abc:25"
host, port = utils.split_host_port(host_port)
self.assertEqual((host, port), ("abc", 25))
def test_get_paging_params_invalid_limit(self):
context = Mock()
for value in [9223372036854775809, -1]:
with testtools.ExpectedException(exceptions.InvalidLimit):
utils.get_paging_params(context, {'limit': value}, [])
def test_get_paging_params_max_limit(self):
context = Mock()
self.config(max_limit_v2=1000, group='service:api')
result = utils.get_paging_params(context, {'limit': "max"}, [])
self.assertEqual(result[1], 1000)
def test_get_paging_params_invalid_sort_dir(self):
context = Mock()
with testtools.ExpectedException(exceptions.InvalidSortDir):
utils.get_paging_params(context, {'sort_dir': "dsc"}, [])
def test_get_paging_params_invalid_sort_key(self):
context = Mock()
with testtools.ExpectedException(exceptions.InvalidSortKey):
utils.get_paging_params(context, {'sort_key': "dsc"},
['asc', 'desc'])
def def_method(f, *args, **kwargs):
@functools.wraps(f)
def new_method(self):
return f(self, *args, **kwargs)
return new_method
def parameterized_class(cls):
"""A class decorator for running parameterized test cases.
Mark your class with @parameterized_class.
Mark your test cases with @parameterized.
"""
test_functions = {
k: v for k, v in vars(cls).items() if k.startswith('test')
}
for name, f in test_functions.items():
if not hasattr(f, '_test_data'):
continue
# remove the original test function from the class
delattr(cls, name)
# add a new test function to the class for each entry in f._test_data
for tag, args in f._test_data.items():
new_name = "{0}_{1}".format(f.__name__, tag)
if hasattr(cls, new_name):
raise Exception(
"Parameterized test case '{0}.{1}' created from '{0}.{2}' "
"already exists".format(cls.__name__, new_name, name))
# Using `def new_method(self): f(self, **args)` is not sufficient
# (all new_methods use the same args value due to late binding).
# Instead, use this factory function.
new_method = def_method(f, **args)
# To add a method to a class, available for all instances:
# MyClass.method = types.MethodType(f, None, MyClass)
setattr(cls, new_name, six.create_unbound_method(new_method, cls))
return cls
def parameterized(data):
"""A function decorator for parameterized test cases.
Example:
@parameterized({
'zero': dict(val=0),
'one': dict(val=1),
})
def test_val(self, val):
self.assertEqual(self.get_val(), val)
The above will generate two test cases:
`test_val_zero` which runs with val=0
`test_val_one` which runs with val=1
:param data: A dictionary that looks like {tag: {arg1: val1, ...}}
"""
def wrapped(f):
f._test_data = data
return f
return wrapped

View File

@ -11,19 +11,407 @@
# under the License.
import random
import jinja2
import mock
import oslotest.base
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
from oslo_utils import timeutils
from designate import exceptions
from designate import utils
from designate.tests import fixtures
CONF = cfg.CONF
class TestSocket(oslotest.base.BaseTestCase):
class TestUtils(oslotest.base.BaseTestCase):
def setUp(self):
super(TestSocket, self).setUp()
super(TestUtils, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(cfg_fixture.Config(CONF))
self.useFixture(self.stdlog)
@mock.patch('os.path.exists')
@mock.patch('os.path.abspath')
def test_find_config(self, mock_abspath, mock_path_exists):
CONF.set_override('pybasedir', '/tmp/workspace/designate')
mock_path_exists.side_effect = [True, False, False, False, False]
mock_abspath.return_value = '/tmp/designate/designate.conf'
config_files = utils.find_config('designate.conf')
self.assertEqual(['/tmp/designate/designate.conf'], config_files)
mock_abspath.assert_called_once()
@mock.patch.object(processutils, 'execute')
def test_execute(self, mock_execute):
mock_execute.return_value = ('designate.conf\npools.yaml\n', '')
out, err = utils.execute(
'/bin/ls', '/etc/designate/',
run_as_root=False
)
mock_execute.assert_called_once_with(
'/bin/ls', '/etc/designate/',
root_helper='sudo designate-rootwrap /etc/designate/rootwrap.conf',
run_as_root=False
)
self.assertEqual('designate.conf\npools.yaml\n', out)
self.assertFalse(err)
@mock.patch.object(processutils, 'execute')
def test_execute_with_rootwrap(self, mock_execute):
CONF.set_override('root_helper', 'sudo designate-test')
mock_execute.return_value = ('designate.conf\npools.yaml\n', '')
out, err = utils.execute(
'/bin/ls', '/etc/designate/',
run_as_root=True
)
mock_execute.assert_called_once_with(
'/bin/ls', '/etc/designate/',
root_helper='sudo designate-test',
run_as_root=True
)
self.assertEqual('designate.conf\npools.yaml\n', out)
self.assertFalse(err)
def test_deep_dict_merge(self):
a = {
'a': {'dns': 'record'},
'b': 'b',
'c': 'c',
}
b = {
'a': {'domain': 'zone'},
'c': 1,
'd': 'd',
}
self.assertEqual(
{
'a': {
'dns': 'record', 'domain': 'zone'
},
'b': 'b', 'c': 1, 'd': 'd'
},
utils.deep_dict_merge(a, b)
)
def test_deep_dict_merge_not_dict(self):
result = utils.deep_dict_merge(dict(), list())
self.assertIsInstance(result, list)
def test_get_proxies(self):
CONF.set_override('no_proxy', 'example.com', 'proxy')
CONF.set_override('http_proxy', 'example.org', 'proxy')
CONF.set_override('https_proxy', 'example.net', 'proxy')
result = utils.get_proxies()
self.assertEqual(['example.com'], result.get('no_proxy'))
self.assertEqual('example.org', result.get('http'))
self.assertEqual('example.net', result.get('https'))
def test_get_proxies_default_values(self):
result = utils.get_proxies()
self.assertIsNone(result.get('no_proxy'))
self.assertIsNone(result.get('http'))
self.assertIsNone(result.get('https'))
def test_get_proxies_with_no_proxy(self):
CONF.set_override('no_proxy', 'example.org', 'proxy')
result = utils.get_proxies()
self.assertEqual(['example.org'], result.get('no_proxy'))
self.assertIsNone(result.get('http'))
self.assertIsNone(result.get('https'))
def test_get_proxies_with_http_proxy(self):
CONF.set_override('http_proxy', 'example.org', 'proxy')
result = utils.get_proxies()
self.assertIsNone(result.get('no_proxy'))
self.assertEqual('example.org', result.get('http'))
self.assertEqual('example.org', result.get('https'))
def test_get_proxies_with_https_proxy(self):
CONF.set_override('https_proxy', 'example.org', 'proxy')
result = utils.get_proxies()
self.assertIsNone(result.get('no_proxy'))
self.assertIsNone(result.get('http'))
self.assertEqual('example.org', result.get('https'))
def test_resource_string(self):
resource_string = utils.resource_string(
'templates', 'bind9-zone.jinja2'
)
self.assertIsNotNone(resource_string)
def test_resource_string_missing(self):
self.assertRaisesRegex(
exceptions.ResourceNotFound,
'Could not find the requested resource',
utils.resource_string, 'invalid.jinja2'
)
def test_resource_string_empty_args(self):
self.assertRaises(
ValueError,
utils.resource_string
)
def test_load_schema_missing(self):
self.assertRaisesRegex(
exceptions.ResourceNotFound,
'Could not find the requested resource',
utils.load_schema, 'v1', 'missing'
)
@mock.patch.object(utils, 'resource_string')
def test_load_template(self, mock_resource_string):
mock_resource_string.return_value = 'Hello {{name}}'.encode('utf-8')
template = utils.load_template('bind9-zone.jinja2')
self.assertIsInstance(template, jinja2.Template)
@mock.patch.object(utils, 'resource_string')
def test_load_template_keep_trailing_newline(self, mock_resource_string):
mock_resource_string.return_value = 'Hello {{name}}'.encode('utf-8')
template = utils.load_template('bind9-zone.jinja2')
self.assertTrue(template.environment.keep_trailing_newline)
def test_load_template_missing(self):
self.assertRaises(
exceptions.ResourceNotFound,
utils.load_template, 'invalid.jinja2'
)
def test_render_template(self):
template = jinja2.Template('Hello {{name}}')
result = utils.render_template(template, name='World')
self.assertEqual('Hello World', result)
@mock.patch('six.moves.builtins.open', new_callable=mock.mock_open)
@mock.patch('os.path.exists')
def test_render_template_to_file(self, mock_exists, mock_open):
mock_exists.return_value = True
output_path = '/tmp/designate/resources/templates/hello.jinja2'
template = jinja2.Template('Hello {{name}}')
utils.render_template_to_file(
template, makedirs=False, output_path=output_path, name='World'
)
mock_open.assert_called_once_with(output_path, 'w')
mock_open().write.assert_called_once_with('Hello World')
@mock.patch('six.moves.builtins.open', new_callable=mock.mock_open)
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
def test_render_template_to_file_with_makedirs(self, mock_makedirs,
mock_exists,
mock_open):
mock_exists.return_value = False
output_path = '/tmp/designate/resources/templates/hello.jinja2'
template = jinja2.Template('Hello {{name}}')
utils.render_template_to_file(
template, makedirs=True, output_path=output_path, name='World'
)
mock_makedirs.assert_called_once_with(
'/tmp/designate/resources/templates'
)
mock_open.assert_called_once_with(output_path, 'w')
mock_open().write.assert_called_once_with('Hello World')
@mock.patch.object(timeutils, 'utcnow_ts')
def test_increment_serial_lower_than_ts(self, mock_utcnow_ts):
mock_utcnow_ts.return_value = 1561698354
ret_serial = utils.increment_serial(serial=1)
self.assertEqual(1561698354, ret_serial)
@mock.patch.object(timeutils, 'utcnow_ts')
def test_increment_serial_higher_than_ts(self, mock_utcnow_ts):
mock_utcnow_ts.return_value = 1561698354
ret_serial = utils.increment_serial(serial=1561698354 * 2)
self.assertEqual(1561698354 * 2 + 1, ret_serial)
def test_is_uuid_like(self):
self.assertTrue(
utils.is_uuid_like('ce9fcd6b-d546-4397-8a49-8ceaec37cb64')
)
def test_is_not_uuid_like(self):
self.assertFalse(utils.is_uuid_like('678'))
def test_split_host_port(self):
host, port = utils.split_host_port('abc:25')
self.assertEqual(('abc', 25), (host, port))
def test_split_host_port_with_invalid_port(self):
host, port = utils.split_host_port('abc:abc')
self.assertEqual(('abc:abc', 53), (host, port))
def test_get_paging_params(self):
CONF.set_override('default_limit_v2', 100, 'service:api')
context = mock.Mock()
params = {
'updated_at': None,
'created_at': '2019-06-28T04:17:34.000000',
'pattern': 'blacklisted.com.',
'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
}
marker, limit, sort_key, sort_dir = utils.get_paging_params(
context, params, ['created_at', 'id', 'updated_at', 'pattern']
)
self.assertIsNone(marker)
self.assertEqual(100, limit)
self.assertIsNone(sort_key)
self.assertIsNone(sort_dir)
def test_get_paging_params_without_sort_keys(self):
CONF.set_override('default_limit_v2', 0, 'service:api')
context = mock.Mock()
params = {
'updated_at': None,
'created_at': '2019-06-28T04:17:34.000000',
'pattern': 'blacklisted.com.',
'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
}
marker, limit, sort_key, sort_dir = utils.get_paging_params(
context, params, sort_keys=None
)
self.assertIsNone(marker)
self.assertEqual(0, limit)
self.assertIsNone(sort_key)
self.assertIsNone(sort_dir)
def test_get_paging_params_sort_by_tenant_id(self):
CONF.set_override('default_limit_v2', 100, 'service:api')
context = mock.Mock()
context.all_tenants = True
params = {
'updated_at': None,
'created_at': '2019-06-28T04:17:34.000000',
'pattern': 'blacklisted.com.',
'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
'sort_key': 'tenant_id',
}
marker, limit, sort_key, sort_dir = utils.get_paging_params(
context, params,
['created_at', 'id', 'updated_at', 'pattern', 'tenant_id']
)
self.assertIsNone(marker)
self.assertEqual(100, limit)
self.assertEqual('tenant_id', sort_key)
self.assertIsNone(sort_dir)
def test_get_paging_params_sort_tenant_without_all_tenants(self):
CONF.set_override('default_limit_v2', 100, 'service:api')
context = mock.Mock()
context.all_tenants = False
params = {
'updated_at': None,
'created_at': '2019-06-28T04:17:34.000000',
'pattern': 'blacklisted.com.',
'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea',
'sort_key': 'tenant_id',
}
marker, limit, sort_key, sort_dir = utils.get_paging_params(
context, params,
['created_at', 'id', 'updated_at', 'pattern', 'tenant_id']
)
self.assertIsNone(marker)
self.assertEqual(100, limit)
self.assertIsNone(sort_key)
self.assertIsNone(sort_dir)
def test_get_paging_params_invalid_limit(self):
context = mock.Mock()
self.assertRaises(
exceptions.InvalidLimit,
utils.get_paging_params,
context, {'limit': 9223372036854775809}, []
)
self.assertRaises(
exceptions.InvalidLimit,
utils.get_paging_params,
context, {'limit': -1}, []
)
def test_get_paging_params_max_limit(self):
CONF.set_override('max_limit_v2', 1000, 'service:api')
context = mock.Mock()
result = utils.get_paging_params(context, {'limit': 'max'}, [])
self.assertEqual(result[1], 1000)
def test_get_paging_params_invalid_sort_dir(self):
context = mock.Mock()
self.assertRaisesRegex(
exceptions.InvalidSortDir,
'Unknown sort direction, must be',
utils.get_paging_params, context, {'sort_dir': 'dsc'}, []
)
def test_get_paging_params_invalid_sort_key(self):
context = mock.Mock()
self.assertRaisesRegex(
exceptions.InvalidSortKey,
'sort key must be one of',
utils.get_paging_params, context, {'sort_key': 'dsc'},
['asc', 'desc']
)
@mock.patch('socket.socket')
def test_bind_tcp(self, mock_sock_impl):
mock_sock = mock.MagicMock()
@ -57,6 +445,19 @@ class TestSocket(oslotest.base.BaseTestCase):
self.stdlog.logger.output
)
@mock.patch('socket.socket')
def test_bind_tcp_without_reuse_port(self, mock_sock_impl):
mock_sock = mock.MagicMock()
mock_sock_impl.return_value = mock_sock
mock_sock.setsockopt.side_effect = [None, None, AttributeError, None]
utils.bind_tcp('127.0.0.1', 53, 100, 1)
self.assertIn(
'SO_REUSEPORT not available, ignoring.',
self.stdlog.logger.output
)
@mock.patch('socket.socket')
def test_bind_udp(self, mock_sock_impl):
mock_sock = mock.MagicMock()
@ -87,3 +488,16 @@ class TestSocket(oslotest.base.BaseTestCase):
'Listening on UDP port %(port)d' % {'port': random_port},
self.stdlog.logger.output
)
@mock.patch('socket.socket')
def test_bind_udp_without_reuse_port(self, mock_sock_impl):
mock_sock = mock.MagicMock()
mock_sock_impl.return_value = mock_sock
mock_sock.setsockopt.side_effect = [None, AttributeError]
utils.bind_udp('127.0.0.1', 53)
self.assertIn(
'SO_REUSEPORT not available, ignoring.',
self.stdlog.logger.output
)

View File

@ -0,0 +1,78 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import six
def def_method(f, *args, **kwargs):
@functools.wraps(f)
def new_method(self):
return f(self, *args, **kwargs)
return new_method
def parameterized_class(cls):
"""A class decorator for running parameterized test cases.
Mark your class with @parameterized_class.
Mark your test cases with @parameterized.
"""
test_functions = {
k: v for k, v in vars(cls).items() if k.startswith('test')
}
for name, f in test_functions.items():
if not hasattr(f, '_test_data'):
continue
# remove the original test function from the class
delattr(cls, name)
# add a new test function to the class for each entry in f._test_data
for tag, args in f._test_data.items():
new_name = "{0}_{1}".format(f.__name__, tag)
if hasattr(cls, new_name):
raise Exception(
"Parameterized test case '{0}.{1}' created from '{0}.{2}' "
"already exists".format(cls.__name__, new_name, name))
# Using `def new_method(self): f(self, **args)` is not sufficient
# (all new_methods use the same args value due to late binding).
# Instead, use this factory function.
new_method = def_method(f, **args)
# To add a method to a class, available for all instances:
# MyClass.method = types.MethodType(f, None, MyClass)
setattr(cls, new_name, six.create_unbound_method(new_method, cls))
return cls
def parameterized(data):
"""A function decorator for parameterized test cases.
Example:
@parameterized({
'zero': dict(val=0),
'one': dict(val=1),
})
def test_val(self, val):
self.assertEqual(self.get_val(), val)
The above will generate two test cases:
`test_val_zero` which runs with val=0
`test_val_one` which runs with val=1
:param data: A dictionary that looks like {tag: {arg1: val1, ...}}
"""
def wrapped(f):
f._test_data = data
return f
return wrapped

View File

@ -19,9 +19,9 @@ import oslotest.base
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
import designate.tests.test_utils as utils
from designate import exceptions
from designate import objects
from designate.tests.unit import utils
from designate.worker import processing
from designate.worker import utils as wutils
from designate.worker.tasks import zone

View File

@ -140,55 +140,6 @@ def execute(*cmd, **kw):
root_helper=root_helper, **kw)
def get_item_properties(item, fields, mixed_case_fields=None, formatters=None):
"""Return a tuple containing the item properties.
:param item: a single item resource (e.g. Server, Tenant, etc)
:param fields: tuple of strings with the desired field names
:param mixed_case_fields: tuple of field names to preserve case
:param formatters: dictionary mapping field names to callables
to format the values
"""
row = []
mixed_case_fields = mixed_case_fields or []
formatters = formatters or {}
for field in fields:
if field in formatters:
row.append(formatters[field](item))
else:
if field in mixed_case_fields:
field_name = field.replace(' ', '_')
else:
field_name = field.lower().replace(' ', '_')
if not hasattr(item, field_name) and \
(isinstance(item, dict) and field_name in item):
data = item[field_name]
else:
data = getattr(item, field_name, '')
if data is None:
data = ''
row.append(data)
return tuple(row)
def get_columns(data):
"""
Some row's might have variable count of columns, ensure that we have the
same.
:param data: Results in [{}, {]}]
"""
columns = set()
def _seen(col):
columns.add(str(col))
six.moves.map(lambda item: six.moves.map(_seen,
list(six.iterkeys(item))), data)
return list(columns)
def increment_serial(serial=0):
# This provides for *roughly* unix timestamp based serial numbers
new_serial = timeutils.utcnow_ts()
@ -199,44 +150,6 @@ def increment_serial(serial=0):
return new_serial
def quote_string(string):
inparts = string.split(' ')
outparts = []
tmp = None
for part in inparts:
if part == '':
continue
elif part[0] == '"' and part[-1:] == '"' and part[-2:] != '\\"':
# Handle Quoted Words
outparts.append(part.strip('"'))
elif part[0] == '"':
# Handle Start of Quoted Sentance
tmp = part[1:]
elif tmp is not None and part[-1:] == '"' and part[-2:] != '\\"':
# Handle End of Quoted Sentance
tmp += " " + part.strip('"')
outparts.append(tmp)
tmp = None
elif tmp is not None:
# Handle Middle of Quoted Sentance
tmp += " " + part
else:
# Handle Standalone words
outparts.append(part)
if tmp is not None:
# Handle unclosed quoted strings
outparts.append(tmp)
# This looks odd, but both calls are necessary to ensure the end results
# is always consistent.
outparts = [o.replace('\\"', '"') for o in outparts]
outparts = [o.replace('"', '\\"') for o in outparts]
return '"' + '" "'.join(outparts) + '"'
def deep_dict_merge(a, b):
if not isinstance(b, dict):
return b
@ -324,14 +237,6 @@ def get_proxies():
return proxies
def extract_priority_from_data(recordset_type, record):
priority, data = None, record['data']
if recordset_type in ('MX', 'SRV'):
priority, _, data = record['data'].partition(" ")
priority = int(priority)
return priority, data
def cache_result(function):
"""A function decorator to cache the result of the first call, every
additional call will simply return the cached value.