Merge "Start migration of utility methods"

This commit is contained in:
Jenkins 2016-08-19 14:41:12 +00:00 committed by Gerrit Code Review
commit b7238d4393
12 changed files with 593 additions and 9 deletions

View File

@ -176,6 +176,24 @@ class BaseTestCase(testtools.TestCase):
self.addOnException(self.check_for_systemexit)
self.orig_pid = os.getpid()
def get_new_temp_dir(self):
"""Create a new temporary directory.
:returns fixtures.TempDir
"""
return self.useFixture(fixtures.TempDir())
def get_default_temp_dir(self):
"""Create a default temporary directory.
Returns the same directory during the whole test case.
:returns fixtures.TempDir
"""
if not hasattr(self, '_temp_dir'):
self._temp_dir = self.get_new_temp_dir()
return self._temp_dir
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
if os.getpid() != self.orig_pid:

View File

@ -13,16 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import fixtures
import warnings
def _safe_sort_key(value):
"""Return value hash or build one for dictionaries."""
if isinstance(value, collections.Mapping):
return sorted(value.items())
return value
from neutron_lib.utils import helpers
class UnorderedList(list):
@ -31,8 +25,8 @@ class UnorderedList(list):
def __eq__(self, other):
if not isinstance(other, list):
return False
return (sorted(self, key=_safe_sort_key) ==
sorted(other, key=_safe_sort_key))
return (sorted(self, key=helpers.safe_sort_key) ==
sorted(other, key=helpers.safe_sort_key))
def __neq__(self, other):
return not self == other

View File

View File

@ -0,0 +1,81 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import os.path
import stat
import mock
from neutron_lib.tests import _base as base
from neutron_lib.utils import file
class TestEnsureDir(base.BaseTestCase):
@mock.patch('os.makedirs')
def test_ensure_dir_no_fail_if_exists(self, makedirs):
error = OSError()
error.errno = errno.EEXIST
makedirs.side_effect = error
file.ensure_dir("/etc/create/concurrently")
@mock.patch('os.makedirs')
def test_ensure_dir_oserr(self, makedirs):
error = OSError()
error.errno = errno.EPERM
makedirs.side_effect = error
self.assertRaises(OSError,
file.ensure_dir,
"/etc/create/directory")
makedirs.assert_called_once_with("/etc/create/directory", 0o755)
@mock.patch('os.makedirs')
def test_ensure_dir_calls_makedirs(self, makedirs):
file.ensure_dir("/etc/create/directory")
makedirs.assert_called_once_with("/etc/create/directory", 0o755)
class TestReplaceFile(base.BaseTestCase):
def setUp(self):
super(TestReplaceFile, self).setUp()
temp_dir = self.get_default_temp_dir().path
self.file_name = os.path.join(temp_dir, "new_file")
self.data = "data to copy"
def _verify_result(self, file_mode):
self.assertTrue(os.path.exists(self.file_name))
with open(self.file_name) as f:
content = f.read()
self.assertEqual(self.data, content)
mode = os.stat(self.file_name).st_mode
self.assertEqual(file_mode, stat.S_IMODE(mode))
def test_replace_file_default_mode(self):
file_mode = 0o644
file.replace_file(self.file_name, self.data)
self._verify_result(file_mode)
def test_replace_file_custom_mode(self):
file_mode = 0o722
file.replace_file(self.file_name, self.data, file_mode)
self._verify_result(file_mode)
def test_replace_file_custom_mode_twice(self):
file_mode = 0o722
file.replace_file(self.file_name, self.data, file_mode)
self.data = "new data to copy"
file_mode = 0o777
file.replace_file(self.file_name, self.data, file_mode)
self._verify_result(file_mode)

View File

@ -0,0 +1,219 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import re
import six
import testtools
from neutron_lib.tests import _base as base
from neutron_lib.utils import helpers
class TestParseMappings(base.BaseTestCase):
def parse(self, mapping_list, unique_values=True, unique_keys=True):
return helpers.parse_mappings(mapping_list, unique_values, unique_keys)
def test_parse_mappings_fails_for_missing_separator(self):
with testtools.ExpectedException(ValueError):
self.parse(['key'])
def test_parse_mappings_fails_for_missing_key(self):
with testtools.ExpectedException(ValueError):
self.parse([':val'])
def test_parse_mappings_fails_for_missing_value(self):
with testtools.ExpectedException(ValueError):
self.parse(['key:'])
def test_parse_mappings_fails_for_extra_separator(self):
with testtools.ExpectedException(ValueError):
self.parse(['key:val:junk'])
def test_parse_mappings_fails_for_duplicate_key(self):
with testtools.ExpectedException(ValueError):
self.parse(['key:val1', 'key:val2'])
def test_parse_mappings_fails_for_duplicate_value(self):
with testtools.ExpectedException(ValueError):
self.parse(['key1:val', 'key2:val'])
def test_parse_mappings_succeeds_for_one_mapping(self):
self.assertEqual({'key': 'val'}, self.parse(['key:val']))
def test_parse_mappings_succeeds_for_n_mappings(self):
self.assertEqual({'key1': 'val1', 'key2': 'val2'},
self.parse(['key1:val1', 'key2:val2']))
def test_parse_mappings_succeeds_for_duplicate_value(self):
self.assertEqual({'key1': 'val', 'key2': 'val'},
self.parse(['key1:val', 'key2:val'], False))
def test_parse_mappings_succeeds_for_no_mappings(self):
self.assertEqual({}, self.parse(['']))
def test_parse_mappings_succeeds_for_nonuniq_key(self):
self.assertEqual({'key': ['val1', 'val2']},
self.parse(['key:val1', 'key:val2', 'key:val2'],
unique_keys=False))
class TestCompareElements(base.BaseTestCase):
def test_compare_elements(self):
self.assertFalse(helpers.compare_elements([], ['napoli']))
self.assertFalse(helpers.compare_elements(None, ['napoli']))
self.assertFalse(helpers.compare_elements(['napoli'], []))
self.assertFalse(helpers.compare_elements(['napoli'], None))
self.assertFalse(helpers.compare_elements(['napoli', 'juve'],
['juve']))
self.assertTrue(helpers.compare_elements(['napoli', 'juve'],
['napoli', 'juve']))
self.assertTrue(helpers.compare_elements(['napoli', 'juve'],
['juve', 'napoli']))
class TestDictUtils(base.BaseTestCase):
def test_dict2str(self):
dic = {"key1": "value1", "key2": "value2", "key3": "value3"}
expected = "key1=value1,key2=value2,key3=value3"
self.assertEqual(expected, helpers.dict2str(dic))
def test_str2dict(self):
string = "key1=value1,key2=value2,key3=value3"
expected = {"key1": "value1", "key2": "value2", "key3": "value3"}
self.assertEqual(expected, helpers.str2dict(string))
def test_dict_str_conversion(self):
dic = {"key1": "value1", "key2": "value2"}
self.assertEqual(dic, helpers.str2dict(helpers.dict2str(dic)))
def test_diff_list_of_dict(self):
old_list = [{"key1": "value1"},
{"key2": "value2"},
{"key3": "value3"}]
new_list = [{"key1": "value1"},
{"key2": "value2"},
{"key4": "value4"}]
added, removed = helpers.diff_list_of_dict(old_list, new_list)
self.assertEqual(added, [dict(key4="value4")])
self.assertEqual(removed, [dict(key3="value3")])
class TestDict2Tuples(base.BaseTestCase):
def test_dict(self):
input_dict = {'foo': 'bar', '42': 'baz', 'aaa': 'zzz'}
expected = (('42', 'baz'), ('aaa', 'zzz'), ('foo', 'bar'))
output_tuple = helpers.dict2tuple(input_dict)
self.assertEqual(expected, output_tuple)
class TestCamelize(base.BaseTestCase):
def test_camelize(self):
data = {'bandwidth_limit': 'BandwidthLimit',
'test': 'Test',
'some__more__dashes': 'SomeMoreDashes',
'a_penguin_walks_into_a_bar': 'APenguinWalksIntoABar'}
for s, expected in data.items():
self.assertEqual(expected, helpers.camelize(s))
class TestRoundVal(base.BaseTestCase):
def test_round_val_ok(self):
for expected, value in ((0, 0),
(0, 0.1),
(1, 0.5),
(1, 1.49),
(2, 1.5)):
self.assertEqual(expected, helpers.round_val(value))
class TestGetRandomString(base.BaseTestCase):
def test_get_random_string(self):
length = 127
random_string = helpers.get_random_string(length)
self.assertEqual(length, len(random_string))
regex = re.compile('^[0-9a-fA-F]+$')
self.assertIsNotNone(regex.match(random_string))
def requires_py2(testcase):
return testtools.skipUnless(six.PY2, "requires python 2.x")(testcase)
def requires_py3(testcase):
return testtools.skipUnless(six.PY3, "requires python 3.x")(testcase)
class TestSafeDecodeUtf8(base.BaseTestCase):
@requires_py2
def test_py2_does_nothing(self):
s = 'test-py2'
self.assertIs(s, helpers.safe_decode_utf8(s))
@requires_py3
def test_py3_decoded_valid_bytes(self):
s = bytes('test-py2', 'utf-8')
decoded_str = helpers.safe_decode_utf8(s)
self.assertIsInstance(decoded_str, six.text_type)
self.assertEqual(s, decoded_str.encode('utf-8'))
@requires_py3
def test_py3_decoded_invalid_bytes(self):
s = bytes('test-py2', 'utf_16')
decoded_str = helpers.safe_decode_utf8(s)
self.assertIsInstance(decoded_str, six.text_type)
class TestSafeSortKey(base.BaseTestCase):
def test_safe_sort_key(self):
data1 = {'k1': 'v1',
'k2': 'v2'}
data2 = {'k2': 'v2',
'k1': 'v1'}
self.assertEqual(helpers.safe_sort_key(data1),
helpers.safe_sort_key(data2))
def _create_dict_from_list(self, list_data):
d = collections.defaultdict(list)
for k, v in list_data:
d[k].append(v)
return d
def test_safe_sort_key_mapping_ne(self):
list1 = [('yellow', 1), ('blue', 2), ('yellow', 3),
('blue', 4), ('red', 1)]
data1 = self._create_dict_from_list(list1)
list2 = [('yellow', 3), ('blue', 4), ('yellow', 1),
('blue', 2), ('red', 1)]
data2 = self._create_dict_from_list(list2)
self.assertNotEqual(helpers.safe_sort_key(data1),
helpers.safe_sort_key(data2))
def test_safe_sort_key_mapping(self):
list1 = [('yellow', 1), ('blue', 2), ('red', 1)]
data1 = self._create_dict_from_list(list1)
list2 = [('blue', 2), ('red', 1), ('yellow', 1)]
data2 = self._create_dict_from_list(list2)
self.assertEqual(helpers.safe_sort_key(data1),
helpers.safe_sort_key(data2))

View File

@ -0,0 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import mock
from neutron_lib.tests import _base as base
from neutron_lib.utils import host
class TestCpuCount(base.BaseTestCase):
@mock.patch.object(multiprocessing, 'cpu_count',
return_value=7)
def test_cpu_count(self, mock_cpu_count):
self.assertEqual(7, host.cpu_count())
mock_cpu_count.assert_called_once_with()
@mock.patch.object(multiprocessing, 'cpu_count',
side_effect=NotImplementedError())
def test_cpu_count_not_implemented(self, mock_cpu_count):
self.assertEqual(1, host.cpu_count())
mock_cpu_count.assert_called_once_with()

View File

@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import mock
from neutron_lib.tests import _base as base
from neutron_lib.utils import net
class TestGetHostname(base.BaseTestCase):
@mock.patch.object(socket, 'gethostname',
return_value='fake-host-name')
def test_get_hostname(self, mock_gethostname):
self.assertEqual('fake-host-name',
net.get_hostname())
mock_gethostname.assert_called_once_with()

View File

46
neutron_lib/utils/file.py Normal file
View File

@ -0,0 +1,46 @@
# Copyright 2011, VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import os
import tempfile
def ensure_dir(dir_path):
"""Ensure a directory with 755 permissions mode."""
try:
os.makedirs(dir_path, 0o755)
except OSError as e:
# If the directory already existed, don't raise the error.
if e.errno != errno.EEXIST:
raise
def replace_file(file_name, data, file_mode=0o644):
"""Replaces the contents of file_name with data in a safe manner.
First write to a temp file and then rename. Since POSIX renames are
atomic, the file is unlikely to be corrupted by competing writes.
We create the tempfile on the same device to ensure that it can be renamed.
"""
base_dir = os.path.dirname(os.path.abspath(file_name))
with tempfile.NamedTemporaryFile('w+',
dir=base_dir,
delete=False) as tmp_file:
tmp_file.write(data)
os.chmod(tmp_file.name, file_mode)
os.rename(tmp_file.name, file_name)

View File

@ -0,0 +1,124 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import decimal
import random
import six
from neutron_lib._i18n import _
def parse_mappings(mapping_list, unique_values=True, unique_keys=True):
"""Parse a list of mapping strings into a dictionary.
:param mapping_list: a list of strings of the form '<key>:<value>'
:param unique_values: values must be unique if True
:param unique_keys: keys must be unique if True, else implies that keys
and values are not unique
:returns: a dict mapping keys to values or to list of values
"""
mappings = {}
for mapping in mapping_list:
mapping = mapping.strip()
if not mapping:
continue
split_result = mapping.split(':')
if len(split_result) != 2:
raise ValueError(_("Invalid mapping: '%s'") % mapping)
key = split_result[0].strip()
if not key:
raise ValueError(_("Missing key in mapping: '%s'") % mapping)
value = split_result[1].strip()
if not value:
raise ValueError(_("Missing value in mapping: '%s'") % mapping)
if unique_keys:
if key in mappings:
raise ValueError(_("Key %(key)s in mapping: '%(mapping)s' not "
"unique") % {'key': key,
'mapping': mapping})
if unique_values and value in mappings.values():
raise ValueError(_("Value %(value)s in mapping: '%(mapping)s' "
"not unique") % {'value': value,
'mapping': mapping})
mappings[key] = value
else:
mappings.setdefault(key, [])
if value not in mappings[key]:
mappings[key].append(value)
return mappings
def compare_elements(a, b):
"""Compare elements if a and b have same elements.
This method doesn't consider ordering
"""
return set(a or []) == set(b or [])
def safe_sort_key(value):
"""Return value hash or build one for dictionaries."""
if isinstance(value, collections.Mapping):
return sorted(value.items())
return value
def dict2str(dic):
return ','.join("%s=%s" % (key, val)
for key, val in sorted(six.iteritems(dic)))
def str2dict(string):
res_dict = {}
for keyvalue in string.split(','):
(key, value) = keyvalue.split('=', 1)
res_dict[key] = value
return res_dict
def dict2tuple(d):
items = list(d.items())
items.sort()
return tuple(items)
def diff_list_of_dict(old_list, new_list):
new_set = set([dict2str(l) for l in new_list])
old_set = set([dict2str(l) for l in old_list])
added = new_set - old_set
removed = old_set - new_set
return [str2dict(a) for a in added], [str2dict(r) for r in removed]
def get_random_string(length):
"""Get a random hex string of the specified length."""
return "{0:0{1}x}".format(random.getrandbits(length * 4), length)
def camelize(s):
return ''.join(s.replace('_', ' ').title().split())
def round_val(val):
# we rely on decimal module since it behaves consistently across Python
# versions (2.x vs. 3.x)
return int(decimal.Decimal(val).quantize(decimal.Decimal('1'),
rounding=decimal.ROUND_HALF_UP))
def safe_decode_utf8(s):
if six.PY3 and isinstance(s, bytes):
return s.decode('utf-8', 'surrogateescape')
return s

21
neutron_lib/utils/host.py Normal file
View File

@ -0,0 +1,21 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
def cpu_count():
try:
return multiprocessing.cpu_count()
except NotImplementedError:
return 1

18
neutron_lib/utils/net.py Normal file
View File

@ -0,0 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
def get_hostname():
return socket.gethostname()