Merge "Use sets to calculate added/original/removed ips"

This commit is contained in:
Jenkins 2015-06-30 01:38:10 +00:00 committed by Gerrit Code Review
commit 876ecb0305
2 changed files with 114 additions and 35 deletions

View File

@ -14,6 +14,7 @@
# under the License.
import collections
import itertools
import netaddr
from oslo_config import cfg
@ -316,6 +317,15 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
pool=pool_range,
ip_address=gateway_ip)
def _is_ip_required_by_subnet(self, context, subnet_id, device_owner):
# For ports that are not router ports, retain any automatic
# (non-optional, e.g. IPv6 SLAAC) addresses.
if device_owner in constants.ROUTER_INTERFACE_OWNERS:
return True
subnet = self._get_subnet(context, subnet_id)
return not ipv6_utils.is_auto_address_subnet(subnet)
def _get_changed_ips_for_port(self, context, original_ips,
new_ips, device_owner):
"""Calculate changes in IPs for the port."""
@ -324,30 +334,44 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
msg = _('Exceeded maximum amount of fixed ips per port')
raise n_exc.InvalidInput(error_message=msg)
# These ips are still on the port and haven't been removed
prev_ips = []
add_ips = []
remove_ips = []
ips_map = {ip['ip_address']: ip
for ip in itertools.chain(new_ips, original_ips)
if 'ip_address' in ip}
# Remove all of the intersecting elements
for original_ip in original_ips[:]:
for new_ip in new_ips[:]:
if ('ip_address' in new_ip and
original_ip['ip_address'] == new_ip['ip_address']):
original_ips.remove(original_ip)
new_ips.remove(new_ip)
prev_ips.append(original_ip)
break
new = set()
for ip in new_ips:
if 'ip_address' in ip:
new.add(ip['ip_address'])
else:
# For ports that are not router ports, retain any automatic
# (non-optional, e.g. IPv6 SLAAC) addresses.
if device_owner not in constants.ROUTER_INTERFACE_OWNERS:
subnet = self._get_subnet(context,
original_ip['subnet_id'])
if (ipv6_utils.is_auto_address_subnet(subnet)):
original_ips.remove(original_ip)
prev_ips.append(original_ip)
return self.Changes(add=new_ips,
add_ips.append(ip)
# Convert original ip addresses to sets
orig = set(ip['ip_address'] for ip in original_ips)
add = new - orig
unchanged = new & orig
remove = orig - new
# Convert results back to list of dicts
add_ips += [ips_map[ip] for ip in add]
prev_ips = [ips_map[ip] for ip in unchanged]
# Mark ip for removing if it is not found in new_ips
# and subnet requires ip to be set manually.
# For auto addresses leave ip unchanged
for ip in remove:
subnet_id = ips_map[ip]['subnet_id']
if self._is_ip_required_by_subnet(context, subnet_id,
device_owner):
remove_ips.append(ips_map[ip])
else:
prev_ips.append(ips_map[ip])
return self.Changes(add=add_ips,
original=prev_ips,
remove=original_ips)
remove=remove_ips)
def _delete_port(self, context, port_id):
query = (context.session.query(models_v2.Port).

View File

@ -30,11 +30,23 @@ class TestIpamBackendMixin(base.BaseTestCase):
('id-2', '192.168.1.2'))
self.default_original_ips = (('id-1', '192.168.1.1'),
('id-5', '172.20.16.5'))
self.owner_non_router = constants.DEVICE_OWNER_DHCP
self.owner_router = constants.DEVICE_OWNER_ROUTER_INTF
def _prepare_ips(self, ips):
return [{'ip_address': ip[1],
'subnet_id': ip[0]} for ip in ips]
def _mock_slaac_subnet_on(self):
slaac_subnet = {'ipv6_address_mode': constants.IPV6_SLAAC,
'ipv6_ra_mode': constants.IPV6_SLAAC}
self.mixin._get_subnet = mock.Mock(return_value=slaac_subnet)
def _mock_slaac_subnet_off(self):
non_slaac_subnet = {'ipv6_address_mode': None,
'ipv6_ra_mode': None}
self.mixin._get_subnet = mock.Mock(return_value=non_slaac_subnet)
def _test_get_changed_ips_for_port(self, expected_change, original_ips,
new_ips, owner):
change = self.mixin._get_changed_ips_for_port(self.ctx,
@ -44,36 +56,79 @@ class TestIpamBackendMixin(base.BaseTestCase):
self.assertEqual(expected_change, change)
def test__get_changed_ips_for_port(self):
owner_router = constants.DEVICE_OWNER_ROUTER_INTF
new_ips = self._prepare_ips(self.default_new_ips)
original_ips = self._prepare_ips(self.default_original_ips)
# generate changes before calling _get_changed_ips_for_port
# because new_ips and original_ips are affected during call
expected_change = self.mixin.Changes(add=[new_ips[1]],
original=[original_ips[0]],
remove=[original_ips[1]])
self._test_get_changed_ips_for_port(expected_change, original_ips,
new_ips, owner_router)
new_ips, self.owner_router)
def test__get_changed_ips_for_port_autoaddress(self):
owner_not_router = constants.DEVICE_OWNER_DHCP
new_ips = self._prepare_ips(self.default_new_ips)
original = (('id-1', '192.168.1.1'),
('id-5', '2000:1234:5678::12FF:FE34:5678'))
original_ips = self._prepare_ips(original)
# mock to test auto address part
slaac_subnet = {'ipv6_address_mode': constants.IPV6_SLAAC,
'ipv6_ra_mode': constants.IPV6_SLAAC}
self.mixin._get_subnet = mock.Mock(return_value=slaac_subnet)
self._mock_slaac_subnet_on()
# make a copy of original_ips
# since it is changed by _get_changed_ips_for_port
expected_change = self.mixin.Changes(add=[new_ips[1]],
original=original_ips[:],
original=original_ips,
remove=[])
self._test_get_changed_ips_for_port(expected_change, original_ips,
new_ips, owner_not_router)
new_ips, self.owner_non_router)
def _test_get_changed_ips_for_port_no_ip_address(self):
# IP address should be added if only subnet_id is provided,
# independently from auto_address status for subnet
new_ips = [{'subnet_id': 'id-3'}]
original_ips = []
expected_change = self.mixin.Changes(add=[new_ips[0]],
original=[],
remove=[])
self._test_get_changed_ips_for_port(expected_change, original_ips,
new_ips, self.owner_non_router)
def test__get_changed_ips_for_port_no_ip_address_no_slaac(self):
self._mock_slaac_subnet_off()
self._test_get_changed_ips_for_port_no_ip_address()
def test__get_changed_ips_for_port_no_ip_address_slaac(self):
self._mock_slaac_subnet_on()
self._test_get_changed_ips_for_port_no_ip_address()
def test__is_ip_required_by_subnet_for_router_port(self):
# Owner -> router:
# _get_subnet should not be called,
# expected True
self._mock_slaac_subnet_off()
result = self.mixin._is_ip_required_by_subnet(self.ctx, 'id',
self.owner_router)
self.assertTrue(result)
self.assertFalse(self.mixin._get_subnet.called)
def test__is_ip_required_by_subnet_for_non_router_port(self):
# Owner -> not router:
# _get_subnet should be called,
# expected True, because subnet is not slaac
self._mock_slaac_subnet_off()
result = self.mixin._is_ip_required_by_subnet(self.ctx, 'id',
self.owner_non_router)
self.assertTrue(result)
self.assertTrue(self.mixin._get_subnet.called)
def test__is_ip_required_by_subnet_for_non_router_port_and_slaac(self):
# Owner -> not router:
# _get_subnet should be called,
# expected False, because subnet is slaac
self._mock_slaac_subnet_on()
result = self.mixin._is_ip_required_by_subnet(self.ctx, 'id',
self.owner_non_router)
self.assertFalse(result)
self.assertTrue(self.mixin._get_subnet.called)