neutron/neutron/plugins/ml2/rpc.py
Darragh O'Reilly 93e9dc5426 ovs agent: signal to plugin if tunnel refresh needed
Currently the ovs agent calls update_device_list with the
agent_restarted flag set only on the first loop iteration. The
server then knows to send the l2pop flooding entries for the
network to the agent. But when a compute node with many instances
on many networks reboots, it takes time to re-add all the active
devices, and some may be re-added after the first loop iteration.
The server can then fail to send the flooding entries, which means
there will be no flood_to_tuns flow and broadcasts like DHCP will
fail.

This patch fixes that by renaming the agent_restarted flag to
refresh_tunnels and setting it whenever the agent has not yet
received the flooding entries for the network (see the sketch
below).

Change-Id: I607aa8fa399e72b037fd068ad4f02b6210e57e91
Closes-Bug: #1853613
2020-01-09 14:18:43 +00:00
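
For illustration, a minimal agent-side sketch of the resulting call
(hypothetical: iter_num and _has_flood_entries stand in for the real
OVS-agent state, which lives outside this file):

    # Hypothetical OVS-agent fragment: request an l2pop flooding-entry
    # refresh whenever the network has not received its entries yet,
    # not only on the first loop iteration after an agent restart.
    refresh_tunnels = (self.iter_num == 0 or
                       not self._has_flood_entries(network_id))
    self.plugin_rpc.update_device_list(context, devices_up, devices_down,
                                       self.agent_id, self.conf.host,
                                       refresh_tunnels=refresh_tunnels)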


# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.agent import topics
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import uplink_status_propagation as usp
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib import rpc as n_rpc
from neutron_lib.services.qos import constants as qos_consts
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from osprofiler import profiler
from sqlalchemy.orm import exc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.db import l3_hamode_db
from neutron.db import provisioning_blocks
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2.drivers import type_tunnel
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
# mixins and eventually remove the direct dependencies on type_tunnel.
LOG = log.getLogger(__name__)
class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
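    """Plugin-side ML2 RPC callbacks serving requests from L2 agents."""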
# history
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
# 1.3 get_device_details rpc signature upgrade to obtain 'host' and
# return value to include fixed_ips and device_owner for
# the device port
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
# 1.5 Support update_device_list and
# get_devices_details_list_and_failed_devices
# 1.6 Support get_network_details
# 1.7 Support get_ports_by_vnic_type_and_host
# 1.8 Rename agent_restarted to refresh_tunnels in
# update_device_list to reflect its expanded purpose
target = oslo_messaging.Target(version='1.8')
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
super(RpcCallbacks, self).__init__()
def _get_new_status(self, host, port_context):
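        # Compute the status the port should move to (BUILD when
        # admin_state_up, otherwise DOWN). Returns None when the request
        # comes from a host the port is not bound to or when the status
        # is already correct, so callers can skip the update.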
port = port_context.current
if not host or host == port_context.host:
new_status = (n_const.PORT_STATUS_BUILD if port['admin_state_up']
else n_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
return new_status
@staticmethod
def _get_request_details(kwargs):
return (kwargs.get('agent_id'),
kwargs.get('host'),
kwargs.get('device') or kwargs.get('network'))
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id, host, device = self._get_request_details(kwargs)
        # Cached networks are used to reduce the number of network DB
        # calls; this kwarg is for server-internal use only.
cached_networks = kwargs.get('cached_networks')
LOG.debug("Device %(device)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'device': device, 'agent_id': agent_id, 'host': host})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port_context = plugin.get_bound_port_context(rpc_context,
port_id,
host,
cached_networks)
if not port_context:
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device, 'agent_id': agent_id})
return {'device': device}
port = port_context.current
# caching information about networks for future use
if cached_networks is not None:
if port['network_id'] not in cached_networks:
cached_networks[port['network_id']] = (
port_context.network.current)
result = self._get_device_details(rpc_context, agent_id=agent_id,
host=host, device=device,
port_context=port_context)
if 'network_id' in result:
# success so we update status
new_status = self._get_new_status(host, port_context)
if new_status:
plugin.update_port_status(rpc_context,
port_id,
new_status,
host,
port_context.network.current)
return result
def _get_device_details(self, rpc_context, agent_id, host, device,
port_context):
segment = port_context.bottom_bound_segment
port = port_context.current
if not segment:
LOG.warning("Device %(device)s requested by agent "
"%(agent_id)s on network %(network_id)s not "
"bound, vif_type: %(vif_type)s",
{'device': device,
'agent_id': agent_id,
'network_id': port['network_id'],
'vif_type': port_context.vif_type})
return {'device': device}
if (port['device_owner'].startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX) and
port[portbindings.HOST_ID] != host):
LOG.debug("Device %(device)s has no active binding in host "
"%(host)s", {'device': device,
'host': host})
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
network_qos_policy_id = port_context.network._network.get(
qos_consts.QOS_POLICY_ID)
entry = {'device': device,
'network_id': port['network_id'],
'port_id': port['id'],
'mac_address': port['mac_address'],
'admin_state_up': port['admin_state_up'],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
'physical_network': segment[api.PHYSICAL_NETWORK],
'mtu': port_context.network._network.get('mtu'),
'fixed_ips': port['fixed_ips'],
'device_owner': port['device_owner'],
'allowed_address_pairs': port['allowed_address_pairs'],
'port_security_enabled': port.get(psec.PORTSECURITY, True),
'qos_policy_id': port.get(qos_consts.QOS_POLICY_ID),
'network_qos_policy_id': network_qos_policy_id,
'profile': port[portbindings.PROFILE],
'propagate_uplink_status': port.get(
usp.PROPAGATE_UPLINK_STATUS, False)}
LOG.debug("Returning: %s", entry)
return entry
def get_devices_details_list(self, rpc_context, **kwargs):
        # Cache of networks shared across the per-device calls to reduce
        # the number of network DB lookups.
cached_networks = {}
return [
self.get_device_details(
rpc_context,
device=device,
cached_networks=cached_networks,
**kwargs
)
for device in kwargs.pop('devices', [])
]
def get_devices_details_list_and_failed_devices(self,
rpc_context,
**kwargs):
devices = []
failed_devices = []
devices_to_fetch = kwargs.pop('devices', [])
plugin = directory.get_plugin()
host = kwargs.get('host')
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
devices_to_fetch,
host)
for device in devices_to_fetch:
if not bound_contexts.get(device):
                # Device not bound or not found in the database; return it
                # to the agent as-is so it is not treated as failed.
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device,
'agent_id': kwargs.get('agent_id')})
devices.append({'device': device})
continue
try:
devices.append(self._get_device_details(
rpc_context,
agent_id=kwargs.get('agent_id'),
host=host,
device=device,
port_context=bound_contexts[device]))
except Exception:
LOG.exception("Failed to get details for device %s",
device)
failed_devices.append(device)
new_status_map = {ctxt.current['id']: self._get_new_status(host, ctxt)
for ctxt in bound_contexts.values() if ctxt}
# filter out any without status changes
new_status_map = {p: s for p, s in new_status_map.items() if s}
try:
plugin.update_port_statuses(rpc_context, new_status_map, host)
except Exception:
LOG.exception("Failure updating statuses, retrying all")
failed_devices = devices_to_fetch
devices = []
return {'devices': devices,
'failed_devices': failed_devices}
def get_network_details(self, rpc_context, **kwargs):
"""Agent requests network details."""
agent_id, host, network = self._get_request_details(kwargs)
LOG.debug("Network %(network)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'network': network, 'agent_id': agent_id, 'host': host})
plugin = directory.get_plugin()
return plugin.get_network(rpc_context, network)
@profiler.trace("rpc")
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
agent_id, host, device = self._get_request_details(kwargs)
LOG.debug("Device %(device)s no longer exists at agent "
"%(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port_exists = True
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
else:
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being "
"executed concurrently. Ignoring StaleDataError.")
return {'device': device,
'exists': port_exists}
self.notify_l2pop_port_wiring(port_id, rpc_context,
n_const.PORT_STATUS_DOWN, host)
return {'device': device,
'exists': port_exists}
@profiler.trace("rpc")
def update_device_up(self, rpc_context, **kwargs):
"""Device is up on agent."""
refresh_tunnels = kwargs.pop('refresh_tunnels', False)
if not refresh_tunnels:
# For backward compatibility with older agents
refresh_tunnels = kwargs.pop('agent_restarted', False)
agent_id, host, device = self._get_request_details(kwargs)
LOG.debug("Device %(device)s up at agent %(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port = plugin.port_bound_to_host(rpc_context, port_id, host)
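        # port_bound_to_host() returns the port only when its binding host
        # matches, so an empty result means the port is bound elsewhere.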
if host and not port:
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
# this might mean that a VM is in the process of live migration
# and vif was plugged on the destination compute node;
# need to notify nova explicitly
port = ml2_db.get_port(rpc_context, port_id)
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case).
if not port:
LOG.debug("Port %s not found, will not notify nova.", port_id)
return
else:
if port.device_owner.startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX):
# NOTE(haleyb): It is possible for a test to override a
# config option after the plugin has been initialized so
# the nova_notifier attribute is not set on the plugin.
if (cfg.CONF.notify_nova_on_port_status_changes and
hasattr(plugin, 'nova_notifier')):
plugin.nova_notifier.notify_port_active_direct(port)
return
else:
self.update_port_status_to_active(port, rpc_context, port_id, host)
self.notify_l2pop_port_wiring(port_id, rpc_context,
n_const.PORT_STATUS_ACTIVE, host,
refresh_tunnels)
def update_port_status_to_active(self, port, rpc_context, port_id, host):
plugin = directory.get_plugin()
if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE:
# NOTE(kevinbenton): we have to special case DVR ports because of
# the special multi-binding status update logic they have that
# depends on the host
plugin.update_port_status(rpc_context, port_id,
n_const.PORT_STATUS_ACTIVE, host)
else:
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case). We
# need to look up the full one before calling provisioning_complete
if not port:
port = ml2_db.get_port(rpc_context, port_id)
if not port:
# port doesn't exist, no need to add a provisioning block
return
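            # Complete the L2 agent's provisioning block; the port only
            # transitions to ACTIVE once every registered entity (e.g. the
            # DHCP agent) has reported provisioning complete.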
provisioning_blocks.provisioning_complete(
rpc_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def notify_l2pop_port_wiring(self, port_id, rpc_context,
status, host, refresh_tunnels=False):
"""Notify the L2pop driver that a port has been wired/unwired.

        The L2pop driver uses this notification to broadcast forwarding
        entries to other agents on the same network as the port for port_id.
        """
plugin = directory.get_plugin()
l2pop_driver = plugin.mechanism_manager.mech_drivers.get(
'l2population')
if not l2pop_driver:
return
port = ml2_db.get_port(rpc_context, port_id)
if not port:
return
port_context = plugin.get_bound_port_context(
rpc_context, port_id, host)
if not port_context:
# port deleted
return
        # NOTE: DVR ports are already handled and updated through l2pop,
        # so we don't need to update them again here. However, l2pop does
        # not handle DVR ports while the neutron-*-agent restarts, so we
        # do need to handle that case here.
if (port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE and
not refresh_tunnels):
return
port = port_context.current
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
status == n_const.PORT_STATUS_ACTIVE and
port[portbindings.HOST_ID] != host and
not l3_hamode_db.is_ha_router_port(rpc_context,
port['device_owner'],
port['device_id'])):
# don't setup ACTIVE forwarding entries unless bound to this
# host or if it's an HA or DVR port (which is special-cased in
# the mech driver)
return
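        # Overwrite the cached status/host so the l2pop driver computes
        # forwarding entries for the state being reported here, not the
        # values last loaded from the database.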
port_context.current['status'] = status
port_context.current[portbindings.HOST_ID] = host
if status == n_const.PORT_STATUS_ACTIVE:
l2pop_driver.obj.update_port_up(port_context, refresh_tunnels)
else:
l2pop_driver.obj.update_port_down(port_context)
@profiler.trace("rpc")
def update_device_list(self, rpc_context, **kwargs):
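        """Apply updates for devices_up/devices_down one device at a time.

        Failures are collected per device so one bad device does not fail
        the whole batch.
        """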
devices_up = []
failed_devices_up = []
devices_down = []
failed_devices_down = []
devices = kwargs.get('devices_up')
if devices:
for device in devices:
try:
self.update_device_up(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_up.append(device)
LOG.error("Failed to update device %s up", device)
else:
devices_up.append(device)
devices = kwargs.get('devices_down')
if devices:
for device in devices:
try:
dev = self.update_device_down(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_down.append(device)
LOG.error("Failed to update device %s down", device)
else:
devices_down.append(dev)
return {'devices_up': devices_up,
'failed_devices_up': failed_devices_up,
'devices_down': devices_down,
'failed_devices_down': failed_devices_down}
def get_ports_by_vnic_type_and_host(self, rpc_context, vnic_type, host):
plugin = directory.get_plugin()
return plugin.get_ports_by_vnic_type_and_host(
rpc_context, vnic_type=vnic_type, host=host)
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.

    API version history:
        1.0 - Initial version.
        1.1 - Added get_active_networks_info, create_dhcp_port,
              update_dhcp_port, and removed get_dhcp_port methods.
        1.4 - Added network_update
        1.5 - Added binding_activate and binding_deactivate
    """
def __init__(self, topic):
self.topic = topic
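        # One well-known topic name is derived per (resource, action)
        # pair; the casts below use fanout=True so every agent listening
        # on a topic receives the notification.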
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
self.topic_port_delete = topics.get_topic_name(topic,
topics.PORT,
topics.DELETE)
self.topic_network_update = topics.get_topic_name(topic,
topics.NETWORK,
topics.UPDATE)
self.topic_port_binding_deactivate = topics.get_topic_name(
topic, topics.PORT_BINDING, topics.DEACTIVATE)
self.topic_port_binding_activate = topics.get_topic_name(
topic, topics.PORT_BINDING, topics.ACTIVATE)
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def network_delete(self, context, network_id):
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
cctxt = self.client.prepare(topic=self.topic_port_update,
fanout=True)
cctxt.cast(context, 'port_update', port=port,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)
def port_delete(self, context, port_id):
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)
def network_update(self, context, network):
cctxt = self.client.prepare(topic=self.topic_network_update,
fanout=True, version='1.4')
cctxt.cast(context, 'network_update', network=network)
def binding_deactivate(self, context, port_id, host, network_id):
cctxt = self.client.prepare(topic=self.topic_port_binding_deactivate,
fanout=True, version='1.5')
cctxt.cast(context, 'binding_deactivate', port_id=port_id, host=host,
network_id=network_id)
def binding_activate(self, context, port_id, host):
cctxt = self.client.prepare(topic=self.topic_port_binding_activate,
fanout=True, version='1.5')
cctxt.cast(context, 'binding_activate', port_id=port_id, host=host)