neutron/neutron/plugins/ml2/rpc.py

# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib.agent import topics
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import uplink_status_propagation as usp
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib import rpc as n_rpc
from neutron_lib.services.qos import constants as qos_consts
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from osprofiler import profiler
from sqlalchemy.orm import exc

from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.db import l3_hamode_db
from neutron.db import provisioning_blocks
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2.drivers import type_tunnel

# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
# mixins and eventually remove the direct dependencies on type_tunnel.

LOG = log.getLogger(__name__)


class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
# history
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
# 1.3 get_device_details rpc signature upgrade to obtain 'host' and
# return value to include fixed_ips and device_owner for
# the device port
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
# 1.5 Support update_device_list and
# get_devices_details_list_and_failed_devices
# 1.6 Support get_network_details
# 1.7 Support get_ports_by_vnic_type_and_host
# 1.8 Rename agent_restarted to refresh_tunnels in
# update_device_list to reflect its expanded purpose

    target = oslo_messaging.Target(version='1.8')

    def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
super(RpcCallbacks, self).__init__()

    def _get_new_status(self, host, port_context):
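        """Return the status the port should transition to, or None.

        The status is only recomputed when no host is given or the given
        host matches the port binding: admin-up ports move to BUILD and
        admin-down ports to DOWN. None means no update is needed.
        """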
port = port_context.current
if not host or host == port_context.host:
new_status = (n_const.PORT_STATUS_BUILD if port['admin_state_up']
else n_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
return new_status

    @staticmethod
def _get_request_details(kwargs):
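        """Pull agent_id, host and device (or network) out of RPC kwargs."""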
return (kwargs.get('agent_id'),
kwargs.get('host'),
kwargs.get('device') or kwargs.get('network'))

    def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id, host, device = self._get_request_details(kwargs)
# cached networks used for reducing number of network db calls
# for server internal usage only
cached_networks = kwargs.get('cached_networks')
LOG.debug("Device %(device)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'device': device, 'agent_id': agent_id, 'host': host})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port_context = plugin.get_bound_port_context(rpc_context,
port_id,
host,
cached_networks)
if not port_context:
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device, 'agent_id': agent_id})
return {'device': device}
port = port_context.current
# caching information about networks for future use
if cached_networks is not None:
if port['network_id'] not in cached_networks:
cached_networks[port['network_id']] = (
port_context.network.current)
result = self._get_device_details(rpc_context, agent_id=agent_id,
host=host, device=device,
port_context=port_context)
if 'network_id' in result:
# success so we update status
new_status = self._get_new_status(host, port_context)
if new_status:
plugin.update_port_status(rpc_context,
port_id,
new_status,
host,
port_context.network.current)
return result

    def _get_device_details(self, rpc_context, agent_id, host, device,
port_context):
segment = port_context.bottom_bound_segment
port = port_context.current
if not segment:
LOG.warning("Device %(device)s requested by agent "
"%(agent_id)s on network %(network_id)s not "
"bound, vif_type: %(vif_type)s",
{'device': device,
'agent_id': agent_id,
'network_id': port['network_id'],
'vif_type': port_context.vif_type})
return {'device': device}
if (port['device_owner'].startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX) and
port[portbindings.HOST_ID] != host):
LOG.debug("Device %(device)s has no active binding in host "
"%(host)s", {'device': device,
'host': host})
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
network_qos_policy_id = port_context.network._network.get(
qos_consts.QOS_POLICY_ID)
entry = {'device': device,
'network_id': port['network_id'],
'port_id': port['id'],
'mac_address': port['mac_address'],
'admin_state_up': port['admin_state_up'],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
'physical_network': segment[api.PHYSICAL_NETWORK],
'mtu': port_context.network._network.get('mtu'),
'fixed_ips': port['fixed_ips'],
'device_owner': port['device_owner'],
'allowed_address_pairs': port['allowed_address_pairs'],
'port_security_enabled': port.get(psec.PORTSECURITY, True),
'qos_policy_id': port.get(qos_consts.QOS_POLICY_ID),
'network_qos_policy_id': network_qos_policy_id,
'profile': port[portbindings.PROFILE],
'propagate_uplink_status': port.get(
usp.PROPAGATE_UPLINK_STATUS, False)}
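        # For illustration only: a port bound to a VLAN segment might yield
        # an entry like the following (all values hypothetical):
        #   {'device': 'tap-1', 'network_type': 'vlan',
        #    'segmentation_id': 100, 'physical_network': 'physnet1',
        #    'admin_state_up': True, ...}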
LOG.debug("Returning: %s", entry)
return entry

    def get_devices_details_list(self, rpc_context, **kwargs):
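        """Agent requests details for a list of devices."""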
# cached networks used for reducing number of network db calls
cached_networks = {}
return [
self.get_device_details(
rpc_context,
device=device,
cached_networks=cached_networks,
**kwargs
)
for device in kwargs.pop('devices', [])
]

    def get_devices_details_list_and_failed_devices(self,
rpc_context,
**kwargs):
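        """Return details for the given devices, separating out failures.

        Devices whose details could not be retrieved, or whose status
        update failed, are returned in 'failed_devices' so the agent can
        retry them.
        """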
devices = []
failed_devices = []
devices_to_fetch = kwargs.pop('devices', [])
plugin = directory.get_plugin()
host = kwargs.get('host')
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
devices_to_fetch,
host)
for device in devices_to_fetch:
if not bound_contexts.get(device):
                # no bound port context for this device (unbound or gone)
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device,
'agent_id': kwargs.get('agent_id')})
devices.append({'device': device})
continue
try:
devices.append(self._get_device_details(
rpc_context,
agent_id=kwargs.get('agent_id'),
host=host,
device=device,
port_context=bound_contexts[device]))
except Exception:
LOG.exception("Failed to get details for device %s",
device)
failed_devices.append(device)
new_status_map = {ctxt.current['id']: self._get_new_status(host, ctxt)
for ctxt in bound_contexts.values() if ctxt}
# filter out any without status changes
new_status_map = {p: s for p, s in new_status_map.items() if s}
try:
plugin.update_port_statuses(rpc_context, new_status_map, host)
except Exception:
LOG.exception("Failure updating statuses, retrying all")
failed_devices = devices_to_fetch
devices = []
return {'devices': devices,
'failed_devices': failed_devices}

    def get_network_details(self, rpc_context, **kwargs):
"""Agent requests network details."""
agent_id, host, network = self._get_request_details(kwargs)
LOG.debug("Network %(network)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'network': network, 'agent_id': agent_id, 'host': host})
plugin = directory.get_plugin()
return plugin.get_network(rpc_context, network)

    @profiler.trace("rpc")
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
agent_id, host, device = self._get_request_details(kwargs)
LOG.debug("Device %(device)s no longer exists at agent "
"%(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port_exists = True
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
else:
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being "
"executed concurrently. Ignoring StaleDataError.")
return {'device': device,
'exists': port_exists}
self.notify_l2pop_port_wiring(port_id, rpc_context,
n_const.PORT_STATUS_DOWN, host)
return {'device': device,
'exists': port_exists}

    @profiler.trace("rpc")
def update_device_up(self, rpc_context, **kwargs):
"""Device is up on agent."""
refresh_tunnels = kwargs.pop('refresh_tunnels', False)
if not refresh_tunnels:
# For backward compatibility with older agents
refresh_tunnels = kwargs.pop('agent_restarted', False)
agent_id, host, device = self._get_request_details(kwargs)
LOG.debug("Device %(device)s up at agent %(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port = plugin.port_bound_to_host(rpc_context, port_id, host)
if host and not port:
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
# this might mean that a VM is in the process of live migration
# and vif was plugged on the destination compute node;
# need to notify nova explicitly
port = ml2_db.get_port(rpc_context, port_id)
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case).
if not port:
LOG.debug("Port %s not found, will not notify nova.", port_id)
return
else:
if port.device_owner.startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX):
# NOTE(haleyb): It is possible for a test to override a
# config option after the plugin has been initialized so
# the nova_notifier attribute is not set on the plugin.
if (cfg.CONF.notify_nova_on_port_status_changes and
hasattr(plugin, 'nova_notifier')):
plugin.nova_notifier.notify_port_active_direct(port)
return
else:
self.update_port_status_to_active(port, rpc_context, port_id, host)
self.notify_l2pop_port_wiring(port_id, rpc_context,
n_const.PORT_STATUS_ACTIVE, host,
refresh_tunnels)

    def update_port_status_to_active(self, port, rpc_context, port_id, host):
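        """Transition a port to ACTIVE, or complete its provisioning block.

        DVR ports get a direct status update because their multi-binding
        status logic depends on the host; all other ports are resolved
        through the provisioning_blocks mechanism instead.
        """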
plugin = directory.get_plugin()
if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE:
# NOTE(kevinbenton): we have to special case DVR ports because of
# the special multi-binding status update logic they have that
# depends on the host
plugin.update_port_status(rpc_context, port_id,
n_const.PORT_STATUS_ACTIVE, host)
else:
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case). We
# need to look up the full one before calling provisioning_complete
if not port:
port = ml2_db.get_port(rpc_context, port_id)
if not port:
# port doesn't exist, no need to add a provisioning block
return
provisioning_blocks.provisioning_complete(
rpc_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)

    def notify_l2pop_port_wiring(self, port_id, rpc_context,
status, host, refresh_tunnels=False):
"""Notify the L2pop driver that a port has been wired/unwired.
The L2pop driver uses this notification to broadcast forwarding
entries to other agents on the same network as the port for port_id.
"""
plugin = directory.get_plugin()
l2pop_driver = plugin.mechanism_manager.mech_drivers.get(
'l2population')
if not l2pop_driver:
return
port = ml2_db.get_port(rpc_context, port_id)
if not port:
return
port_context = plugin.get_bound_port_context(
rpc_context, port_id, host)
if not port_context:
# port deleted
return
        # NOTE: DVR ports are already handled and updated through l2pop,
        # so we don't need to update them again here. However, l2pop does
        # not handle DVR ports when a neutron-*-agent restarts, so that
        # case must be handled here.
if (port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE and
not refresh_tunnels):
return
port = port_context.current
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
status == n_const.PORT_STATUS_ACTIVE and
port[portbindings.HOST_ID] != host and
not l3_hamode_db.is_ha_router_port(rpc_context,
port['device_owner'],
port['device_id'])):
            # don't set up ACTIVE forwarding entries unless the port is
            # bound to this host, or it's an HA or DVR port (which are
            # special-cased in the mech driver)
return
port_context.current['status'] = status
port_context.current[portbindings.HOST_ID] = host
if status == n_const.PORT_STATUS_ACTIVE:
l2pop_driver.obj.update_port_up(port_context, refresh_tunnels)
else:
l2pop_driver.obj.update_port_down(port_context)

    @profiler.trace("rpc")
def update_device_list(self, rpc_context, **kwargs):
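        """Process the devices the agent reports as up and as down."""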
devices_up = []
failed_devices_up = []
devices_down = []
failed_devices_down = []
devices = kwargs.get('devices_up')
if devices:
for device in devices:
try:
self.update_device_up(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_up.append(device)
LOG.error("Failed to update device %s up", device)
else:
devices_up.append(device)
devices = kwargs.get('devices_down')
if devices:
for device in devices:
try:
dev = self.update_device_down(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_down.append(device)
LOG.error("Failed to update device %s down", device)
else:
devices_down.append(dev)
return {'devices_up': devices_up,
'failed_devices_up': failed_devices_up,
'devices_down': devices_down,
'failed_devices_down': failed_devices_down}

    def get_ports_by_vnic_type_and_host(self, rpc_context, vnic_type, host):
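        """Agent requests the ports with a given VNIC type on a host."""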
plugin = directory.get_plugin()
return plugin.get_ports_by_vnic_type_and_host(
rpc_context, vnic_type=vnic_type, host=host)


class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.
API version history:
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
update_dhcp_port, and removed get_dhcp_port methods.
1.4 - Added network_update
1.5 - Added binding_activate and binding_deactivate
"""

    def __init__(self, topic):
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
self.topic_port_delete = topics.get_topic_name(topic,
topics.PORT,
topics.DELETE)
self.topic_network_update = topics.get_topic_name(topic,
topics.NETWORK,
topics.UPDATE)
self.topic_port_binding_deactivate = topics.get_topic_name(
topic, topics.PORT_BINDING, topics.DEACTIVATE)
self.topic_port_binding_activate = topics.get_topic_name(
topic, topics.PORT_BINDING, topics.ACTIVATE)
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
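        # NOTE: the ML2 plugin typically creates this notifier once with
        # the agent topic (e.g. AgentNotifierApi(topics.AGENT)) and uses
        # the cast methods below to fan events out to every agent
        # listening on that topic.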

    def network_delete(self, context, network_id):
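        """Fanout-cast a network deletion event to agents."""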
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)

    def port_update(self, context, port, network_type, segmentation_id,
physical_network):
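        """Fanout-cast a port update, including its segment details."""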
cctxt = self.client.prepare(topic=self.topic_port_update,
fanout=True)
cctxt.cast(context, 'port_update', port=port,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)

    def port_delete(self, context, port_id):
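        """Fanout-cast a port deletion event to agents."""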
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)

    def network_update(self, context, network):
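        """Fanout-cast a network update event to agents."""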
cctxt = self.client.prepare(topic=self.topic_network_update,
fanout=True, version='1.4')
cctxt.cast(context, 'network_update', network=network)

    def binding_deactivate(self, context, port_id, host, network_id):
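        """Fanout-cast a request to deactivate the port binding on 'host'."""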
cctxt = self.client.prepare(topic=self.topic_port_binding_deactivate,
fanout=True, version='1.5')
cctxt.cast(context, 'binding_deactivate', port_id=port_id, host=host,
network_id=network_id)

    def binding_activate(self, context, port_id, host):
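        """Fanout-cast a request to activate the port binding on 'host'."""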
cctxt = self.client.prepare(topic=self.topic_port_binding_activate,
fanout=True, version='1.5')
cctxt.cast(context, 'binding_activate', port_id=port_id, host=host)