neutron-lib: start using neutron-lib helpers
The following helpers are used: parse_mappings compare_elements safe_sort_key Change-Id: I5947b473330fd29f8d4c1a08f03d007a52c8dfd9
This commit is contained in:
parent
fcd47cca6e
commit
396abb8ccb
@ -16,13 +16,13 @@
|
||||
import sys
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron._i18n import _, _LE, _LW
|
||||
from neutron.agent.l2 import l2_agent_extension
|
||||
from neutron.agent.linux import bridge_lib
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.ml2.drivers.linuxbridge.agent.common import (
|
||||
constants as linux_bridge_constants)
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common import (
|
||||
@ -142,7 +142,7 @@ class FdbPopulationAgentExtension(
|
||||
'%(driver_type)s'), {'driver_type': driver_type})
|
||||
sys.exit(1)
|
||||
|
||||
self.device_mappings = n_utils.parse_mappings(
|
||||
self.device_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.FDB.shared_physical_device_mappings, unique_keys=False)
|
||||
devices = self._get_devices()
|
||||
if not devices:
|
||||
|
@ -15,6 +15,7 @@
|
||||
import collections
|
||||
import netaddr
|
||||
from neutron_lib import constants as lib_constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron._i18n import _, _LE, _LW
|
||||
@ -449,9 +450,9 @@ class RouterInfo(object):
|
||||
current_port = current_ports_dict.get(existing_port['id'])
|
||||
if current_port:
|
||||
if (sorted(existing_port['fixed_ips'],
|
||||
key=common_utils.safe_sort_key) !=
|
||||
key=helpers.safe_sort_key) !=
|
||||
sorted(current_port['fixed_ips'],
|
||||
key=common_utils.safe_sort_key)):
|
||||
key=helpers.safe_sort_key)):
|
||||
updated_ports[current_port['id']] = current_port
|
||||
return updated_ports
|
||||
|
||||
|
@ -12,12 +12,12 @@
|
||||
|
||||
import sys
|
||||
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron._i18n import _LE, _LI
|
||||
from neutron.common import config
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.ml2.drivers.linuxbridge.agent \
|
||||
import linuxbridge_neutron_agent
|
||||
|
||||
@ -27,7 +27,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
def remove_empty_bridges():
|
||||
try:
|
||||
interface_mappings = n_utils.parse_mappings(
|
||||
interface_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
|
||||
except ValueError as e:
|
||||
LOG.error(_LE("Parsing physical_interface_mappings failed: %s."), e)
|
||||
@ -35,7 +35,7 @@ def remove_empty_bridges():
|
||||
LOG.info(_LI("Interface mappings: %s."), interface_mappings)
|
||||
|
||||
try:
|
||||
bridge_mappings = n_utils.parse_mappings(
|
||||
bridge_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.LINUX_BRIDGE.bridge_mappings)
|
||||
except ValueError as e:
|
||||
LOG.error(_LE("Parsing bridge_mappings failed: %s."), e)
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
"""Utilities and helper functions."""
|
||||
|
||||
import collections
|
||||
import decimal
|
||||
import errno
|
||||
import functools
|
||||
@ -34,10 +33,12 @@ import tempfile
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from debtcollector import removals
|
||||
import eventlet
|
||||
from eventlet.green import subprocess
|
||||
import netaddr
|
||||
from neutron_lib import constants as n_const
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
@ -82,44 +83,13 @@ def subprocess_popen(args, stdin=None, stdout=None, stderr=None, shell=False,
|
||||
close_fds=close_fds, env=env)
|
||||
|
||||
|
||||
@removals.remove(
|
||||
message="Use parse_mappings from neutron_lib.utils.helpers",
|
||||
version="Ocata",
|
||||
removal_version="Pike")
|
||||
def parse_mappings(mapping_list, unique_values=True, unique_keys=True):
|
||||
"""Parse a list of mapping strings into a dictionary.
|
||||
|
||||
:param mapping_list: a list of strings of the form '<key>:<value>'
|
||||
:param unique_values: values must be unique if True
|
||||
:param unique_keys: keys must be unique if True, else implies that keys
|
||||
and values are not unique
|
||||
:returns: a dict mapping keys to values or to list of values
|
||||
"""
|
||||
mappings = {}
|
||||
for mapping in mapping_list:
|
||||
mapping = mapping.strip()
|
||||
if not mapping:
|
||||
continue
|
||||
split_result = mapping.split(':')
|
||||
if len(split_result) != 2:
|
||||
raise ValueError(_("Invalid mapping: '%s'") % mapping)
|
||||
key = split_result[0].strip()
|
||||
if not key:
|
||||
raise ValueError(_("Missing key in mapping: '%s'") % mapping)
|
||||
value = split_result[1].strip()
|
||||
if not value:
|
||||
raise ValueError(_("Missing value in mapping: '%s'") % mapping)
|
||||
if unique_keys:
|
||||
if key in mappings:
|
||||
raise ValueError(_("Key %(key)s in mapping: '%(mapping)s' not "
|
||||
"unique") % {'key': key,
|
||||
'mapping': mapping})
|
||||
if unique_values and value in mappings.values():
|
||||
raise ValueError(_("Value %(value)s in mapping: '%(mapping)s' "
|
||||
"not unique") % {'value': value,
|
||||
'mapping': mapping})
|
||||
mappings[key] = value
|
||||
else:
|
||||
mappings.setdefault(key, [])
|
||||
if value not in mappings[key]:
|
||||
mappings[key].append(value)
|
||||
return mappings
|
||||
return helpers.parse_mappings(mapping_list, unique_values=unique_values,
|
||||
unique_keys=unique_keys)
|
||||
|
||||
|
||||
def get_hostname():
|
||||
@ -130,23 +100,20 @@ def get_first_host_ip(net, ip_version):
|
||||
return str(netaddr.IPAddress(net.first + 1, ip_version))
|
||||
|
||||
|
||||
@removals.remove(
|
||||
message="Use compare_elements from neutron_lib.utils.helpers",
|
||||
version="Ocata",
|
||||
removal_version="Pike")
|
||||
def compare_elements(a, b):
|
||||
"""Compare elements if a and b have same elements.
|
||||
|
||||
This method doesn't consider ordering
|
||||
"""
|
||||
if a is None:
|
||||
a = []
|
||||
if b is None:
|
||||
b = []
|
||||
return set(a) == set(b)
|
||||
return helpers.compare_elements(a, b)
|
||||
|
||||
|
||||
@removals.remove(
|
||||
message="Use safe_sort_key from neutron_lib.utils.helpers",
|
||||
version="Ocata",
|
||||
removal_version="Pike")
|
||||
def safe_sort_key(value):
|
||||
"""Return value hash or build one for dictionaries."""
|
||||
if isinstance(value, collections.Mapping):
|
||||
return sorted(value.items())
|
||||
return value
|
||||
return helpers.safe_sort_key(value)
|
||||
|
||||
|
||||
def dict2str(dic):
|
||||
|
@ -15,6 +15,7 @@
|
||||
import netaddr
|
||||
from neutron_lib.api import validators
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_utils import uuidutils
|
||||
from sqlalchemy.orm import exc
|
||||
from sqlalchemy.orm import scoped_session
|
||||
@ -746,7 +747,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
|
||||
need_notify = False
|
||||
port_updates = port['port']
|
||||
if (ext_sg.SECURITYGROUPS in port_updates and
|
||||
not utils.compare_elements(
|
||||
not helpers.compare_elements(
|
||||
original_port.get(ext_sg.SECURITYGROUPS),
|
||||
port_updates[ext_sg.SECURITYGROUPS])):
|
||||
# delete the port binding and read it with the new rules
|
||||
|
@ -15,12 +15,12 @@
|
||||
|
||||
import netaddr
|
||||
from neutron_lib import constants as const
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import netutils
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
from neutron._i18n import _, _LW
|
||||
from neutron.common import utils
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.models import allowed_address_pair as aap_models
|
||||
from neutron.db.models import securitygroup as sg_models
|
||||
@ -95,7 +95,7 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
|
||||
|
||||
def check_and_notify_security_group_member_changed(
|
||||
self, context, original_port, updated_port):
|
||||
sg_change = not utils.compare_elements(
|
||||
sg_change = not helpers.compare_elements(
|
||||
original_port.get(ext_sg.SECURITYGROUPS),
|
||||
updated_port.get(ext_sg.SECURITYGROUPS))
|
||||
if sg_change:
|
||||
@ -115,7 +115,7 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
|
||||
need_notify = False
|
||||
if (original_port['fixed_ips'] != updated_port['fixed_ips'] or
|
||||
original_port['mac_address'] != updated_port['mac_address'] or
|
||||
not utils.compare_elements(
|
||||
not helpers.compare_elements(
|
||||
original_port.get(ext_sg.SECURITYGROUPS),
|
||||
updated_port.get(ext_sg.SECURITYGROUPS))):
|
||||
need_notify = True
|
||||
|
@ -15,13 +15,13 @@
|
||||
|
||||
import random
|
||||
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log
|
||||
|
||||
from neutron._i18n import _LE
|
||||
from neutron.common import exceptions as exc
|
||||
from neutron.common import utils
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
|
||||
@ -36,7 +36,7 @@ class BaseTypeDriver(api.TypeDriver):
|
||||
|
||||
def __init__(self):
|
||||
try:
|
||||
self.physnet_mtus = utils.parse_mappings(
|
||||
self.physnet_mtus = helpers.parse_mappings(
|
||||
cfg.CONF.ml2.physical_network_mtus, unique_values=False
|
||||
)
|
||||
except Exception as e:
|
||||
|
@ -23,6 +23,7 @@ import sys
|
||||
|
||||
import netaddr
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
@ -39,7 +40,6 @@ from neutron.common import config as common_config
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import profiler as setup_profiler
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.common import utils as p_utils
|
||||
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
|
||||
@ -911,7 +911,7 @@ def main():
|
||||
|
||||
common_config.setup_logging()
|
||||
try:
|
||||
interface_mappings = n_utils.parse_mappings(
|
||||
interface_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
|
||||
except ValueError as e:
|
||||
LOG.error(_LE("Parsing physical_interface_mappings failed: %s. "
|
||||
@ -920,7 +920,7 @@ def main():
|
||||
LOG.info(_LI("Interface mappings: %s"), interface_mappings)
|
||||
|
||||
try:
|
||||
bridge_mappings = n_utils.parse_mappings(
|
||||
bridge_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.LINUX_BRIDGE.bridge_mappings)
|
||||
except ValueError as e:
|
||||
LOG.error(_LE("Parsing bridge_mappings failed: %s. "
|
||||
|
@ -18,6 +18,7 @@ import os
|
||||
import sys
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
@ -29,7 +30,6 @@ from neutron.agent.linux import utils
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.common import constants as p_constants
|
||||
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
|
||||
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
|
||||
@ -174,7 +174,7 @@ class MacvtapManager(amb.CommonAgentManagerBase):
|
||||
|
||||
def parse_interface_mappings():
|
||||
try:
|
||||
interface_mappings = n_utils.parse_mappings(
|
||||
interface_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.macvtap.physical_interface_mappings)
|
||||
LOG.info(_LI("Interface mappings: %s"), interface_mappings)
|
||||
return interface_mappings
|
||||
|
@ -21,6 +21,7 @@ import sys
|
||||
import time
|
||||
|
||||
from neutron_lib import constants as n_constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
@ -36,7 +37,6 @@ from neutron.api.rpc.callbacks import resources
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import profiler as setup_profiler
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron import context
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config
|
||||
@ -414,7 +414,7 @@ class SriovNicAgentConfigParser(object):
|
||||
|
||||
Parse and validate the consistency in both mappings
|
||||
"""
|
||||
self.device_mappings = n_utils.parse_mappings(
|
||||
self.device_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.SRIOV_NIC.physical_device_mappings, unique_keys=False)
|
||||
self.exclude_devices = config.parse_exclude_devices(
|
||||
cfg.CONF.SRIOV_NIC.exclude_devices)
|
||||
|
@ -24,6 +24,7 @@ import time
|
||||
import debtcollector
|
||||
import netaddr
|
||||
from neutron_lib import constants as n_const
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
@ -49,7 +50,6 @@ from neutron.callbacks import registry
|
||||
from neutron.common import config
|
||||
from neutron.common import constants as c_const
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron import context
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.plugins.common import constants as p_const
|
||||
@ -303,7 +303,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
||||
|
||||
def _parse_bridge_mappings(self, bridge_mappings):
|
||||
try:
|
||||
return n_utils.parse_mappings(bridge_mappings)
|
||||
return helpers.parse_mappings(bridge_mappings)
|
||||
except ValueError as e:
|
||||
raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)
|
||||
|
||||
|
@ -26,6 +26,7 @@ import fixtures
|
||||
import mock
|
||||
import netaddr
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_utils import netutils
|
||||
import six
|
||||
import unittest2
|
||||
@ -193,8 +194,8 @@ class UnorderedList(list):
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, list):
|
||||
return False
|
||||
return (sorted(self, key=utils.safe_sort_key) ==
|
||||
sorted(other, key=utils.safe_sort_key))
|
||||
return (sorted(self, key=helpers.safe_sort_key) ==
|
||||
sorted(other, key=helpers.safe_sort_key))
|
||||
|
||||
def __neq__(self, other):
|
||||
return not self == other
|
||||
|
@ -17,12 +17,12 @@ import copy
|
||||
|
||||
import mock
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
from neutron.agent.l2.extensions.fdb_population import (
|
||||
FdbPopulationAgentExtension)
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.ml2.drivers.linuxbridge.agent.common import (
|
||||
constants as linux_bridge_constants)
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common import (
|
||||
@ -47,7 +47,7 @@ class FdbPopulationExtensionTestCase(base.BaseTestCase):
|
||||
self.DEVICE = self._get_existing_device()
|
||||
|
||||
def _get_existing_device(self):
|
||||
device_mappings = n_utils.parse_mappings(
|
||||
device_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.FDB.shared_physical_device_mappings, unique_keys=False)
|
||||
DEVICES = six.next(six.itervalues(device_mappings))
|
||||
return DEVICES[0]
|
||||
|
@ -23,6 +23,7 @@ import mock
|
||||
import netaddr
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import exceptions as lib_exc
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils
|
||||
@ -781,8 +782,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
|
||||
self.assertIn(k, resource[res_name])
|
||||
if isinstance(keys[k], list):
|
||||
self.assertEqual(
|
||||
sorted(resource[res_name][k], key=utils.safe_sort_key),
|
||||
sorted(keys[k], key=utils.safe_sort_key))
|
||||
sorted(resource[res_name][k], key=helpers.safe_sort_key),
|
||||
sorted(keys[k], key=helpers.safe_sort_key))
|
||||
else:
|
||||
self.assertEqual(resource[res_name][k], keys[k])
|
||||
|
||||
@ -4363,8 +4364,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
|
||||
res['subnet']['id'])
|
||||
res = self.deserialize(self.fmt, req.get_response(self.api))
|
||||
self.assertEqual(
|
||||
sorted(res['subnet']['host_routes'], key=utils.safe_sort_key),
|
||||
sorted(host_routes, key=utils.safe_sort_key))
|
||||
sorted(res['subnet']['host_routes'],
|
||||
key=helpers.safe_sort_key),
|
||||
sorted(host_routes, key=helpers.safe_sort_key))
|
||||
self.assertEqual(dns_nameservers, res['subnet']['dns_nameservers'])
|
||||
|
||||
def _test_subnet_update_ipv4_and_ipv6_pd_subnets(self, ra_addr_mode):
|
||||
|
@ -14,11 +14,11 @@
|
||||
# under the License.
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
from webob import exc
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron import context
|
||||
from neutron.db import extraroute_db
|
||||
from neutron.extensions import extraroute
|
||||
@ -172,8 +172,8 @@ class ExtraRouteDBTestCaseBase(object):
|
||||
routes)
|
||||
self.assertEqual(
|
||||
sorted(body['router']['routes'],
|
||||
key=utils.safe_sort_key),
|
||||
sorted(routes, key=utils.safe_sort_key))
|
||||
key=helpers.safe_sort_key),
|
||||
sorted(routes, key=helpers.safe_sort_key))
|
||||
self._routes_update_cleanup(p['port']['id'],
|
||||
None, r['router']['id'], [])
|
||||
|
||||
@ -224,16 +224,16 @@ class ExtraRouteDBTestCaseBase(object):
|
||||
routes_orig)
|
||||
self.assertEqual(
|
||||
sorted(body['router']['routes'],
|
||||
key=utils.safe_sort_key),
|
||||
sorted(routes_orig, key=utils.safe_sort_key))
|
||||
key=helpers.safe_sort_key),
|
||||
sorted(routes_orig, key=helpers.safe_sort_key))
|
||||
body = self._routes_update_prepare(r['router']['id'],
|
||||
None, p['port']['id'],
|
||||
routes_left,
|
||||
skip_add=True)
|
||||
self.assertEqual(
|
||||
sorted(body['router']['routes'],
|
||||
key=utils.safe_sort_key),
|
||||
sorted(routes_left, key=utils.safe_sort_key))
|
||||
key=helpers.safe_sort_key),
|
||||
sorted(routes_left, key=helpers.safe_sort_key))
|
||||
self._routes_update_cleanup(p['port']['id'],
|
||||
None, r['router']['id'], [])
|
||||
|
||||
|
@ -18,7 +18,7 @@ import os
|
||||
import sys
|
||||
|
||||
import mock
|
||||
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
from oslo_service import service
|
||||
|
||||
@ -26,7 +26,6 @@ from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
|
||||
from neutron.plugins.ml2.drivers.macvtap.agent import macvtap_neutron_agent
|
||||
from neutron.plugins.ml2.drivers.macvtap import macvtap_common
|
||||
@ -177,7 +176,7 @@ class TestMacvtapMain(base.BaseTestCase):
|
||||
def test_parse_interface_mappings_good(self):
|
||||
cfg.CONF.set_override('physical_interface_mappings', 'good_mapping',
|
||||
'macvtap')
|
||||
with mock.patch.object(n_utils, 'parse_mappings',
|
||||
with mock.patch.object(helpers, 'parse_mappings',
|
||||
return_value=INTERFACE_MAPPINGS):
|
||||
mappings = macvtap_neutron_agent.parse_interface_mappings()
|
||||
self.assertEqual(INTERFACE_MAPPINGS, mappings)
|
||||
@ -185,7 +184,7 @@ class TestMacvtapMain(base.BaseTestCase):
|
||||
def test_parse_interface_mappings_bad(self):
|
||||
cfg.CONF.set_override('physical_interface_mappings', 'bad_mapping',
|
||||
'macvtap')
|
||||
with mock.patch.object(n_utils, 'parse_mappings',
|
||||
with mock.patch.object(helpers, 'parse_mappings',
|
||||
side_effect=ValueError('bad mapping')),\
|
||||
mock.patch.object(sys, 'exit') as mock_exit:
|
||||
macvtap_neutron_agent.parse_interface_mappings()
|
||||
|
@ -13,10 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config
|
||||
from neutron.plugins.ml2.drivers.mech_sriov.agent \
|
||||
import sriov_nic_agent as agent
|
||||
@ -61,7 +60,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
|
||||
cfg.CONF.set_override('physical_device_mappings',
|
||||
self.DEVICE_MAPPING_LIST,
|
||||
'SRIOV_NIC')
|
||||
device_mappings = n_utils.parse_mappings(
|
||||
device_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.SRIOV_NIC.physical_device_mappings, unique_keys=False)
|
||||
self.assertEqual(self.DEVICE_MAPPING, device_mappings)
|
||||
|
||||
@ -69,7 +68,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
|
||||
cfg.CONF.set_override('physical_device_mappings',
|
||||
self.DEVICE_MAPPING_WITH_ERROR_LIST,
|
||||
'SRIOV_NIC')
|
||||
self.assertRaises(ValueError, n_utils.parse_mappings,
|
||||
self.assertRaises(ValueError, helpers.parse_mappings,
|
||||
cfg.CONF.SRIOV_NIC.physical_device_mappings,
|
||||
unique_keys=False)
|
||||
|
||||
@ -77,7 +76,7 @@ class TestSriovAgentConfig(base.BaseTestCase):
|
||||
cfg.CONF.set_override('physical_device_mappings',
|
||||
self.DEVICE_MAPPING_WITH_SPACES_LIST,
|
||||
'SRIOV_NIC')
|
||||
device_mappings = n_utils.parse_mappings(
|
||||
device_mappings = helpers.parse_mappings(
|
||||
cfg.CONF.SRIOV_NIC.physical_device_mappings, unique_keys=False)
|
||||
self.assertEqual(self.DEVICE_MAPPING, device_mappings)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user