Merge "More accurate agent restart state transfer"
This commit is contained in:
commit
c42287eef3
@ -153,11 +153,12 @@ class PluginApi(object):
|
|||||||
agent_id=agent_id, host=host)
|
agent_id=agent_id, host=host)
|
||||||
|
|
||||||
def update_device_list(self, context, devices_up, devices_down,
|
def update_device_list(self, context, devices_up, devices_down,
|
||||||
agent_id, host):
|
agent_id, host, agent_restarted=False):
|
||||||
cctxt = self.client.prepare(version='1.5')
|
cctxt = self.client.prepare(version='1.5')
|
||||||
return cctxt.call(context, 'update_device_list',
|
return cctxt.call(context, 'update_device_list',
|
||||||
devices_up=devices_up, devices_down=devices_down,
|
devices_up=devices_up, devices_down=devices_down,
|
||||||
agent_id=agent_id, host=host)
|
agent_id=agent_id, host=host,
|
||||||
|
agent_restarted=agent_restarted)
|
||||||
|
|
||||||
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
|
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
|
||||||
cctxt = self.client.prepare(version='1.4')
|
cctxt = self.client.prepare(version='1.4')
|
||||||
|
@ -20,8 +20,13 @@ from neutron._i18n import _
|
|||||||
|
|
||||||
l2_population_options = [
|
l2_population_options = [
|
||||||
cfg.IntOpt('agent_boot_time', default=180,
|
cfg.IntOpt('agent_boot_time', default=180,
|
||||||
|
deprecated_for_removal=True,
|
||||||
|
deprecated_since='Stein',
|
||||||
help=_('Delay within which agent is expected to update '
|
help=_('Delay within which agent is expected to update '
|
||||||
'existing ports when it restarts')),
|
'existing ports when it restarts. This option '
|
||||||
|
'is deprecated in favor of direct RPC restart '
|
||||||
|
'state transfer and will be removed in a future '
|
||||||
|
'release.')),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -246,11 +246,16 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
|||||||
|
|
||||||
return agents
|
return agents
|
||||||
|
|
||||||
|
# This will be removed in the Train release.
|
||||||
def agent_restarted(self, context):
|
def agent_restarted(self, context):
|
||||||
agent_host = context.host
|
agent_host = context.host
|
||||||
port_context = context._plugin_context
|
port_context = context._plugin_context
|
||||||
agent = l2pop_db.get_agent_by_host(port_context, agent_host)
|
agent = l2pop_db.get_agent_by_host(port_context, agent_host)
|
||||||
if l2pop_db.get_agent_uptime(agent) < cfg.CONF.l2pop.agent_boot_time:
|
if l2pop_db.get_agent_uptime(agent) < cfg.CONF.l2pop.agent_boot_time:
|
||||||
|
LOG.warning("Agent on host '%s' did not supply 'agent_restarted' "
|
||||||
|
"information in RPC message, determined it restarted "
|
||||||
|
"based on deprecated 'agent_boot_time' config option.",
|
||||||
|
agent_host)
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -275,7 +280,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
|||||||
self.L2populationAgentNotify.remove_fdb_entries(
|
self.L2populationAgentNotify.remove_fdb_entries(
|
||||||
self.rpc_ctx, fdb_entries)
|
self.rpc_ctx, fdb_entries)
|
||||||
|
|
||||||
def update_port_up(self, context):
|
def update_port_up(self, context, agent_restarted=None):
|
||||||
port = context.current
|
port = context.current
|
||||||
agent_host = context.host
|
agent_host = context.host
|
||||||
port_context = context._plugin_context
|
port_context = context._plugin_context
|
||||||
@ -301,7 +306,10 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
|||||||
# with high concurrency more than 1 port may be activated on an agent
|
# with high concurrency more than 1 port may be activated on an agent
|
||||||
# at the same time (like VM port + a DVR port) so checking for 1 or 2
|
# at the same time (like VM port + a DVR port) so checking for 1 or 2
|
||||||
is_first_port = agent_active_ports in (1, 2)
|
is_first_port = agent_active_ports in (1, 2)
|
||||||
if is_first_port or self.agent_restarted(context):
|
if agent_restarted is None:
|
||||||
|
# Only for backport compatibility, will be removed.
|
||||||
|
agent_restarted = self.agent_restarted(context)
|
||||||
|
if is_first_port or agent_restarted:
|
||||||
# First port(s) activated on current agent in this network,
|
# First port(s) activated on current agent in this network,
|
||||||
# we have to provide it with the whole list of fdb entries
|
# we have to provide it with the whole list of fdb entries
|
||||||
agent_fdb_entries = self._create_agent_fdb(port_context,
|
agent_fdb_entries = self._create_agent_fdb(port_context,
|
||||||
|
@ -944,9 +944,15 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||||||
LOG.debug("Setting status for %s to DOWN", device)
|
LOG.debug("Setting status for %s to DOWN", device)
|
||||||
devices_down.append(device)
|
devices_down.append(device)
|
||||||
if devices_up or devices_down:
|
if devices_up or devices_down:
|
||||||
|
# When iter_num == 0, it indicates that the ovs-agent is doing
|
||||||
|
# the initialization work. L2 pop needs this precise knowledge
|
||||||
|
# to notify the agent to refresh the tunnel related flows.
|
||||||
|
# Otherwise, these flows will be cleaned as stale due to the
|
||||||
|
# different cookie id.
|
||||||
|
agent_restarted = self.iter_num == 0
|
||||||
devices_set = self.plugin_rpc.update_device_list(
|
devices_set = self.plugin_rpc.update_device_list(
|
||||||
self.context, devices_up, devices_down, self.agent_id,
|
self.context, devices_up, devices_down, self.agent_id,
|
||||||
self.conf.host)
|
self.conf.host, agent_restarted=agent_restarted)
|
||||||
failed_devices = (devices_set.get('failed_devices_up') +
|
failed_devices = (devices_set.get('failed_devices_up') +
|
||||||
devices_set.get('failed_devices_down'))
|
devices_set.get('failed_devices_down'))
|
||||||
if failed_devices:
|
if failed_devices:
|
||||||
|
@ -254,6 +254,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||||||
|
|
||||||
def update_device_up(self, rpc_context, **kwargs):
|
def update_device_up(self, rpc_context, **kwargs):
|
||||||
"""Device is up on agent."""
|
"""Device is up on agent."""
|
||||||
|
agent_restarted = kwargs.pop('agent_restarted', None)
|
||||||
agent_id, host, device = self._get_request_details(kwargs)
|
agent_id, host, device = self._get_request_details(kwargs)
|
||||||
LOG.debug("Device %(device)s up at agent %(agent_id)s",
|
LOG.debug("Device %(device)s up at agent %(agent_id)s",
|
||||||
{'device': device, 'agent_id': agent_id})
|
{'device': device, 'agent_id': agent_id})
|
||||||
@ -281,7 +282,8 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||||||
else:
|
else:
|
||||||
self.update_port_status_to_active(port, rpc_context, port_id, host)
|
self.update_port_status_to_active(port, rpc_context, port_id, host)
|
||||||
self.notify_l2pop_port_wiring(port_id, rpc_context,
|
self.notify_l2pop_port_wiring(port_id, rpc_context,
|
||||||
n_const.PORT_STATUS_ACTIVE, host)
|
n_const.PORT_STATUS_ACTIVE, host,
|
||||||
|
agent_restarted)
|
||||||
|
|
||||||
def update_port_status_to_active(self, port, rpc_context, port_id, host):
|
def update_port_status_to_active(self, port, rpc_context, port_id, host):
|
||||||
plugin = directory.get_plugin()
|
plugin = directory.get_plugin()
|
||||||
@ -305,7 +307,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||||||
provisioning_blocks.L2_AGENT_ENTITY)
|
provisioning_blocks.L2_AGENT_ENTITY)
|
||||||
|
|
||||||
def notify_l2pop_port_wiring(self, port_id, rpc_context,
|
def notify_l2pop_port_wiring(self, port_id, rpc_context,
|
||||||
status, host):
|
status, host, agent_restarted=None):
|
||||||
"""Notify the L2pop driver that a port has been wired/unwired.
|
"""Notify the L2pop driver that a port has been wired/unwired.
|
||||||
|
|
||||||
The L2pop driver uses this notification to broadcast forwarding
|
The L2pop driver uses this notification to broadcast forwarding
|
||||||
@ -328,8 +330,10 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||||||
# and so we don't need to update it again here. But, l2pop did not
|
# and so we don't need to update it again here. But, l2pop did not
|
||||||
# handle DVR ports while restart neutron-*-agent, we need to handle
|
# handle DVR ports while restart neutron-*-agent, we need to handle
|
||||||
# it here.
|
# it here.
|
||||||
|
if agent_restarted is None:
|
||||||
|
agent_restarted = l2pop_driver.obj.agent_restarted(port_context)
|
||||||
if (port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE and
|
if (port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE and
|
||||||
not l2pop_driver.obj.agent_restarted(port_context)):
|
not agent_restarted):
|
||||||
return
|
return
|
||||||
port = port_context.current
|
port = port_context.current
|
||||||
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
|
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
|
||||||
@ -345,7 +349,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||||||
port_context.current['status'] = status
|
port_context.current['status'] = status
|
||||||
port_context.current[portbindings.HOST_ID] = host
|
port_context.current[portbindings.HOST_ID] = host
|
||||||
if status == n_const.PORT_STATUS_ACTIVE:
|
if status == n_const.PORT_STATUS_ACTIVE:
|
||||||
l2pop_driver.obj.update_port_up(port_context)
|
l2pop_driver.obj.update_port_up(port_context, agent_restarted)
|
||||||
else:
|
else:
|
||||||
l2pop_driver.obj.update_port_down(port_context)
|
l2pop_driver.obj.update_port_down(port_context)
|
||||||
|
|
||||||
|
@ -273,7 +273,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||||||
return ports
|
return ports
|
||||||
|
|
||||||
def _mock_update_device(self, context, devices_up, devices_down, agent_id,
|
def _mock_update_device(self, context, devices_up, devices_down, agent_id,
|
||||||
host=None):
|
host=None, agent_restarted=False):
|
||||||
dev_up = []
|
dev_up = []
|
||||||
dev_down = []
|
dev_down = []
|
||||||
for port in self.ports:
|
for port in self.ports:
|
||||||
@ -317,7 +317,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||||||
def _prepare_failed_dev_up_trigger(self, agent):
|
def _prepare_failed_dev_up_trigger(self, agent):
|
||||||
|
|
||||||
def mock_failed_devices_up(context, devices_up, devices_down,
|
def mock_failed_devices_up(context, devices_up, devices_down,
|
||||||
agent_id, host=None):
|
agent_id, host=None,
|
||||||
|
agent_restarted=False):
|
||||||
failed_devices = []
|
failed_devices = []
|
||||||
devices = list(devices_up)
|
devices = list(devices_up)
|
||||||
# first port fails
|
# first port fails
|
||||||
@ -338,7 +339,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||||||
def _prepare_failed_dev_down_trigger(self, agent):
|
def _prepare_failed_dev_down_trigger(self, agent):
|
||||||
|
|
||||||
def mock_failed_devices_down(context, devices_up, devices_down,
|
def mock_failed_devices_down(context, devices_up, devices_down,
|
||||||
agent_id, host=None):
|
agent_id, host=None,
|
||||||
|
agent_restarted=False):
|
||||||
# first port fails
|
# first port fails
|
||||||
failed_port_id = self.ports[0]['id']
|
failed_port_id = self.ports[0]['id']
|
||||||
failed_devices_down = []
|
failed_devices_down = []
|
||||||
|
@ -355,11 +355,13 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
|
|||||||
self.mock_fanout.assert_called_with(
|
self.mock_fanout.assert_called_with(
|
||||||
mock.ANY, 'remove_fdb_entries', expected)
|
mock.ANY, 'remove_fdb_entries', expected)
|
||||||
|
|
||||||
def test_ovs_agent_restarted_with_dvr_port(self):
|
def _test_ovs_agent_restarted_with_dvr_port(
|
||||||
|
self, agent_boot_timeout=True, agent_restarted=False):
|
||||||
plugin = directory.get_plugin()
|
plugin = directory.get_plugin()
|
||||||
router = self._create_dvr_router()
|
router = self._create_dvr_router()
|
||||||
with mock.patch.object(l2pop_mech_driver.L2populationMechanismDriver,
|
with mock.patch.object(l2pop_mech_driver.L2populationMechanismDriver,
|
||||||
'agent_restarted', return_value=True):
|
'agent_restarted',
|
||||||
|
return_value=agent_boot_timeout):
|
||||||
with self.subnet(network=self._network,
|
with self.subnet(network=self._network,
|
||||||
enable_dhcp=False) as snet:
|
enable_dhcp=False) as snet:
|
||||||
with self.port(
|
with self.port(
|
||||||
@ -373,10 +375,12 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
|
|||||||
port = self._show('ports', port_id)
|
port = self._show('ports', port_id)
|
||||||
self.assertEqual(portbindings.VIF_TYPE_DISTRIBUTED,
|
self.assertEqual(portbindings.VIF_TYPE_DISTRIBUTED,
|
||||||
port['port'][portbindings.VIF_TYPE])
|
port['port'][portbindings.VIF_TYPE])
|
||||||
self.callbacks.update_device_up(self.adminContext,
|
self.callbacks.update_device_up(
|
||||||
agent_id=HOST_4,
|
self.adminContext,
|
||||||
device=port_id,
|
agent_id=HOST_4,
|
||||||
host=HOST_4)
|
device=port_id,
|
||||||
|
host=HOST_4,
|
||||||
|
agent_restarted=agent_restarted)
|
||||||
fanout_expected = {port['port']['network_id']: {
|
fanout_expected = {port['port']['network_id']: {
|
||||||
'network_type': u'vxlan',
|
'network_type': u'vxlan',
|
||||||
'ports': {
|
'ports': {
|
||||||
@ -386,6 +390,13 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
|
|||||||
'add_fdb_entries',
|
'add_fdb_entries',
|
||||||
fanout_expected)
|
fanout_expected)
|
||||||
|
|
||||||
|
def test_ovs_agent_restarted_with_dvr_port_boot_config_timeout(self):
|
||||||
|
self._test_ovs_agent_restarted_with_dvr_port()
|
||||||
|
|
||||||
|
def test_ovs_agent_restarted_with_dvr_port_rpc_send_timeout(self):
|
||||||
|
self._test_ovs_agent_restarted_with_dvr_port(
|
||||||
|
agent_boot_timeout=False, agent_restarted=True)
|
||||||
|
|
||||||
def test_ha_agents_with_dvr_rtr_does_not_get_other_fdb(self):
|
def test_ha_agents_with_dvr_rtr_does_not_get_other_fdb(self):
|
||||||
router = self._create_dvr_router()
|
router = self._create_dvr_router()
|
||||||
directory.add_plugin(plugin_constants.L3, self.plugin)
|
directory.add_plugin(plugin_constants.L3, self.plugin)
|
||||||
|
@ -749,7 +749,8 @@ class TestOvsNeutronAgent(object):
|
|||||||
self.agent._bind_devices(port_details)
|
self.agent._bind_devices(port_details)
|
||||||
update_devices.assert_called_once_with(mock.ANY, devices_up,
|
update_devices.assert_called_once_with(mock.ANY, devices_up,
|
||||||
devices_down,
|
devices_down,
|
||||||
mock.ANY, mock.ANY)
|
mock.ANY, mock.ANY,
|
||||||
|
agent_restarted=True)
|
||||||
|
|
||||||
def _test_arp_spoofing(self, enable_prevent_arp_spoofing):
|
def _test_arp_spoofing(self, enable_prevent_arp_spoofing):
|
||||||
self.agent.prevent_arp_spoofing = enable_prevent_arp_spoofing
|
self.agent.prevent_arp_spoofing = enable_prevent_arp_spoofing
|
||||||
|
@ -455,6 +455,7 @@ class RpcApiTestCase(base.BaseTestCase):
|
|||||||
devices_down=['fake_device3', 'fake_device4'],
|
devices_down=['fake_device3', 'fake_device4'],
|
||||||
agent_id='fake_agent_id',
|
agent_id='fake_agent_id',
|
||||||
host='fake_host',
|
host='fake_host',
|
||||||
|
agent_restarted=False,
|
||||||
version='1.5')
|
version='1.5')
|
||||||
|
|
||||||
def test_get_devices_details_list_and_failed_devices(self):
|
def test_get_devices_details_list_and_failed_devices(self):
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
---
|
||||||
|
deprecations:
|
||||||
|
- |
|
||||||
|
The L2 population ``agent_boot_time`` config option is deprecated in
|
||||||
|
favor of the direct RPC agent restart state transfer. It will be
|
||||||
|
removed in the ``Train`` release.
|
||||||
|
critical:
|
||||||
|
- |
|
||||||
|
The neutron-openvswitch-agent can sometimes spend too much time handling
|
||||||
|
a large number of ports, exceeding its timeout value, ``agent_boot_time``,
|
||||||
|
for L2 population. Because of this, some flow update operations will not
|
||||||
|
be triggered, resulting in lost flows during agent restart, especially
|
||||||
|
for host-to-host vxlan tunnel flows, causing the original tunnel flows to
|
||||||
|
be treated as stale due to the different cookie IDs. The agent's first
|
||||||
|
RPC loop will also do a stale flow clean-up procedure and delete them,
|
||||||
|
leading to a loss of connectivity.
|
||||||
|
Please ensure that all neutron-server and neutron-openvswitch-agent
|
||||||
|
binaries are upgraded for the changes to take effect, after which
|
||||||
|
the L2 population ``agent_boot_time`` config option will no longer
|
||||||
|
be used.
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
The neutron-openvswitch-agent was changed to notify the neutron-server
|
||||||
|
in its first RPC loop that it has restarted. This signals neutron-server
|
||||||
|
to provide updated L2 population information to correctly program FDB
|
||||||
|
entries, ensuring connectivity to instances is not interrupted.
|
||||||
|
This fixes the following bugs:
|
||||||
|
`1794991 <https://bugs.launchpad.net/neutron/+bug/1794991>`_,
|
||||||
|
`1799178 <https://bugs.launchpad.net/neutron/+bug/1799178>`_,
|
||||||
|
`1813703 <https://bugs.launchpad.net/neutron/+bug/1813703>`_,
|
||||||
|
`1813714 <https://bugs.launchpad.net/neutron/+bug/1813714>`_,
|
||||||
|
`1813715 <https://bugs.launchpad.net/neutron/+bug/1813715>`_.
|
Loading…
Reference in New Issue
Block a user