DVR: Fix race condition in creation of fip gateway

In large-scale environments, we have seen a router update
arrive for one tenant while we are still creating the
router for a different tenant and initializing the shared
floating IP gateway port.  Sometimes these updates can
get scheduled simultaneously, with the second running
before we are done creating all the resources in the
first, causing an exception when trying to set the
default route since either the interface or IP address
does not exist yet.

Add a lock to better synchronize these functions so
a create can finish before an update can be done.

If it still fails, we will throw an exception so that
the namespace will be cleaned-up and the update can be
re-scheduled for the next iteration.

Closes-Bug: #1631513
Change-Id: Ia8c92cea2f8798582c39ad3450ab3b3c45a356f7
This commit is contained in:
Swaminathan Vasudevan 2016-10-07 10:30:40 -07:00 committed by Brian Haley
parent cbd0632620
commit d40322c7d4
5 changed files with 220 additions and 41 deletions

View File

@ -12,16 +12,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import os
from oslo_concurrency import lockutils
from oslo_log import log as logging
from oslo_utils import excutils
from neutron._i18n import _, _LE, _LW
from neutron.agent.l3 import fip_rule_priority_allocator as frpa
from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import utils as common_utils
from neutron.ipam import utils as ipam_utils
@ -97,9 +102,51 @@ class FipNamespace(namespaces.Namespace):
def deallocate_rule_priority(self, floating_ip):
self._rule_priorities.release(floating_ip)
def _gateway_added(self, ex_gw_port, interface_name):
"""Add Floating IP gateway port."""
LOG.debug("add gateway interface(%s)", interface_name)
@contextlib.contextmanager
def _fip_port_lock(self, interface_name):
# Use a namespace and port-specific lock semaphore to allow for
# concurrency
lock_name = 'port-lock-' + self.name + '-' + interface_name
with lockutils.lock(lock_name, common_utils.SYNCHRONIZED_PREFIX):
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE('DVR: FIP namespace config failure '
'for interface %s'), interface_name)
def create_or_update_gateway_port(self, agent_gateway_port):
interface_name = self.get_ext_device_name(agent_gateway_port['id'])
# The lock is used to make sure another thread doesn't call to
# update the gateway port before we are done initializing things.
with self._fip_port_lock(interface_name):
is_first = self.subscribe(agent_gateway_port['network_id'])
if is_first:
self._create_gateway_port_and_ns(agent_gateway_port,
interface_name)
else:
self._update_gateway_port(agent_gateway_port, interface_name)
def _create_gateway_port_and_ns(self, agent_gateway_port, interface_name):
"""Create namespace and Floating IP gateway port."""
self.create()
try:
self._create_gateway_port(agent_gateway_port, interface_name)
except Exception:
# If an exception occurs at this point, then it is
# good to clean up the namespace that has been created
# and reraise the exception in order to resync the router
with excutils.save_and_reraise_exception():
self.unsubscribe(agent_gateway_port['network_id'])
self.delete()
LOG.exception(_LE('DVR: Gateway setup in FIP namespace '
'failed'))
def _create_gateway_port(self, ex_gw_port, interface_name):
"""Request port creation from Plugin then configure gateway port."""
LOG.debug("DVR: adding gateway interface: %s", interface_name)
ns_name = self.get_name()
self.driver.plug(ex_gw_port['network_id'],
ex_gw_port['id'],
@ -116,6 +163,7 @@ class FipNamespace(namespaces.Namespace):
for device in devices:
name = device.name
if name.startswith(FIP_EXT_DEV_PREFIX) and name != interface_name:
LOG.debug('DVR: unplug: %s', name)
ext_net_bridge = self.agent_conf.external_network_bridge
self.driver.unplug(name,
bridge=ext_net_bridge,
@ -126,7 +174,7 @@ class FipNamespace(namespaces.Namespace):
self.driver.init_l3(interface_name, ip_cidrs, namespace=ns_name,
clean_connections=True)
self.update_gateway_port(ex_gw_port)
self._update_gateway_port(ex_gw_port, interface_name)
cmd = ['sysctl', '-w', 'net.ipv4.conf.%s.proxy_arp=1' % interface_name]
ip_wrapper.netns.execute(cmd, check_exit_code=False)
@ -177,17 +225,6 @@ class FipNamespace(namespaces.Namespace):
LOG.debug('DVR: destroy fip namespace: %s', self.name)
super(FipNamespace, self).delete()
def create_gateway_port(self, agent_gateway_port):
"""Create Floating IP gateway port.
Request port creation from Plugin then creates
Floating IP namespace and adds gateway port.
"""
self.create()
iface_name = self.get_ext_device_name(agent_gateway_port['id'])
self._gateway_added(agent_gateway_port, iface_name)
def _check_for_gateway_ip_change(self, new_agent_gateway_port):
def get_gateway_ips(gateway_port):
@ -205,22 +242,33 @@ class FipNamespace(namespaces.Namespace):
return new_gw_ips != old_gw_ips
def update_gateway_port(self, agent_gateway_port):
gateway_ip_not_changed = self.agent_gateway_port and (
not self._check_for_gateway_ip_change(agent_gateway_port))
self.agent_gateway_port = agent_gateway_port
if gateway_ip_not_changed:
return
def _update_gateway_port(self, agent_gateway_port, interface_name):
if (self.agent_gateway_port and
not self._check_for_gateway_ip_change(agent_gateway_port)):
return
ns_name = self.get_name()
interface_name = self.get_ext_device_name(agent_gateway_port['id'])
ipd = ip_lib.IPDevice(interface_name, namespace=ns_name)
# If the 'fg-' device doesn't exist in the namespace then trying
# to send advertisements or configure the default route will just
# throw exceptions. Unsubscribe this external network so that
# the next call will trigger the interface to be plugged.
if not ipd.exists():
self.unsubscribe(agent_gateway_port['network_id'])
LOG.warning(_LW('DVR: FIP gateway port with interface '
'name: %(device)s does not exist in the given '
'namespace: %(ns)s'), {'device': interface_name,
'ns': ns_name})
msg = _('DVR: Gateway setup in FIP namespace failed, retry '
'should be attempted on next call')
raise n_exc.FloatingIpSetupException(msg)
for fixed_ip in agent_gateway_port['fixed_ips']:
ip_lib.send_ip_addr_adv_notif(ns_name,
interface_name,
fixed_ip['ip_address'],
self.agent_conf.send_arp_for_ha)
ipd = ip_lib.IPDevice(interface_name, namespace=ns_name)
for subnet in agent_gateway_port['subnets']:
gw_ip = subnet.get('gateway_ip')
if gw_ip:
@ -233,6 +281,10 @@ class FipNamespace(namespaces.Namespace):
current_gateway = ipd.route.get_gateway()
if current_gateway and current_gateway.get('gateway'):
ipd.route.delete_gateway(current_gateway.get('gateway'))
# Cache the agent gateway port after successfully configuring
# the gateway, so that checking on self.agent_gateway_port
# will be a valid check
self.agent_gateway_port = agent_gateway_port
def _add_cidr_to_device(self, device, ip_cidr):
if not device.addr.list(to=ip_cidr):

View File

@ -518,10 +518,8 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
if fip_agent_port:
LOG.debug("FloatingIP agent gateway port received from the "
"plugin: %s", fip_agent_port)
is_first = False
if floating_ips:
is_first = self.fip_ns.subscribe(ex_gw_port['network_id'])
if is_first and not fip_agent_port:
if not fip_agent_port:
LOG.debug("No FloatingIP agent gateway port possibly due to "
"late binding of the private port to the host, "
"requesting agent gateway port for 'network-id' :"
@ -536,10 +534,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
if 'subnets' not in fip_agent_port:
LOG.error(_LE('Missing subnet/agent_gateway_port'))
else:
if is_first:
self.fip_ns.create_gateway_port(fip_agent_port)
else:
self.fip_ns.update_gateway_port(fip_agent_port)
self.fip_ns.create_or_update_gateway_port(fip_agent_port)
if (self.fip_ns.agent_gateway_port and
(self.dist_fip_count == 0)):

View File

@ -28,6 +28,7 @@ from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.extensions import portbindings
from neutron.tests.common import l3_test_common
@ -110,6 +111,89 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertEqual(3, default_rules_list_count)
self.assertEqual(2, interface_rules_list_count)
def test_dvr_update_gateway_port_with_no_gw_port_in_namespace(self):
self.agent.conf.agent_mode = 'dvr'
# Create the router with external net
router_info = self.generate_dvr_router_info()
external_gw_port = router_info['gw_port']
ext_net_id = router_info['_floatingips'][0]['floating_network_id']
self.mock_plugin_api.get_external_network_id.return_value = ext_net_id
router = self.manage_router(self.agent, router_info)
fg_port = router.fip_ns.agent_gateway_port
fg_port_name = router.fip_ns.get_ext_device_name(fg_port['id'])
fg_device = ip_lib.IPDevice(fg_port_name,
namespace=router.fip_ns.name)
# Now validate if the gateway is properly configured.
self.assertIn('gateway', fg_device.route.get_gateway())
self._validate_fips_for_external_network(
router, router.fip_ns.get_name())
# Now delete the fg- port that was created
ext_net_bridge = self.agent.conf.external_network_bridge
router.fip_ns.driver.unplug(fg_port_name,
bridge=ext_net_bridge,
namespace=router.fip_ns.name,
prefix=dvr_fip_ns.FIP_EXT_DEV_PREFIX)
# Now check if the fg- port is missing.
self.assertFalse(fg_device.exists())
# Now change the gateway ip for the router and do an update.
router.ex_gw_port = copy.deepcopy(router.ex_gw_port)
new_fg_port = copy.deepcopy(fg_port)
for subnet in new_fg_port['subnets']:
subnet['gateway_ip'] = '19.4.4.2'
router.router[n_const.FLOATINGIP_AGENT_INTF_KEY] = [new_fg_port]
self.assertRaises(n_exc.FloatingIpSetupException,
self.manage_router,
self.agent,
router.router)
router = self.manage_router(self.agent, router.router)
self.assertTrue(fg_device.exists())
self.assertEqual({'gateway': u'19.4.4.2'},
fg_device.route.get_gateway())
self._validate_fips_for_external_network(
router, router.fip_ns.get_name())
self._delete_router(self.agent, router.router_id)
self._assert_fip_namespace_deleted(external_gw_port)
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
def test_dvr_process_fips_with_no_gw_port_in_namespace(
self, fip_subscribe):
self.agent.conf.agent_mode = 'dvr'
# Create the router with external net
router_info = self.generate_dvr_router_info()
external_gw_port = router_info['gw_port']
ext_net_id = router_info['_floatingips'][0]['floating_network_id']
self.mock_plugin_api.get_external_network_id.return_value = ext_net_id
# Create the fip namespace up front
dvr_fip_ns.FipNamespace(ext_net_id,
self.agent.conf,
self.agent.driver,
self.agent.use_ipv6).create()
# Create the router with the fip, this shouldn't allow the
# update_gateway_port to be called without the fg- port
fip_subscribe.return_value = False
# This will raise the exception and will also clear
# subscription for the ext_net_id
self.assertRaises(n_exc.FloatingIpSetupException,
self.manage_router,
self.agent,
router_info)
fip_subscribe.return_value = True
# Now update the router again
router = self.manage_router(self.agent, router_info)
fg_port = router.fip_ns.agent_gateway_port
fg_port_name = router.fip_ns.get_ext_device_name(fg_port['id'])
fg_device = ip_lib.IPDevice(fg_port_name,
namespace=router.fip_ns.name)
# Now validate if the gateway is properly configured.
self.assertIn('gateway', fg_device.route.get_gateway())
self._validate_fips_for_external_network(
router, router.fip_ns.get_name())
self._delete_router(self.agent, router.router_id)
self._assert_fip_namespace_deleted(external_gw_port)
def test_dvr_router_fips_stale_gw_port(self):
self.agent.conf.agent_mode = 'dvr'
@ -148,7 +232,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
'prefixlen': prefixlen}],
'id': framework._uuid(),
'device_id': framework._uuid()}
stale_fip_ns.create_gateway_port(stale_agent_gw_port)
stale_fip_ns.create_or_update_gateway_port(stale_agent_gw_port)
stale_dev_exists = self.device_exists_with_ips_and_mac(
stale_agent_gw_port,

View File

@ -22,6 +22,7 @@ from neutron.agent.l3 import dvr_fip_ns
from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.common import exceptions as n_exc
from neutron.tests import base
_uuid = uuidutils.generate_uuid
@ -93,24 +94,56 @@ class TestDvrFipNs(base.BaseTestCase):
@mock.patch.object(ip_lib, 'IPWrapper')
@mock.patch.object(ip_lib, 'device_exists')
def test_gateway_added(self, device_exists, ip_wrapper):
@mock.patch.object(dvr_fip_ns.FipNamespace, 'create')
def test_create_gateway_port(self, fip_create, device_exists, ip_wrapper):
agent_gw_port = self._get_agent_gw_port()
interface_name = self.fip_ns.get_ext_device_name(agent_gw_port['id'])
device_exists.return_value = False
self.fip_ns.update_gateway_port = mock.Mock()
self.fip_ns._gateway_added(agent_gw_port,
mock.sentinel.interface_name)
self.fip_ns._update_gateway_port = mock.Mock()
self.fip_ns.create_or_update_gateway_port(agent_gw_port)
self.assertTrue(fip_create.called)
self.assertEqual(1, self.driver.plug.call_count)
self.assertEqual(1, self.driver.init_l3.call_count)
self.fip_ns.update_gateway_port.assert_called_once_with(agent_gw_port)
self.fip_ns._update_gateway_port.assert_called_once_with(
agent_gw_port, interface_name)
@mock.patch.object(ip_lib, 'IPWrapper')
@mock.patch.object(ip_lib, 'device_exists')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'create')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'delete')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'unsubscribe')
def test_create_gateway_port_raises_exception(
self, fip_desub, fip_delete, fip_create, device_exists, ip_wrapper):
agent_gw_port = self._get_agent_gw_port()
interface_name = self.fip_ns.get_ext_device_name(agent_gw_port['id'])
device_exists.return_value = False
msg = 'L3 agent failed to setup fip gateway in the namespace'
self.fip_ns._update_gateway_port = mock.Mock(
side_effect=n_exc.FloatingIpSetupException(msg))
self.assertRaises(n_exc.FloatingIpSetupException,
self.fip_ns.create_or_update_gateway_port,
agent_gw_port)
self.assertTrue(fip_create.called)
self.assertEqual(1, self.driver.plug.call_count)
self.assertEqual(1, self.driver.init_l3.call_count)
self.fip_ns._update_gateway_port.assert_called_once_with(
agent_gw_port, interface_name)
self.assertTrue(fip_desub.called)
self.assertTrue(fip_delete.called)
self.assertIsNone(self.fip_ns.agent_gateway_port)
@mock.patch.object(ip_lib, 'IPDevice')
@mock.patch.object(ip_lib, 'send_ip_addr_adv_notif')
def test_update_gateway_port(self, send_adv_notif, IPDevice):
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
def test_update_gateway_port(self, fip_sub, send_adv_notif, IPDevice):
fip_sub.return_value = False
self.fip_ns._check_for_gateway_ip_change = mock.Mock(return_value=True)
self.fip_ns.agent_gateway_port = None
agent_gw_port = self._get_agent_gw_port()
self.fip_ns.update_gateway_port(agent_gw_port)
self.fip_ns.create_or_update_gateway_port(agent_gw_port)
expected = [
mock.call(self.fip_ns.get_name(),
self.fip_ns.get_ext_device_name(agent_gw_port['id']),
@ -126,15 +159,30 @@ class TestDvrFipNs(base.BaseTestCase):
expected = [mock.call(gw_ipv4), mock.call(gw_ipv6)]
IPDevice().route.add_gateway.assert_has_calls(expected)
@mock.patch.object(ip_lib.IPDevice, 'exists')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
def test_update_gateway_port_raises_exception(self, fip_sub, exists):
fip_sub.return_value = False
exists.return_value = False
self.fip_ns._check_for_gateway_ip_change = mock.Mock(return_value=True)
self.fip_ns.agent_gateway_port = None
agent_gw_port = self._get_agent_gw_port()
self.assertRaises(n_exc.FloatingIpSetupException,
self.fip_ns.create_or_update_gateway_port,
agent_gw_port)
@mock.patch.object(ip_lib, 'IPDevice')
@mock.patch.object(ip_lib, 'send_ip_addr_adv_notif')
@mock.patch.object(dvr_fip_ns.FipNamespace, 'subscribe')
def test_update_gateway_port_gateway_outside_subnet_added(
self, send_adv_notif, IPDevice):
self, fip_sub, send_adv_notif, IPDevice):
fip_sub.return_value = False
self.fip_ns.agent_gateway_port = None
agent_gw_port = self._get_agent_gw_port()
agent_gw_port['subnets'][0]['gateway_ip'] = '20.0.1.1'
self.fip_ns.update_gateway_port(agent_gw_port)
self.fip_ns.create_or_update_gateway_port(agent_gw_port)
IPDevice().route.add_route.assert_called_once_with('20.0.1.1',
scope='link')

View File

@ -170,7 +170,7 @@ class TestDvrRouterOperations(base.BaseTestCase):
ri.fip_ns.subscribe.return_value = False
ex_gw_port = {'network_id': 'fake_net_id'}
ri.create_dvr_fip_interfaces(ex_gw_port)
ri.fip_ns.update_gateway_port.assert_called_once_with(
ri.fip_ns.create_or_update_gateway_port.assert_called_once_with(
fip_agent_port)
def test_get_floating_ips_dvr(self):