Merge "Cache the ARP entries in L3 Agent for DVR"

This commit is contained in:
Jenkins 2015-10-19 19:40:26 +00:00 committed by Gerrit Code Review
commit d57c546f1f
3 changed files with 109 additions and 7 deletions

View File

@ -13,6 +13,7 @@
# under the License.
import binascii
import collections
import netaddr
from oslo_log import log as logging
@ -25,12 +26,16 @@ from neutron.agent.linux import ip_lib
from neutron.common import constants as l3_constants
from neutron.common import exceptions
from neutron.common import utils as common_utils
from neutron.i18n import _LE
from neutron.i18n import _LE, _LW
LOG = logging.getLogger(__name__)
# xor-folding mask used for IPv6 rule index
MASK_30 = 0x3fffffff
# Tracks the arp entry cache
Arp_entry = collections.namedtuple(
'Arp_entry', 'ip mac subnet_id operation')
class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def __init__(self, agent, host, *args, **kwargs):
@ -41,6 +46,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
self.rtr_fip_subnet = None
self.dist_fip_count = None
self.fip_ns = None
self._pending_arp_set = set()
def get_floating_ips(self):
"""Filter Floating IPs to be hosted on this agent."""
@ -173,21 +179,65 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
if f['subnet_id'] == subnet_id:
return port
def _cache_arp_entry(self, ip, mac, subnet_id, operation):
"""Cache the arp entries if device not ready."""
arp_entry_tuple = Arp_entry(ip=ip,
mac=mac,
subnet_id=subnet_id,
operation=operation)
self._pending_arp_set.add(arp_entry_tuple)
def _process_arp_cache_for_internal_port(self, subnet_id):
"""Function to process the cached arp entries."""
arp_remove = set()
for arp_entry in self._pending_arp_set:
if subnet_id == arp_entry.subnet_id:
try:
state = self._update_arp_entry(
arp_entry.ip, arp_entry.mac,
arp_entry.subnet_id, arp_entry.operation)
except Exception:
state = False
if state:
# If the arp update was successful, then
# go ahead and add it to the remove set
arp_remove.add(arp_entry)
self._pending_arp_set -= arp_remove
def _delete_arp_cache_for_internal_port(self, subnet_id):
"""Function to delete the cached arp entries."""
arp_delete = set()
for arp_entry in self._pending_arp_set:
if subnet_id == arp_entry.subnet_id:
arp_delete.add(arp_entry)
self._pending_arp_set -= arp_delete
def _update_arp_entry(self, ip, mac, subnet_id, operation):
"""Add or delete arp entry into router namespace for the subnet."""
port = self._get_internal_port(subnet_id)
# update arp entry only if the subnet is attached to the router
if not port:
return
return False
try:
# TODO(mrsmith): optimize the calls below for bulk calls
interface_name = self.get_internal_device_name(port['id'])
device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
if operation == 'add':
device.neigh.add(ip, mac)
elif operation == 'delete':
device.neigh.delete(ip, mac)
if device.exists():
if operation == 'add':
device.neigh.add(ip, mac)
elif operation == 'delete':
device.neigh.delete(ip, mac)
return True
else:
if operation == 'add':
LOG.warn(_LW("Device %s does not exist so ARP entry "
"cannot be updated, will cache information "
"to be applied later when the device exists"),
device)
self._cache_arp_entry(ip, mac, subnet_id, operation)
return False
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("DVR: Failed updating arp entry"))
@ -205,6 +255,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
p['mac_address'],
subnet_id,
'add')
self._process_arp_cache_for_internal_port(subnet_id)
@staticmethod
def _get_snat_idx(ip_cidr):
@ -323,6 +374,9 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
# DVR handling code for SNAT
interface_name = self.get_internal_device_name(port['id'])
self._snat_redirect_remove(sn_port, port, interface_name)
# Clean up the cached arp entries related to the port subnet
for subnet in port['subnets']:
self._delete_arp_cache_for_internal_port(subnet)
def internal_network_removed(self, port):
self._dvr_internal_network_removed(port)

View File

@ -361,8 +361,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.device_exists.return_value = False
ri.get_snat_port_for_internal_port = mock.Mock(
return_value=sn_port)
ri._delete_arp_cache_for_internal_port = mock.Mock()
ri._snat_redirect_modify = mock.Mock()
ri.internal_network_removed(port)
self.assertEqual(
1, ri._delete_arp_cache_for_internal_port.call_count)
ri._snat_redirect_modify.assert_called_with(
sn_port, port,
ri.get_internal_device_name(port['id']),

View File

@ -346,7 +346,10 @@ class TestDvrRouterOperations(base.BaseTestCase):
# Test basic case
ports[0]['subnets'] = [{'id': subnet_id,
'cidr': '1.2.3.0/24'}]
ri._set_subnet_arp_info(subnet_id)
with mock.patch.object(ri,
'_process_arp_cache_for_internal_port') as parp:
ri._set_subnet_arp_info(subnet_id)
self.assertEqual(1, parp.call_count)
self.mock_ip_dev.neigh.add.assert_called_once_with(
'1.2.3.4', '00:11:22:33:44:55')
@ -395,6 +398,48 @@ class TestDvrRouterOperations(base.BaseTestCase):
ri._update_arp_entry(mock.ANY, mock.ANY, 'foo_subnet_id', 'add')
self.assertFalse(f.call_count)
def _setup_test_for_arp_entry_cache(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = l3_test_common.prepare_router_data(num_internal_ports=2)
router['distributed'] = True
ri = dvr_router.DvrLocalRouter(
agent, HOSTNAME, router['id'], router, **self.ri_kwargs)
subnet_id = l3_test_common.get_subnet_id(
ri.router[l3_constants.INTERFACE_KEY][0])
return ri, subnet_id
def test__update_arp_entry_calls_arp_cache_with_no_device(self):
ri, subnet_id = self._setup_test_for_arp_entry_cache()
state = True
with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as rtrdev,\
mock.patch.object(ri, '_cache_arp_entry') as arp_cache:
rtrdev.return_value.exists.return_value = False
state = ri._update_arp_entry(
mock.ANY, mock.ANY, subnet_id, 'add')
self.assertFalse(state)
self.assertTrue(arp_cache.called)
arp_cache.assert_called_once_with(mock.ANY, mock.ANY,
subnet_id, 'add')
self.assertFalse(rtrdev.neigh.add.called)
def test__process_arp_cache_for_internal_port(self):
ri, subnet_id = self._setup_test_for_arp_entry_cache()
ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
subnet_id, 'add')
self.assertEqual(1, len(ri._pending_arp_set))
with mock.patch.object(ri, '_update_arp_entry') as update_arp:
update_arp.return_value = True
ri._process_arp_cache_for_internal_port(subnet_id)
self.assertEqual(0, len(ri._pending_arp_set))
def test__delete_arp_cache_for_internal_port(self):
ri, subnet_id = self._setup_test_for_arp_entry_cache()
ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
subnet_id, 'add')
self.assertEqual(1, len(ri._pending_arp_set))
ri._delete_arp_cache_for_internal_port(subnet_id)
self.assertEqual(0, len(ri._pending_arp_set))
def test_del_arp_entry(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = l3_test_common.prepare_router_data(num_internal_ports=2)