Preserve DVR FIP rule priority over Agent restarts

IP rule priorities assigned to DVR floating IPs need
to be preserved over L3 agent restarts. Reuse
the ItemAllocator class decomposed from Link Local IP
address allocation.  Also move commn unit tests to
ItemAllocator class.

Closes-Bug: #1414779
Change-Id: I6a75aa8ad612ee80b391f0a27a8a7e29519c3f8d
Co-Authored-By: Rajeev Grover <rajeev.grover@hp.com>
Co-Authored-By: Ryan Moats <rmoats@us.ibm.com>
This commit is contained in:
Adolfo Duarte 2015-06-18 19:50:13 -07:00 committed by Ryan Moats
parent 0cdff4f7cc
commit de81ab8385
8 changed files with 244 additions and 86 deletions

View File

@ -14,13 +14,13 @@
import os
from oslo_log import log as logging
from neutron.agent.l3 import fip_rule_priority_allocator as frpa
from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import utils as common_utils
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
@ -49,7 +49,10 @@ class FipNamespace(namespaces.Namespace):
self.use_ipv6 = use_ipv6
self.agent_gateway_port = None
self._subscribers = set()
self._rule_priorities = set(range(FIP_PR_START, FIP_PR_END))
path = os.path.join(agent_conf.state_path, 'fip-priorities')
self._rule_priorities = frpa.FipRulePriorityAllocator(path,
FIP_PR_START,
FIP_PR_END)
self._iptables_manager = iptables_manager.IptablesManager(
namespace=self.get_name(),
use_ipv6=self.use_ipv6)
@ -85,11 +88,11 @@ class FipNamespace(namespaces.Namespace):
self._subscribers.discard(router_id)
return not self.has_subscribers()
def allocate_rule_priority(self):
return self._rule_priorities.pop()
def allocate_rule_priority(self, floating_ip):
return self._rule_priorities.allocate(floating_ip)
def deallocate_rule_priority(self, rule_pr):
self._rule_priorities.add(rule_pr)
def deallocate_rule_priority(self, floating_ip):
self._rule_priorities.release(floating_ip)
def _gateway_added(self, ex_gw_port, interface_name):
"""Add Floating IP gateway port."""
@ -232,4 +235,8 @@ class FipNamespace(namespaces.Namespace):
existing_cidrs = [addr['cidr'] for addr in device.addr.list()]
fip_cidrs = [c for c in existing_cidrs if
common_utils.is_cidr_host(c)]
for fip_cidr in fip_cidrs:
fip_ip = fip_cidr.split('/')[0]
rule_pr = self._rule_priorities.allocate(fip_ip)
ri.floating_ips_dict[fip_ip] = rule_pr
ri.dist_fip_count = len(fip_cidrs)

View File

@ -74,7 +74,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
"""Add floating IP to FIP namespace."""
floating_ip = fip['floating_ip_address']
fixed_ip = fip['fixed_ip_address']
rule_pr = self.fip_ns.allocate_rule_priority()
rule_pr = self.fip_ns.allocate_rule_priority(floating_ip)
self.floating_ips_dict[floating_ip] = rule_pr
fip_2_rtr_name = self.fip_ns.get_int_device_name(self.router_id)
ip_rule = ip_lib.IPRule(namespace=self.ns_name)
@ -113,7 +113,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
ip_rule.rule.delete(ip=floating_ip,
table=dvr_fip_ns.FIP_RT_TBL,
priority=rule_pr)
self.fip_ns.deallocate_rule_priority(rule_pr)
self.fip_ns.deallocate_rule_priority(floating_ip)
#TODO(rajeev): Handle else case - exception/log?
device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)

View File

@ -0,0 +1,53 @@
# Copyright 2015 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.l3.item_allocator import ItemAllocator
class FipPriority(object):
def __init__(self, index):
self.index = index
def __repr__(self):
return str(self.index)
def __hash__(self):
return hash(self.__repr__())
def __eq__(self, other):
if isinstance(other, FipPriority):
return (self.index == other.index)
else:
return False
class FipRulePriorityAllocator(ItemAllocator):
"""Manages allocation of floating ips rule priorities.
IP rule priorities assigned to DVR floating IPs need
to be preserved over L3 agent restarts.
This class provides an allocator which saves the prirorities
to a datastore which will survive L3 agent restarts.
"""
def __init__(self, data_store_path, priority_rule_start,
priority_rule_end):
"""Create the necessary pool and create the item allocator
using ',' as the delimiter and FipRulePriorityAllocator as the
class type
"""
pool = set(FipPriority(str(s)) for s in range(priority_rule_start,
priority_rule_end))
super(FipRulePriorityAllocator, self).__init__(data_store_path,
FipPriority,
pool)

View File

@ -1285,6 +1285,31 @@ class TestDvrRouter(L3AgentTestFramework):
self._assert_dvr_snat_gateway(router1)
self.assertFalse(self._namespace_exists(fip_ns))
def test_dvr_router_add_fips_on_restarted_agent(self):
self.agent.conf.agent_mode = 'dvr'
router_info = self.generate_dvr_router_info()
router = self.manage_router(self.agent, router_info)
floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
router_ns = router.ns_name
fip_rule_prio_1 = self._get_fixed_ip_rule_priority(
router_ns, floating_ips[0]['fixed_ip_address'])
restarted_agent = neutron_l3_agent.L3NATAgent(
self.agent.host, self.agent.conf)
floating_ips[0]['floating_ip_address'] = '21.4.4.2'
floating_ips[0]['fixed_ip_address'] = '10.0.0.2'
self.manage_router(restarted_agent, router_info)
fip_rule_prio_2 = self._get_fixed_ip_rule_priority(
router_ns, floating_ips[0]['fixed_ip_address'])
self.assertNotEqual(fip_rule_prio_1, fip_rule_prio_2)
def _get_fixed_ip_rule_priority(self, namespace, fip):
iprule = ip_lib.IPRule(namespace)
lines = iprule.rule._as_root([4], ['show']).splitlines()
for line in lines:
if fip in line:
info = iprule.rule._parse_line(4, line)
return info['priority']
def test_dvr_router_add_internal_network_set_arp_cache(self):
# Check that, when the router is set up and there are
# existing ports on the the uplinked subnet, the ARP

View File

@ -57,13 +57,15 @@ class TestDvrFipNs(base.BaseTestCase):
self.assertFalse(is_last)
def test_allocate_rule_priority(self):
pr = self.fip_ns.allocate_rule_priority()
self.assertNotIn(pr, self.fip_ns._rule_priorities)
pr = self.fip_ns.allocate_rule_priority('20.0.0.30')
self.assertIn('20.0.0.30', self.fip_ns._rule_priorities.allocations)
self.assertNotIn(pr, self.fip_ns._rule_priorities.pool)
def test_deallocate_rule_priority(self):
pr = self.fip_ns.allocate_rule_priority()
self.fip_ns.deallocate_rule_priority(pr)
self.assertIn(pr, self.fip_ns._rule_priorities)
pr = self.fip_ns.allocate_rule_priority('20.0.0.30')
self.fip_ns.deallocate_rule_priority('20.0.0.30')
self.assertNotIn('20.0.0.30', self.fip_ns._rule_priorities.allocations)
self.assertIn(pr, self.fip_ns._rule_priorities.pool)
@mock.patch.object(ip_lib, 'IPWrapper')
@mock.patch.object(ip_lib, 'IPDevice')
@ -179,6 +181,7 @@ class TestDvrFipNs(base.BaseTestCase):
device_exists.return_value = True
ri = mock.Mock()
ri.dist_fip_count = None
ri.floating_ips_dict = {}
ip_list = [{'cidr': '111.2.3.4/32'}, {'cidr': '111.2.3.5/32'}]
self._test_scan_fip_ports(ri, ip_list)
self.assertEqual(2, ri.dist_fip_count)
@ -188,6 +191,7 @@ class TestDvrFipNs(base.BaseTestCase):
device_exists.return_value = True
ri = mock.Mock()
ri.dist_fip_count = None
ri.floating_ips_dict = {}
self._test_scan_fip_ports(ri, [])
self.assertEqual(0, ri.dist_fip_count)

View File

@ -0,0 +1,61 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.l3 import fip_rule_priority_allocator as frpa
from neutron.tests import base
class TestFipPriority(base.BaseTestCase):
def setUp(self):
super(TestFipPriority, self).setUp()
def test__init__(self):
test_pr = frpa.FipPriority(10)
self.assertEqual(10, test_pr.index)
def test__repr__(self):
test_pr = frpa.FipPriority(20)
self.assertEqual("20", str(test_pr))
def test__eq__(self):
left_pr = frpa.FipPriority(10)
right_pr = frpa.FipPriority(10)
other_pr = frpa.FipPriority(20)
self.assertEqual(left_pr, right_pr)
self.assertNotEqual(left_pr, other_pr)
self.assertNotEqual(right_pr, other_pr)
def test__hash__(self):
left_pr = frpa.FipPriority(10)
right_pr = frpa.FipPriority(10)
other_pr = frpa.FipPriority(20)
self.assertEqual(hash(left_pr), hash(right_pr))
self.assertNotEqual(hash(left_pr), hash(other_pr))
self.assertNotEqual(hash(other_pr), hash(right_pr))
class TestFipRulePriorityAllocator(base.BaseTestCase):
def setUp(self):
super(TestFipRulePriorityAllocator, self).setUp()
self.priority_rule_start = 100
self.priority_rule_end = 200
self.data_store_path = '/data_store_path_test'
def test__init__(self):
_frpa = frpa.FipRulePriorityAllocator(self.data_store_path,
self.priority_rule_start,
self.priority_rule_end)
self.assertEqual(self.data_store_path, _frpa.state_file)
self.assertEqual(frpa.FipPriority, _frpa.ItemClass)
self.assertEqual(100, len(_frpa.pool))

View File

@ -12,18 +12,93 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent.l3 import item_allocator as ia
from neutron.tests import base
class TestObject(object):
def __init__(self, value):
super(TestObject, self).__init__()
self._value = value
def __str__(self):
return str(self._value)
class TestItemAllocator(base.BaseTestCase):
def setUp(self):
super(TestItemAllocator, self).setUp()
def test__init__(self):
test_pool = set(s for s in range(32768, 40000))
a = ia.ItemAllocator('/file', object, test_pool)
self.assertEqual('/file', a.state_file)
test_pool = set(TestObject(s) for s in range(32768, 40000))
with mock.patch.object(ia.ItemAllocator, '_write') as write:
a = ia.ItemAllocator('/file', TestObject, test_pool)
test_object = a.allocate('test')
self.assertTrue('test' in a.allocations)
self.assertTrue(test_object in a.allocations.values())
self.assertTrue(test_object not in a.pool)
self.assertTrue(write.called)
def test__init__readfile(self):
test_pool = set(TestObject(s) for s in range(32768, 40000))
with mock.patch.object(ia.ItemAllocator, '_read') as read:
read.return_value = ["da873ca2,10\n"]
a = ia.ItemAllocator('/file', TestObject, test_pool)
self.assertTrue('da873ca2' in a.remembered)
self.assertEqual({}, a.allocations)
self.assertEqual(object, a.ItemClass)
self.assertEqual(test_pool, a.pool)
def test_allocate(self):
test_pool = set([TestObject(33000), TestObject(33001)])
a = ia.ItemAllocator('/file', TestObject, test_pool)
with mock.patch.object(ia.ItemAllocator, '_write') as write:
test_object = a.allocate('test')
self.assertTrue('test' in a.allocations)
self.assertTrue(test_object in a.allocations.values())
self.assertTrue(test_object not in a.pool)
self.assertTrue(write.called)
def test_allocate_from_file(self):
test_pool = set([TestObject(33000), TestObject(33001)])
with mock.patch.object(ia.ItemAllocator, '_read') as read:
read.return_value = ["deadbeef,33000\n"]
a = ia.ItemAllocator('/file', TestObject, test_pool)
with mock.patch.object(ia.ItemAllocator, '_write') as write:
t_obj = a.allocate('deadbeef')
self.assertEqual('33000', t_obj._value)
self.assertTrue('deadbeef' in a.allocations)
self.assertTrue(t_obj in a.allocations.values())
self.assertTrue(33000 not in a.pool)
self.assertFalse(write.called)
def test_allocate_exhausted_pool(self):
test_pool = set([TestObject(33000)])
with mock.patch.object(ia.ItemAllocator, '_read') as read:
read.return_value = ["deadbeef,33000\n"]
a = ia.ItemAllocator('/file', TestObject, test_pool)
with mock.patch.object(ia.ItemAllocator, '_write') as write:
allocation = a.allocate('abcdef12')
self.assertFalse('deadbeef' in a.allocations)
self.assertTrue(allocation not in a.pool)
self.assertTrue(write.called)
def test_release(self):
test_pool = set([TestObject(33000), TestObject(33001)])
with mock.patch.object(ia.ItemAllocator, '_write') as write:
a = ia.ItemAllocator('/file', TestObject, test_pool)
allocation = a.allocate('deadbeef')
write.reset_mock()
a.release('deadbeef')
self.assertTrue('deadbeef' not in a.allocations)
self.assertTrue(allocation in a.pool)
self.assertEqual({}, a.allocations)
write.assert_called_once_with([])

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import netaddr
from neutron.agent.l3 import link_local_allocator as lla
@ -28,69 +27,3 @@ class TestLinkLocalAddrAllocator(base.BaseTestCase):
a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
self.assertEqual('/file', a.state_file)
self.assertEqual({}, a.allocations)
def test__init__readfile(self):
with mock.patch.object(lla.LinkLocalAllocator, '_read') as read:
read.return_value = ["da873ca2,169.254.31.28/31\n"]
a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
self.assertTrue('da873ca2' in a.remembered)
self.assertEqual({}, a.allocations)
def test_allocate(self):
a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
subnet = a.allocate('deadbeef')
self.assertTrue('deadbeef' in a.allocations)
self.assertTrue(subnet not in a.pool)
self._check_allocations(a.allocations)
write.assert_called_once_with(['deadbeef,%s\n' % subnet.cidr])
def test_allocate_from_file(self):
with mock.patch.object(lla.LinkLocalAllocator, '_read') as read:
read.return_value = ["deadbeef,169.254.31.88/31\n"]
a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
subnet = a.allocate('deadbeef')
self.assertEqual(netaddr.IPNetwork('169.254.31.88/31'), subnet)
self.assertTrue(subnet not in a.pool)
self._check_allocations(a.allocations)
self.assertFalse(write.called)
def test_allocate_exhausted_pool(self):
subnet = netaddr.IPNetwork('169.254.31.0/31')
with mock.patch.object(lla.LinkLocalAllocator, '_read') as read:
read.return_value = ["deadbeef,169.254.31.0/31\n"]
a = lla.LinkLocalAllocator('/file', subnet.cidr)
with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
allocation = a.allocate('abcdef12')
self.assertEqual(subnet, allocation)
self.assertFalse('deadbeef' in a.allocations)
self.assertTrue('abcdef12' in a.allocations)
self.assertTrue(allocation not in a.pool)
self._check_allocations(a.allocations)
write.assert_called_once_with(['abcdef12,%s\n' % allocation.cidr])
self.assertRaises(RuntimeError, a.allocate, 'deadbeef')
def test_release(self):
with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
subnet = a.allocate('deadbeef')
write.reset_mock()
a.release('deadbeef')
self.assertTrue('deadbeef' not in a.allocations)
self.assertTrue(subnet in a.pool)
self.assertEqual({}, a.allocations)
write.assert_called_once_with([])
def _check_allocations(self, allocations):
for key, subnet in allocations.items():
self.assertTrue(subnet in self.subnet)
self.assertEqual(subnet.prefixlen, 31)