Fix callers of get_devices_with_ip() to pass addresses

If callers of get_devices_with_ip(), or
device.addr.list(to=address) pass an ip_cidr, it
could match any ip_cidr in that range on the interface.
Callers need to pass the IP without the prefix portion in
order to match it exactly.  Added a helper utility to
strip the cidr part from a ip_cidr.

Determined the unit test for this can't actually check
this case since we are mocking the return value from
/sbin/ip, so modified it to just make sure the dict
is correct.

Added a functional test that adds two IP addresses in
the same IP range to verify that we actually filter
correctly when a 'to=IP' is specified.

Change-Id: I3a95b3bb72a43f322ad23892d8959398aac22a1c
Closes-bug: #1728080
(cherry picked from commit 7b8289253c355f39ac84bc6ea36d265a8be0e381)
This commit is contained in:
Brian Haley 2017-10-27 14:01:52 -04:00 committed by Stefan Nica
parent adc344c065
commit 15d843a2ca
7 changed files with 48 additions and 7 deletions
neutron
agent
common
plugins/ml2/drivers/linuxbridge/agent
tests
functional/agent/linux
unit/agent/linux

@ -355,7 +355,8 @@ class FipNamespace(namespaces.Namespace):
ipd.route.delete_gateway(current_gateway.get('gateway'))
def _add_cidr_to_device(self, device, ip_cidr):
if not device.addr.list(to=ip_cidr):
to = common_utils.cidr_to_ip(ip_cidr)
if not device.addr.list(to=to):
device.addr.add(ip_cidr, add_broadcast=False)
def delete_rtr_2_fip_link(self, ri):

@ -301,7 +301,8 @@ class HaRouter(router.RouterInfo):
def remove_floating_ip(self, device, ip_cidr):
self._remove_vip(ip_cidr)
if device.addr.list(to=ip_cidr):
to = common_utils.cidr_to_ip(ip_cidr)
if device.addr.list(to=to):
super(HaRouter, self).remove_floating_ip(device, ip_cidr)
def internal_network_updated(self, interface_name, ip_cidrs, mtu):

@ -601,8 +601,13 @@ class IpAddrCommand(IpDeviceCommandBase):
filters=None, ip_version=None):
"""Get a list of all the devices with an IP attached in the namespace.
@param name: if it's not None, only a device with that matching name
:param name: if it's not None, only a device with that matching name
will be returned.
:param scope: address scope, for example, global, link, or host
:param to: IP address or cidr to match. If cidr then it will match
any IP within the specified subnet
:param filters: list of any other filters supported by /sbin/ip
:param ip_version: 4 or 6
"""
options = [ip_version] if ip_version else []

@ -243,6 +243,15 @@ def ip_to_cidr(ip, prefix=None):
return str(net)
def cidr_to_ip(ip_cidr):
"""Strip the cidr notation from an ip cidr or ip
:param ip_cidr: An ipv4 or ipv6 address, with or without cidr notation
"""
net = netaddr.IPNetwork(ip_cidr)
return str(net.ip)
def fixed_ip_cidrs(fixed_ips):
"""Create a list of a port's fixed IPs in cidr notation.

@ -358,7 +358,8 @@ class LinuxBridgeManager(amb.CommonAgentManagerBase):
for ip in ips:
# If bridge ip address already exists, then don't add
# otherwise will report error
if not dst_device.addr.list(to=ip['cidr']):
to = utils.cidr_to_ip(ip['cidr'])
if not dst_device.addr.list(to=to):
dst_device.addr.add(cidr=ip['cidr'])
if gateway:

@ -37,6 +37,7 @@ Device = collections.namedtuple('Device',
WRONG_IP = '0.0.0.0'
TEST_IP = '240.0.0.1'
TEST_IP_NEIGH = '240.0.0.2'
TEST_IP_SECONDARY = '240.0.0.3'
class IpLibTestFramework(functional_base.BaseSudoTestCase):
@ -132,10 +133,23 @@ class IpLibTestCase(IpLibTestFramework):
self.assertIsNone(ip_wrapper.get_device_by_ip(ip=None))
def test_ipwrapper_get_device_by_ip(self):
attr = self.generate_device_details()
# We need to pass both IP and cidr values to get_device_by_ip()
# to make sure it filters correctly.
test_ip = "%s/24" % TEST_IP
test_ip_secondary = "%s/24" % TEST_IP_SECONDARY
attr = self.generate_device_details(
ip_cidrs=[test_ip, test_ip_secondary]
)
self.manage_device(attr)
ip_wrapper = ip_lib.IPWrapper(namespace=attr.namespace)
self.assertEqual(attr.name, ip_wrapper.get_device_by_ip(TEST_IP).name)
self.assertEqual(attr.name,
ip_wrapper.get_device_by_ip(TEST_IP_SECONDARY).name)
self.assertIsNone(ip_wrapper.get_device_by_ip(TEST_IP_NEIGH))
# this is in the same subnet, so will match if we pass as cidr
test_ip_neigh = "%s/24" % TEST_IP_NEIGH
self.assertEqual(attr.name,
ip_wrapper.get_device_by_ip(test_ip_neigh).name)
self.assertIsNone(ip_wrapper.get_device_by_ip(WRONG_IP))
def test_device_exists_with_ips_and_mac(self):

@ -947,10 +947,20 @@ class TestIpAddrCommand(TestIPCmdBase):
'global'))
def test_get_devices_with_ip(self):
# This can only verify that get_devices_with_ip() returns a dict
# with the correct entry, it doesn't actually test that it only
# returns items filtered by the arguments since it isn't calling
# /sbin/ip at all.
self.parent._run.return_value = ADDR_SAMPLE3
devices = self.addr_cmd.get_devices_with_ip('172.16.77.240/24')
devices = self.addr_cmd.get_devices_with_ip(to='172.16.77.240/24')
self.assertEqual(1, len(devices))
self.assertEqual('eth0', devices[0]['name'])
expected = {'cidr': '172.16.77.240/24',
'dadfailed': False,
'dynamic': False,
'name': 'eth0',
'scope': 'global',
'tentative': False}
self.assertEqual(expected, devices[0])
class TestIpRouteCommand(TestIPCmdBase):