Merge "Cache the ARP entries in L3 Agent for DVR" into stable/liberty
This commit is contained in:
commit
99febcca47
|
@ -13,6 +13,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import binascii
|
import binascii
|
||||||
|
import collections
|
||||||
import netaddr
|
import netaddr
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
@ -25,12 +26,16 @@ from neutron.agent.linux import ip_lib
|
||||||
from neutron.common import constants as l3_constants
|
from neutron.common import constants as l3_constants
|
||||||
from neutron.common import exceptions
|
from neutron.common import exceptions
|
||||||
from neutron.common import utils as common_utils
|
from neutron.common import utils as common_utils
|
||||||
from neutron.i18n import _LE
|
from neutron.i18n import _LE, _LW
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
# xor-folding mask used for IPv6 rule index
|
# xor-folding mask used for IPv6 rule index
|
||||||
MASK_30 = 0x3fffffff
|
MASK_30 = 0x3fffffff
|
||||||
|
|
||||||
|
# Tracks the arp entry cache
|
||||||
|
Arp_entry = collections.namedtuple(
|
||||||
|
'Arp_entry', 'ip mac subnet_id operation')
|
||||||
|
|
||||||
|
|
||||||
class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||||
def __init__(self, agent, host, *args, **kwargs):
|
def __init__(self, agent, host, *args, **kwargs):
|
||||||
|
@ -41,6 +46,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||||
self.rtr_fip_subnet = None
|
self.rtr_fip_subnet = None
|
||||||
self.dist_fip_count = None
|
self.dist_fip_count = None
|
||||||
self.fip_ns = None
|
self.fip_ns = None
|
||||||
|
self._pending_arp_set = set()
|
||||||
|
|
||||||
def get_floating_ips(self):
|
def get_floating_ips(self):
|
||||||
"""Filter Floating IPs to be hosted on this agent."""
|
"""Filter Floating IPs to be hosted on this agent."""
|
||||||
|
@ -176,21 +182,65 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||||
if f['subnet_id'] == subnet_id:
|
if f['subnet_id'] == subnet_id:
|
||||||
return port
|
return port
|
||||||
|
|
||||||
|
def _cache_arp_entry(self, ip, mac, subnet_id, operation):
|
||||||
|
"""Cache the arp entries if device not ready."""
|
||||||
|
arp_entry_tuple = Arp_entry(ip=ip,
|
||||||
|
mac=mac,
|
||||||
|
subnet_id=subnet_id,
|
||||||
|
operation=operation)
|
||||||
|
self._pending_arp_set.add(arp_entry_tuple)
|
||||||
|
|
||||||
|
def _process_arp_cache_for_internal_port(self, subnet_id):
|
||||||
|
"""Function to process the cached arp entries."""
|
||||||
|
arp_remove = set()
|
||||||
|
for arp_entry in self._pending_arp_set:
|
||||||
|
if subnet_id == arp_entry.subnet_id:
|
||||||
|
try:
|
||||||
|
state = self._update_arp_entry(
|
||||||
|
arp_entry.ip, arp_entry.mac,
|
||||||
|
arp_entry.subnet_id, arp_entry.operation)
|
||||||
|
except Exception:
|
||||||
|
state = False
|
||||||
|
if state:
|
||||||
|
# If the arp update was successful, then
|
||||||
|
# go ahead and add it to the remove set
|
||||||
|
arp_remove.add(arp_entry)
|
||||||
|
|
||||||
|
self._pending_arp_set -= arp_remove
|
||||||
|
|
||||||
|
def _delete_arp_cache_for_internal_port(self, subnet_id):
|
||||||
|
"""Function to delete the cached arp entries."""
|
||||||
|
arp_delete = set()
|
||||||
|
for arp_entry in self._pending_arp_set:
|
||||||
|
if subnet_id == arp_entry.subnet_id:
|
||||||
|
arp_delete.add(arp_entry)
|
||||||
|
self._pending_arp_set -= arp_delete
|
||||||
|
|
||||||
def _update_arp_entry(self, ip, mac, subnet_id, operation):
|
def _update_arp_entry(self, ip, mac, subnet_id, operation):
|
||||||
"""Add or delete arp entry into router namespace for the subnet."""
|
"""Add or delete arp entry into router namespace for the subnet."""
|
||||||
port = self._get_internal_port(subnet_id)
|
port = self._get_internal_port(subnet_id)
|
||||||
# update arp entry only if the subnet is attached to the router
|
# update arp entry only if the subnet is attached to the router
|
||||||
if not port:
|
if not port:
|
||||||
return
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# TODO(mrsmith): optimize the calls below for bulk calls
|
# TODO(mrsmith): optimize the calls below for bulk calls
|
||||||
interface_name = self.get_internal_device_name(port['id'])
|
interface_name = self.get_internal_device_name(port['id'])
|
||||||
device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
|
device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
|
||||||
if operation == 'add':
|
if ip_lib.device_exists(interface_name, self.ns_name):
|
||||||
device.neigh.add(ip, mac)
|
if operation == 'add':
|
||||||
elif operation == 'delete':
|
device.neigh.add(ip, mac)
|
||||||
device.neigh.delete(ip, mac)
|
elif operation == 'delete':
|
||||||
|
device.neigh.delete(ip, mac)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
if operation == 'add':
|
||||||
|
LOG.warn(_LW("Device %s does not exist so ARP entry "
|
||||||
|
"cannot be updated, will cache information "
|
||||||
|
"to be applied later when the device exists"),
|
||||||
|
device)
|
||||||
|
self._cache_arp_entry(ip, mac, subnet_id, operation)
|
||||||
|
return False
|
||||||
except Exception:
|
except Exception:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.exception(_LE("DVR: Failed updating arp entry"))
|
LOG.exception(_LE("DVR: Failed updating arp entry"))
|
||||||
|
@ -208,6 +258,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||||
p['mac_address'],
|
p['mac_address'],
|
||||||
subnet_id,
|
subnet_id,
|
||||||
'add')
|
'add')
|
||||||
|
self._process_arp_cache_for_internal_port(subnet_id)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_snat_idx(ip_cidr):
|
def _get_snat_idx(ip_cidr):
|
||||||
|
@ -324,6 +375,9 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
|
||||||
# DVR handling code for SNAT
|
# DVR handling code for SNAT
|
||||||
interface_name = self.get_internal_device_name(port['id'])
|
interface_name = self.get_internal_device_name(port['id'])
|
||||||
self._snat_redirect_remove(sn_port, port, interface_name)
|
self._snat_redirect_remove(sn_port, port, interface_name)
|
||||||
|
# Clean up the cached arp entries related to the port subnet
|
||||||
|
for subnet in port['subnets']:
|
||||||
|
self._delete_arp_cache_for_internal_port(subnet)
|
||||||
|
|
||||||
def internal_network_removed(self, port):
|
def internal_network_removed(self, port):
|
||||||
self._dvr_internal_network_removed(port)
|
self._dvr_internal_network_removed(port)
|
||||||
|
|
|
@ -372,8 +372,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||||
self.device_exists.return_value = False
|
self.device_exists.return_value = False
|
||||||
ri.get_snat_port_for_internal_port = mock.Mock(
|
ri.get_snat_port_for_internal_port = mock.Mock(
|
||||||
return_value=sn_port)
|
return_value=sn_port)
|
||||||
|
ri._delete_arp_cache_for_internal_port = mock.Mock()
|
||||||
ri._snat_redirect_modify = mock.Mock()
|
ri._snat_redirect_modify = mock.Mock()
|
||||||
ri.internal_network_removed(port)
|
ri.internal_network_removed(port)
|
||||||
|
self.assertEqual(
|
||||||
|
1, ri._delete_arp_cache_for_internal_port.call_count)
|
||||||
ri._snat_redirect_modify.assert_called_with(
|
ri._snat_redirect_modify.assert_called_with(
|
||||||
sn_port, port,
|
sn_port, port,
|
||||||
ri.get_internal_device_name(port['id']),
|
ri.get_internal_device_name(port['id']),
|
||||||
|
|
|
@ -365,7 +365,10 @@ class TestDvrRouterOperations(base.BaseTestCase):
|
||||||
# Test basic case
|
# Test basic case
|
||||||
ports[0]['subnets'] = [{'id': subnet_id,
|
ports[0]['subnets'] = [{'id': subnet_id,
|
||||||
'cidr': '1.2.3.0/24'}]
|
'cidr': '1.2.3.0/24'}]
|
||||||
ri._set_subnet_arp_info(subnet_id)
|
with mock.patch.object(ri,
|
||||||
|
'_process_arp_cache_for_internal_port') as parp:
|
||||||
|
ri._set_subnet_arp_info(subnet_id)
|
||||||
|
self.assertEqual(1, parp.call_count)
|
||||||
self.mock_ip_dev.neigh.add.assert_called_once_with(
|
self.mock_ip_dev.neigh.add.assert_called_once_with(
|
||||||
'1.2.3.4', '00:11:22:33:44:55')
|
'1.2.3.4', '00:11:22:33:44:55')
|
||||||
|
|
||||||
|
@ -414,6 +417,48 @@ class TestDvrRouterOperations(base.BaseTestCase):
|
||||||
ri._update_arp_entry(mock.ANY, mock.ANY, 'foo_subnet_id', 'add')
|
ri._update_arp_entry(mock.ANY, mock.ANY, 'foo_subnet_id', 'add')
|
||||||
self.assertFalse(f.call_count)
|
self.assertFalse(f.call_count)
|
||||||
|
|
||||||
|
def _setup_test_for_arp_entry_cache(self):
|
||||||
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
|
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
||||||
|
router['distributed'] = True
|
||||||
|
ri = dvr_router.DvrLocalRouter(
|
||||||
|
agent, HOSTNAME, router['id'], router, **self.ri_kwargs)
|
||||||
|
subnet_id = l3_test_common.get_subnet_id(
|
||||||
|
ri.router[l3_constants.INTERFACE_KEY][0])
|
||||||
|
return ri, subnet_id
|
||||||
|
|
||||||
|
def test__update_arp_entry_calls_arp_cache_with_no_device(self):
|
||||||
|
ri, subnet_id = self._setup_test_for_arp_entry_cache()
|
||||||
|
state = True
|
||||||
|
with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as rtrdev,\
|
||||||
|
mock.patch.object(ri, '_cache_arp_entry') as arp_cache:
|
||||||
|
self.device_exists.return_value = False
|
||||||
|
state = ri._update_arp_entry(
|
||||||
|
mock.ANY, mock.ANY, subnet_id, 'add')
|
||||||
|
self.assertFalse(state)
|
||||||
|
self.assertTrue(arp_cache.called)
|
||||||
|
arp_cache.assert_called_once_with(mock.ANY, mock.ANY,
|
||||||
|
subnet_id, 'add')
|
||||||
|
self.assertFalse(rtrdev.neigh.add.called)
|
||||||
|
|
||||||
|
def test__process_arp_cache_for_internal_port(self):
|
||||||
|
ri, subnet_id = self._setup_test_for_arp_entry_cache()
|
||||||
|
ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
|
||||||
|
subnet_id, 'add')
|
||||||
|
self.assertEqual(1, len(ri._pending_arp_set))
|
||||||
|
with mock.patch.object(ri, '_update_arp_entry') as update_arp:
|
||||||
|
update_arp.return_value = True
|
||||||
|
ri._process_arp_cache_for_internal_port(subnet_id)
|
||||||
|
self.assertEqual(0, len(ri._pending_arp_set))
|
||||||
|
|
||||||
|
def test__delete_arp_cache_for_internal_port(self):
|
||||||
|
ri, subnet_id = self._setup_test_for_arp_entry_cache()
|
||||||
|
ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
|
||||||
|
subnet_id, 'add')
|
||||||
|
self.assertEqual(1, len(ri._pending_arp_set))
|
||||||
|
ri._delete_arp_cache_for_internal_port(subnet_id)
|
||||||
|
self.assertEqual(0, len(ri._pending_arp_set))
|
||||||
|
|
||||||
def test_del_arp_entry(self):
|
def test_del_arp_entry(self):
|
||||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||||
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
router = l3_test_common.prepare_router_data(num_internal_ports=2)
|
||||||
|
|
Loading…
Reference in New Issue