795 lines
29 KiB
Python
795 lines
29 KiB
Python
# Copyright (c) 2012 OpenStack Foundation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import errno
|
|
import os.path
|
|
import re
|
|
import sys
|
|
|
|
import eventlet
|
|
import mock
|
|
import netaddr
|
|
from neutron_lib import constants
|
|
from neutron_lib import exceptions as exc
|
|
from oslo_log import log as logging
|
|
import six
|
|
import testtools
|
|
|
|
from neutron.common import exceptions as n_exc
|
|
from neutron.common import utils
|
|
from neutron.plugins.common import constants as p_const
|
|
from neutron.plugins.common import utils as plugin_utils
|
|
from neutron.tests import base
|
|
from neutron.tests.common import helpers
|
|
from neutron.tests.unit import tests
|
|
|
|
|
|
class TestParseMappings(base.BaseTestCase):
|
|
def parse(self, mapping_list, unique_values=True, unique_keys=True):
|
|
return utils.parse_mappings(mapping_list, unique_values, unique_keys)
|
|
|
|
def test_parse_mappings_fails_for_missing_separator(self):
|
|
with testtools.ExpectedException(ValueError):
|
|
self.parse(['key'])
|
|
|
|
def test_parse_mappings_fails_for_missing_key(self):
|
|
with testtools.ExpectedException(ValueError):
|
|
self.parse([':val'])
|
|
|
|
def test_parse_mappings_fails_for_missing_value(self):
|
|
with testtools.ExpectedException(ValueError):
|
|
self.parse(['key:'])
|
|
|
|
def test_parse_mappings_fails_for_extra_separator(self):
|
|
with testtools.ExpectedException(ValueError):
|
|
self.parse(['key:val:junk'])
|
|
|
|
def test_parse_mappings_fails_for_duplicate_key(self):
|
|
with testtools.ExpectedException(ValueError):
|
|
self.parse(['key:val1', 'key:val2'])
|
|
|
|
def test_parse_mappings_fails_for_duplicate_value(self):
|
|
with testtools.ExpectedException(ValueError):
|
|
self.parse(['key1:val', 'key2:val'])
|
|
|
|
def test_parse_mappings_succeeds_for_one_mapping(self):
|
|
self.assertEqual({'key': 'val'}, self.parse(['key:val']))
|
|
|
|
def test_parse_mappings_succeeds_for_n_mappings(self):
|
|
self.assertEqual({'key1': 'val1', 'key2': 'val2'},
|
|
self.parse(['key1:val1', 'key2:val2']))
|
|
|
|
def test_parse_mappings_succeeds_for_duplicate_value(self):
|
|
self.assertEqual({'key1': 'val', 'key2': 'val'},
|
|
self.parse(['key1:val', 'key2:val'], False))
|
|
|
|
def test_parse_mappings_succeeds_for_no_mappings(self):
|
|
self.assertEqual({}, self.parse(['']))
|
|
|
|
def test_parse_mappings_succeeds_for_nonuniq_key(self):
|
|
self.assertEqual({'key': ['val1', 'val2']},
|
|
self.parse(['key:val1', 'key:val2', 'key:val2'],
|
|
unique_keys=False))
|
|
|
|
|
|
class TestParseTunnelRangesMixin(object):
|
|
TUN_MIN = None
|
|
TUN_MAX = None
|
|
TYPE = None
|
|
_err_prefix = "Invalid network tunnel range: '%d:%d' - "
|
|
_err_suffix = "%s is not a valid %s identifier."
|
|
_err_range = "End of tunnel range is less than start of tunnel range."
|
|
|
|
def _build_invalid_tunnel_range_msg(self, t_range_tuple, n):
|
|
bad_id = t_range_tuple[n - 1]
|
|
return (self._err_prefix % t_range_tuple) + (self._err_suffix
|
|
% (bad_id, self.TYPE))
|
|
|
|
def _build_range_reversed_msg(self, t_range_tuple):
|
|
return (self._err_prefix % t_range_tuple) + self._err_range
|
|
|
|
def _verify_range(self, tunnel_range):
|
|
return plugin_utils.verify_tunnel_range(tunnel_range, self.TYPE)
|
|
|
|
def _check_range_valid_ranges(self, tunnel_range):
|
|
self.assertIsNone(self._verify_range(tunnel_range))
|
|
|
|
def _check_range_invalid_ranges(self, bad_range, which):
|
|
expected_msg = self._build_invalid_tunnel_range_msg(bad_range, which)
|
|
err = self.assertRaises(exc.NetworkTunnelRangeError,
|
|
self._verify_range, bad_range)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def _check_range_reversed(self, bad_range):
|
|
err = self.assertRaises(exc.NetworkTunnelRangeError,
|
|
self._verify_range, bad_range)
|
|
expected_msg = self._build_range_reversed_msg(bad_range)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_range_tunnel_id_valid(self):
|
|
self._check_range_valid_ranges((self.TUN_MIN, self.TUN_MAX))
|
|
|
|
def test_range_tunnel_id_invalid(self):
|
|
self._check_range_invalid_ranges((-1, self.TUN_MAX), 1)
|
|
self._check_range_invalid_ranges((self.TUN_MIN,
|
|
self.TUN_MAX + 1), 2)
|
|
self._check_range_invalid_ranges((self.TUN_MIN - 1,
|
|
self.TUN_MAX + 1), 1)
|
|
|
|
def test_range_tunnel_id_reversed(self):
|
|
self._check_range_reversed((self.TUN_MAX, self.TUN_MIN))
|
|
|
|
|
|
class TestGreTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
|
|
base.BaseTestCase):
|
|
TUN_MIN = p_const.MIN_GRE_ID
|
|
TUN_MAX = p_const.MAX_GRE_ID
|
|
TYPE = p_const.TYPE_GRE
|
|
|
|
|
|
class TestVxlanTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
|
|
base.BaseTestCase):
|
|
TUN_MIN = p_const.MIN_VXLAN_VNI
|
|
TUN_MAX = p_const.MAX_VXLAN_VNI
|
|
TYPE = p_const.TYPE_VXLAN
|
|
|
|
|
|
class UtilTestParseVlanRanges(base.BaseTestCase):
|
|
_err_prefix = "Invalid network VLAN range: '"
|
|
_err_bad_count = "' - 'Need exactly two values for VLAN range'."
|
|
_err_bad_vlan = "' - '%s is not a valid VLAN tag'."
|
|
_err_range = "' - 'End of VLAN range is less than start of VLAN range'."
|
|
|
|
def _range_err_bad_count(self, nv_range):
|
|
return self._err_prefix + nv_range + self._err_bad_count
|
|
|
|
def _range_invalid_vlan(self, nv_range, n):
|
|
vlan = nv_range.split(':')[n]
|
|
return self._err_prefix + nv_range + (self._err_bad_vlan % vlan)
|
|
|
|
def _nrange_invalid_vlan(self, nv_range, n):
|
|
vlan = nv_range.split(':')[n]
|
|
v_range = ':'.join(nv_range.split(':')[1:])
|
|
return self._err_prefix + v_range + (self._err_bad_vlan % vlan)
|
|
|
|
def _vrange_invalid_vlan(self, v_range_tuple, n):
|
|
vlan = v_range_tuple[n - 1]
|
|
v_range_str = '%d:%d' % v_range_tuple
|
|
return self._err_prefix + v_range_str + (self._err_bad_vlan % vlan)
|
|
|
|
def _vrange_invalid(self, v_range_tuple):
|
|
v_range_str = '%d:%d' % v_range_tuple
|
|
return self._err_prefix + v_range_str + self._err_range
|
|
|
|
|
|
class TestVlanNetworkNameValid(base.BaseTestCase):
|
|
def parse_vlan_ranges(self, vlan_range):
|
|
return plugin_utils.parse_network_vlan_ranges(vlan_range)
|
|
|
|
def test_validate_provider_phynet_name_mixed(self):
|
|
self.assertRaises(n_exc.PhysicalNetworkNameError,
|
|
self.parse_vlan_ranges,
|
|
['', ':23:30', 'physnet1',
|
|
'tenant_net:100:200'])
|
|
|
|
def test_validate_provider_phynet_name_bad(self):
|
|
self.assertRaises(n_exc.PhysicalNetworkNameError,
|
|
self.parse_vlan_ranges,
|
|
[':1:34'])
|
|
|
|
|
|
class TestVlanRangeVerifyValid(UtilTestParseVlanRanges):
|
|
def verify_range(self, vlan_range):
|
|
return plugin_utils.verify_vlan_range(vlan_range)
|
|
|
|
def test_range_valid_ranges(self):
|
|
self.assertIsNone(self.verify_range((1, 2)))
|
|
self.assertIsNone(self.verify_range((1, 1999)))
|
|
self.assertIsNone(self.verify_range((100, 100)))
|
|
self.assertIsNone(self.verify_range((100, 200)))
|
|
self.assertIsNone(self.verify_range((4001, 4094)))
|
|
self.assertIsNone(self.verify_range((1, 4094)))
|
|
|
|
def check_one_vlan_invalid(self, bad_range, which):
|
|
expected_msg = self._vrange_invalid_vlan(bad_range, which)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.verify_range, bad_range)
|
|
self.assertEqual(str(err), expected_msg)
|
|
|
|
def test_range_first_vlan_invalid_negative(self):
|
|
self.check_one_vlan_invalid((-1, 199), 1)
|
|
|
|
def test_range_first_vlan_invalid_zero(self):
|
|
self.check_one_vlan_invalid((0, 199), 1)
|
|
|
|
def test_range_first_vlan_invalid_limit_plus_one(self):
|
|
self.check_one_vlan_invalid((4095, 199), 1)
|
|
|
|
def test_range_first_vlan_invalid_too_big(self):
|
|
self.check_one_vlan_invalid((9999, 199), 1)
|
|
|
|
def test_range_second_vlan_invalid_negative(self):
|
|
self.check_one_vlan_invalid((299, -1), 2)
|
|
|
|
def test_range_second_vlan_invalid_zero(self):
|
|
self.check_one_vlan_invalid((299, 0), 2)
|
|
|
|
def test_range_second_vlan_invalid_limit_plus_one(self):
|
|
self.check_one_vlan_invalid((299, 4095), 2)
|
|
|
|
def test_range_second_vlan_invalid_too_big(self):
|
|
self.check_one_vlan_invalid((299, 9999), 2)
|
|
|
|
def test_range_both_vlans_invalid_01(self):
|
|
self.check_one_vlan_invalid((-1, 0), 1)
|
|
|
|
def test_range_both_vlans_invalid_02(self):
|
|
self.check_one_vlan_invalid((0, 4095), 1)
|
|
|
|
def test_range_both_vlans_invalid_03(self):
|
|
self.check_one_vlan_invalid((4095, 9999), 1)
|
|
|
|
def test_range_both_vlans_invalid_04(self):
|
|
self.check_one_vlan_invalid((9999, -1), 1)
|
|
|
|
def test_range_reversed(self):
|
|
bad_range = (95, 10)
|
|
expected_msg = self._vrange_invalid(bad_range)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.verify_range, bad_range)
|
|
self.assertEqual(str(err), expected_msg)
|
|
|
|
|
|
class TestParseOneVlanRange(UtilTestParseVlanRanges):
|
|
def parse_one(self, cfg_entry):
|
|
return plugin_utils.parse_network_vlan_range(cfg_entry)
|
|
|
|
def test_parse_one_net_no_vlan_range(self):
|
|
config_str = "net1"
|
|
expected_networks = ("net1", None)
|
|
self.assertEqual(expected_networks, self.parse_one(config_str))
|
|
|
|
def test_parse_one_net_and_vlan_range(self):
|
|
config_str = "net1:100:199"
|
|
expected_networks = ("net1", (100, 199))
|
|
self.assertEqual(expected_networks, self.parse_one(config_str))
|
|
|
|
def test_parse_one_net_incomplete_range(self):
|
|
config_str = "net1:100"
|
|
expected_msg = self._range_err_bad_count(config_str)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_one, config_str)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_one_net_range_too_many(self):
|
|
config_str = "net1:100:150:200"
|
|
expected_msg = self._range_err_bad_count(config_str)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_one, config_str)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_one_net_vlan1_not_int(self):
|
|
config_str = "net1:foo:199"
|
|
expected_msg = self._range_invalid_vlan(config_str, 1)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_one, config_str)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_one_net_vlan2_not_int(self):
|
|
config_str = "net1:100:bar"
|
|
expected_msg = self._range_invalid_vlan(config_str, 2)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_one, config_str)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_one_net_and_max_range(self):
|
|
config_str = "net1:1:4094"
|
|
expected_networks = ("net1", (1, 4094))
|
|
self.assertEqual(expected_networks, self.parse_one(config_str))
|
|
|
|
def test_parse_one_net_range_bad_vlan1(self):
|
|
config_str = "net1:9000:150"
|
|
expected_msg = self._nrange_invalid_vlan(config_str, 1)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_one, config_str)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_one_net_range_bad_vlan2(self):
|
|
config_str = "net1:4000:4999"
|
|
expected_msg = self._nrange_invalid_vlan(config_str, 2)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_one, config_str)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
|
|
class TestParseVlanRangeList(UtilTestParseVlanRanges):
|
|
def parse_list(self, cfg_entries):
|
|
return plugin_utils.parse_network_vlan_ranges(cfg_entries)
|
|
|
|
def test_parse_list_one_net_no_vlan_range(self):
|
|
config_list = ["net1"]
|
|
expected_networks = {"net1": []}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
def test_parse_list_one_net_vlan_range(self):
|
|
config_list = ["net1:100:199"]
|
|
expected_networks = {"net1": [(100, 199)]}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
def test_parse_two_nets_no_vlan_range(self):
|
|
config_list = ["net1",
|
|
"net2"]
|
|
expected_networks = {"net1": [],
|
|
"net2": []}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
def test_parse_two_nets_range_and_no_range(self):
|
|
config_list = ["net1:100:199",
|
|
"net2"]
|
|
expected_networks = {"net1": [(100, 199)],
|
|
"net2": []}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
def test_parse_two_nets_no_range_and_range(self):
|
|
config_list = ["net1",
|
|
"net2:200:299"]
|
|
expected_networks = {"net1": [],
|
|
"net2": [(200, 299)]}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
def test_parse_two_nets_bad_vlan_range1(self):
|
|
config_list = ["net1:100",
|
|
"net2:200:299"]
|
|
expected_msg = self._range_err_bad_count(config_list[0])
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_list, config_list)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_two_nets_vlan_not_int2(self):
|
|
config_list = ["net1:100:199",
|
|
"net2:200:0x200"]
|
|
expected_msg = self._range_invalid_vlan(config_list[1], 2)
|
|
err = self.assertRaises(n_exc.NetworkVlanRangeError,
|
|
self.parse_list, config_list)
|
|
self.assertEqual(expected_msg, str(err))
|
|
|
|
def test_parse_two_nets_and_append_1_2(self):
|
|
config_list = ["net1:100:199",
|
|
"net1:1000:1099",
|
|
"net2:200:299"]
|
|
expected_networks = {"net1": [(100, 199),
|
|
(1000, 1099)],
|
|
"net2": [(200, 299)]}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
def test_parse_two_nets_and_append_1_3(self):
|
|
config_list = ["net1:100:199",
|
|
"net2:200:299",
|
|
"net1:1000:1099"]
|
|
expected_networks = {"net1": [(100, 199),
|
|
(1000, 1099)],
|
|
"net2": [(200, 299)]}
|
|
self.assertEqual(expected_networks, self.parse_list(config_list))
|
|
|
|
|
|
class TestDictUtils(base.BaseTestCase):
|
|
def test_dict2str(self):
|
|
dic = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
|
expected = "key1=value1,key2=value2,key3=value3"
|
|
self.assertEqual(expected, utils.dict2str(dic))
|
|
|
|
def test_str2dict(self):
|
|
string = "key1=value1,key2=value2,key3=value3"
|
|
expected = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
|
self.assertEqual(expected, utils.str2dict(string))
|
|
|
|
def test_dict_str_conversion(self):
|
|
dic = {"key1": "value1", "key2": "value2"}
|
|
self.assertEqual(dic, utils.str2dict(utils.dict2str(dic)))
|
|
|
|
def test_diff_list_of_dict(self):
|
|
old_list = [{"key1": "value1"},
|
|
{"key2": "value2"},
|
|
{"key3": "value3"}]
|
|
new_list = [{"key1": "value1"},
|
|
{"key2": "value2"},
|
|
{"key4": "value4"}]
|
|
added, removed = utils.diff_list_of_dict(old_list, new_list)
|
|
self.assertEqual([dict(key4="value4")], added)
|
|
self.assertEqual([dict(key3="value3")], removed)
|
|
|
|
|
|
class TestDict2Tuples(base.BaseTestCase):
|
|
def test_dict(self):
|
|
input_dict = {'foo': 'bar', '42': 'baz', 'aaa': 'zzz'}
|
|
expected = (('42', 'baz'), ('aaa', 'zzz'), ('foo', 'bar'))
|
|
output_tuple = utils.dict2tuple(input_dict)
|
|
self.assertEqual(expected, output_tuple)
|
|
|
|
|
|
class TestExceptionLogger(base.BaseTestCase):
|
|
def test_normal_call(self):
|
|
result = "Result"
|
|
|
|
@utils.exception_logger()
|
|
def func():
|
|
return result
|
|
|
|
self.assertEqual(result, func())
|
|
|
|
def test_raise(self):
|
|
result = "Result"
|
|
|
|
@utils.exception_logger()
|
|
def func():
|
|
raise RuntimeError(result)
|
|
|
|
self.assertRaises(RuntimeError, func)
|
|
|
|
def test_spawn_normal(self):
|
|
result = "Result"
|
|
logger = mock.Mock()
|
|
|
|
@utils.exception_logger(logger=logger)
|
|
def func():
|
|
return result
|
|
|
|
gt = eventlet.spawn(func)
|
|
self.assertEqual(result, gt.wait())
|
|
self.assertFalse(logger.called)
|
|
|
|
def test_spawn_raise(self):
|
|
result = "Result"
|
|
logger = mock.Mock()
|
|
|
|
@utils.exception_logger(logger=logger)
|
|
def func():
|
|
raise RuntimeError(result)
|
|
|
|
gt = eventlet.spawn(func)
|
|
self.assertRaises(RuntimeError, gt.wait)
|
|
self.assertTrue(logger.called)
|
|
|
|
def test_pool_spawn_normal(self):
|
|
logger = mock.Mock()
|
|
calls = mock.Mock()
|
|
|
|
@utils.exception_logger(logger=logger)
|
|
def func(i):
|
|
calls(i)
|
|
|
|
pool = eventlet.GreenPool(4)
|
|
for i in range(0, 4):
|
|
pool.spawn(func, i)
|
|
pool.waitall()
|
|
|
|
calls.assert_has_calls([mock.call(0), mock.call(1),
|
|
mock.call(2), mock.call(3)],
|
|
any_order=True)
|
|
self.assertFalse(logger.called)
|
|
|
|
def test_pool_spawn_raise(self):
|
|
logger = mock.Mock()
|
|
calls = mock.Mock()
|
|
|
|
@utils.exception_logger(logger=logger)
|
|
def func(i):
|
|
if i == 2:
|
|
raise RuntimeError(2)
|
|
else:
|
|
calls(i)
|
|
|
|
pool = eventlet.GreenPool(4)
|
|
for i in range(0, 4):
|
|
pool.spawn(func, i)
|
|
pool.waitall()
|
|
|
|
calls.assert_has_calls([mock.call(0), mock.call(1), mock.call(3)],
|
|
any_order=True)
|
|
self.assertTrue(logger.called)
|
|
|
|
|
|
class TestDvrServices(base.BaseTestCase):
|
|
|
|
def _test_is_dvr_serviced(self, device_owner, expected):
|
|
self.assertEqual(expected, utils.is_dvr_serviced(device_owner))
|
|
|
|
def test_is_dvr_serviced_with_lb_port(self):
|
|
self._test_is_dvr_serviced(constants.DEVICE_OWNER_LOADBALANCER, True)
|
|
|
|
def test_is_dvr_serviced_with_lbv2_port(self):
|
|
self._test_is_dvr_serviced(constants.DEVICE_OWNER_LOADBALANCERV2, True)
|
|
|
|
def test_is_dvr_serviced_with_dhcp_port(self):
|
|
self._test_is_dvr_serviced(constants.DEVICE_OWNER_DHCP, True)
|
|
|
|
def test_is_dvr_serviced_with_vm_port(self):
|
|
self._test_is_dvr_serviced(constants.DEVICE_OWNER_COMPUTE_PREFIX, True)
|
|
|
|
|
|
class TestIpToCidr(base.BaseTestCase):
|
|
def test_ip_to_cidr_ipv4_default(self):
|
|
self.assertEqual('15.1.2.3/32', utils.ip_to_cidr('15.1.2.3'))
|
|
|
|
def test_ip_to_cidr_ipv4_prefix(self):
|
|
self.assertEqual('15.1.2.3/24', utils.ip_to_cidr('15.1.2.3', 24))
|
|
|
|
def test_ip_to_cidr_ipv4_netaddr(self):
|
|
ip_address = netaddr.IPAddress('15.1.2.3')
|
|
self.assertEqual('15.1.2.3/32', utils.ip_to_cidr(ip_address))
|
|
|
|
def test_ip_to_cidr_ipv4_bad_prefix(self):
|
|
self.assertRaises(netaddr.core.AddrFormatError,
|
|
utils.ip_to_cidr, '15.1.2.3', 33)
|
|
|
|
def test_ip_to_cidr_ipv6_default(self):
|
|
self.assertEqual('::1/128', utils.ip_to_cidr('::1'))
|
|
|
|
def test_ip_to_cidr_ipv6_prefix(self):
|
|
self.assertEqual('::1/64', utils.ip_to_cidr('::1', 64))
|
|
|
|
def test_ip_to_cidr_ipv6_bad_prefix(self):
|
|
self.assertRaises(netaddr.core.AddrFormatError,
|
|
utils.ip_to_cidr, '2000::1', 129)
|
|
|
|
|
|
class TestCidrIsHost(base.BaseTestCase):
|
|
def test_is_cidr_host_ipv4(self):
|
|
self.assertTrue(utils.is_cidr_host('15.1.2.3/32'))
|
|
|
|
def test_is_cidr_host_ipv4_not_cidr(self):
|
|
self.assertRaises(ValueError,
|
|
utils.is_cidr_host,
|
|
'15.1.2.3')
|
|
|
|
def test_is_cidr_host_ipv6(self):
|
|
self.assertTrue(utils.is_cidr_host('2000::1/128'))
|
|
|
|
def test_is_cidr_host_ipv6_netaddr(self):
|
|
net = netaddr.IPNetwork("2000::1")
|
|
self.assertTrue(utils.is_cidr_host(net))
|
|
|
|
def test_is_cidr_host_ipv6_32(self):
|
|
self.assertFalse(utils.is_cidr_host('2000::1/32'))
|
|
|
|
def test_is_cidr_host_ipv6_not_cidr(self):
|
|
self.assertRaises(ValueError,
|
|
utils.is_cidr_host,
|
|
'2000::1')
|
|
|
|
def test_is_cidr_host_ipv6_not_cidr_netaddr(self):
|
|
ip_address = netaddr.IPAddress("2000::3")
|
|
self.assertRaises(ValueError,
|
|
utils.is_cidr_host,
|
|
ip_address)
|
|
|
|
|
|
class TestIpVersionFromInt(base.BaseTestCase):
|
|
def test_ip_version_from_int_ipv4(self):
|
|
self.assertEqual(constants.IPv4,
|
|
utils.ip_version_from_int(4))
|
|
|
|
def test_ip_version_from_int_ipv6(self):
|
|
self.assertEqual(constants.IPv6,
|
|
utils.ip_version_from_int(6))
|
|
|
|
def test_ip_version_from_int_illegal_int(self):
|
|
self.assertRaises(ValueError,
|
|
utils.ip_version_from_int,
|
|
8)
|
|
|
|
|
|
class TestDelayedStringRenderer(base.BaseTestCase):
|
|
def test_call_deferred_until_str(self):
|
|
my_func = mock.MagicMock(return_value='Brie cheese!')
|
|
delayed = utils.DelayedStringRenderer(my_func, 1, 2, key_arg=44)
|
|
self.assertFalse(my_func.called)
|
|
string = "Type: %s" % delayed
|
|
my_func.assert_called_once_with(1, 2, key_arg=44)
|
|
self.assertEqual("Type: Brie cheese!", string)
|
|
|
|
def test_not_called_with_low_log_level(self):
|
|
LOG = logging.getLogger(__name__)
|
|
# make sure we return logging to previous level
|
|
current_log_level = LOG.logger.getEffectiveLevel()
|
|
self.addCleanup(LOG.logger.setLevel, current_log_level)
|
|
|
|
my_func = mock.MagicMock()
|
|
delayed = utils.DelayedStringRenderer(my_func)
|
|
|
|
# set to warning so we shouldn't be logging debug messages
|
|
LOG.logger.setLevel(logging.logging.WARNING)
|
|
LOG.debug("Hello %s", delayed)
|
|
self.assertFalse(my_func.called)
|
|
|
|
# but it should be called with the debug level
|
|
LOG.logger.setLevel(logging.logging.DEBUG)
|
|
LOG.debug("Hello %s", delayed)
|
|
self.assertTrue(my_func.called)
|
|
|
|
|
|
class TestEnsureDir(base.BaseTestCase):
|
|
@mock.patch('os.makedirs')
|
|
def test_ensure_dir_no_fail_if_exists(self, makedirs):
|
|
error = OSError()
|
|
error.errno = errno.EEXIST
|
|
makedirs.side_effect = error
|
|
utils.ensure_dir("/etc/create/concurrently")
|
|
|
|
@mock.patch('os.makedirs')
|
|
def test_ensure_dir_calls_makedirs(self, makedirs):
|
|
utils.ensure_dir("/etc/create/directory")
|
|
makedirs.assert_called_once_with("/etc/create/directory", 0o755)
|
|
|
|
|
|
class TestCamelize(base.BaseTestCase):
|
|
def test_camelize(self):
|
|
data = {'bandwidth_limit': 'BandwidthLimit',
|
|
'test': 'Test',
|
|
'some__more__dashes': 'SomeMoreDashes',
|
|
'a_penguin_walks_into_a_bar': 'APenguinWalksIntoABar'}
|
|
|
|
for s, expected in data.items():
|
|
self.assertEqual(expected, utils.camelize(s))
|
|
|
|
|
|
class TestRoundVal(base.BaseTestCase):
|
|
def test_round_val_ok(self):
|
|
for expected, value in ((0, 0),
|
|
(0, 0.1),
|
|
(1, 0.5),
|
|
(1, 1.49),
|
|
(2, 1.5)):
|
|
self.assertEqual(expected, utils.round_val(value))
|
|
|
|
|
|
class TestGetRandomString(base.BaseTestCase):
|
|
def test_get_random_string(self):
|
|
length = 127
|
|
random_string = utils.get_random_string(length)
|
|
self.assertEqual(length, len(random_string))
|
|
regex = re.compile('^[0-9a-fA-F]+$')
|
|
self.assertIsNotNone(regex.match(random_string))
|
|
|
|
|
|
class TestSafeDecodeUtf8(base.BaseTestCase):
|
|
|
|
@helpers.requires_py2
|
|
def test_py2_does_nothing(self):
|
|
s = 'test-py2'
|
|
self.assertIs(s, utils.safe_decode_utf8(s))
|
|
|
|
@helpers.requires_py3
|
|
def test_py3_decoded_valid_bytes(self):
|
|
s = bytes('test-py2', 'utf-8')
|
|
decoded_str = utils.safe_decode_utf8(s)
|
|
self.assertIsInstance(decoded_str, six.text_type)
|
|
self.assertEqual(s, decoded_str.encode('utf-8'))
|
|
|
|
@helpers.requires_py3
|
|
def test_py3_decoded_invalid_bytes(self):
|
|
s = bytes('test-py2', 'utf_16')
|
|
decoded_str = utils.safe_decode_utf8(s)
|
|
self.assertIsInstance(decoded_str, six.text_type)
|
|
|
|
|
|
class TestPortRuleMasking(base.BaseTestCase):
|
|
def test_port_rule_masking(self):
|
|
compare_rules = lambda x, y: set(x) == set(y) and len(x) == len(y)
|
|
|
|
# Test 1.
|
|
port_min = 5
|
|
port_max = 12
|
|
expected_rules = ['0x0005', '0x000c', '0x0006/0xfffe',
|
|
'0x0008/0xfffc']
|
|
rules = utils.port_rule_masking(port_min, port_max)
|
|
self.assertTrue(compare_rules(rules, expected_rules))
|
|
|
|
# Test 2.
|
|
port_min = 20
|
|
port_max = 130
|
|
expected_rules = ['0x0014/0xfffe', '0x0016/0xfffe', '0x0018/0xfff8',
|
|
'0x0020/0xffe0', '0x0040/0xffc0', '0x0080/0xfffe',
|
|
'0x0082']
|
|
rules = utils.port_rule_masking(port_min, port_max)
|
|
self.assertEqual(expected_rules, rules)
|
|
|
|
# Test 3.
|
|
port_min = 4501
|
|
port_max = 33057
|
|
expected_rules = ['0x1195', '0x1196/0xfffe', '0x1198/0xfff8',
|
|
'0x11a0/0xffe0', '0x11c0/0xffc0', '0x1200/0xfe00',
|
|
'0x1400/0xfc00', '0x1800/0xf800', '0x2000/0xe000',
|
|
'0x4000/0xc000', '0x8021/0xff00', '0x8101/0xffe0',
|
|
'0x8120/0xfffe']
|
|
|
|
rules = utils.port_rule_masking(port_min, port_max)
|
|
self.assertEqual(expected_rules, rules)
|
|
|
|
def test_port_rule_masking_min_higher_than_max(self):
|
|
port_min = 10
|
|
port_max = 5
|
|
with testtools.ExpectedException(ValueError):
|
|
utils.port_rule_masking(port_min, port_max)
|
|
|
|
|
|
class TestAuthenticEUI(base.BaseTestCase):
|
|
|
|
def test_retains_original_format(self):
|
|
for mac_str in ('FA-16-3E-73-A2-E9', 'fa:16:3e:73:a2:e9'):
|
|
self.assertEqual(mac_str, str(utils.AuthenticEUI(mac_str)))
|
|
|
|
def test_invalid_values(self):
|
|
for mac in ('XXXX', 'ypp', 'g3:vvv'):
|
|
with testtools.ExpectedException(netaddr.core.AddrFormatError):
|
|
utils.AuthenticEUI(mac)
|
|
|
|
|
|
class TestAuthenticIPNetwork(base.BaseTestCase):
|
|
|
|
def test_retains_original_format(self):
|
|
for addr_str in ('10.0.0.0/24', '10.0.0.10/32', '100.0.0.1'):
|
|
self.assertEqual(addr_str, str(utils.AuthenticIPNetwork(addr_str)))
|
|
|
|
def test_invalid_values(self):
|
|
for addr in ('XXXX', 'ypp', 'g3:vvv'):
|
|
with testtools.ExpectedException(netaddr.core.AddrFormatError):
|
|
utils.AuthenticIPNetwork(addr)
|
|
|
|
|
|
class TestExcDetails(base.BaseTestCase):
|
|
|
|
def test_attach_exc_details(self):
|
|
e = Exception()
|
|
utils.attach_exc_details(e, 'details')
|
|
self.assertEqual('details', utils.extract_exc_details(e))
|
|
|
|
def test_attach_exc_details_with_interpolation(self):
|
|
e = Exception()
|
|
utils.attach_exc_details(e, 'details: %s', 'foo')
|
|
self.assertEqual('details: foo', utils.extract_exc_details(e))
|
|
|
|
def test_attach_exc_details_with_None_interpolation(self):
|
|
e = Exception()
|
|
utils.attach_exc_details(e, 'details: %s', None)
|
|
self.assertEqual(
|
|
'details: %s' % str(None), utils.extract_exc_details(e))
|
|
|
|
def test_attach_exc_details_with_multiple_interpolation(self):
|
|
e = Exception()
|
|
utils.attach_exc_details(
|
|
e, 'details: %s, %s', ('foo', 'bar'))
|
|
self.assertEqual('details: foo, bar', utils.extract_exc_details(e))
|
|
|
|
def test_attach_exc_details_with_dict_interpolation(self):
|
|
e = Exception()
|
|
utils.attach_exc_details(
|
|
e, 'details: %(foo)s, %(bar)s', {'foo': 'foo', 'bar': 'bar'})
|
|
self.assertEqual('details: foo, bar', utils.extract_exc_details(e))
|
|
|
|
def test_extract_exc_details_no_details_attached(self):
|
|
self.assertIsInstance(
|
|
utils.extract_exc_details(Exception()), six.text_type)
|
|
|
|
|
|
class ImportModulesRecursivelyTestCase(base.BaseTestCase):
|
|
|
|
def test_object_modules(self):
|
|
example_module = 'neutron.tests.unit.tests.example.dir.example_module'
|
|
sys.modules.pop(example_module, None)
|
|
modules = utils.import_modules_recursively(
|
|
os.path.dirname(tests.__file__))
|
|
self.assertIn(example_module, modules)
|
|
self.assertIn(example_module, sys.modules)
|