Merge "RPC additions to support DVR"

This commit is contained in:
Jenkins 2014-07-17 21:05:53 +00:00 committed by Gerrit Code Review
commit 68713c94ee
13 changed files with 232 additions and 39 deletions

View File

@ -77,7 +77,9 @@ class PluginApi(n_rpc.RpcProxy):
API version history:
1.0 - Initial version.
1.3 - get_device_details rpc signature upgrade to obtain 'host' and
return value to include fixed_ips and device_owner for
the device port
'''
BASE_RPC_API_VERSION = '1.1'
@ -86,25 +88,31 @@ class PluginApi(n_rpc.RpcProxy):
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def get_device_details(self, context, device, agent_id):
def get_device_details(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
agent_id=agent_id, host=host),
topic=self.topic)
def get_devices_details_list(self, context, devices, agent_id):
def get_devices_details_list(self, context, devices, agent_id, host=None):
res = []
try:
res = self.call(context,
self.make_msg('get_devices_details_list',
devices=devices,
agent_id=agent_id),
topic=self.topic, version='1.2')
agent_id=agent_id,
host=host),
topic=self.topic, version='1.3')
except messaging.UnsupportedVersion:
# If the server has not been upgraded yet, a DVR-enabled agent
# may not work correctly, however it can function in 'degraded'
# mode, in that DVR routers may not be in the system yet, and
# it might be not necessary to retrieve info about the host.
LOG.warn(_('DVR functionality requires a server upgrade.'))
res = [
self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
agent_id=agent_id, host=host),
topic=self.topic)
for device in devices
]

View File

View File

@ -0,0 +1,122 @@
# Copyright 2014, Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import log
from neutron.common import topics
from neutron import manager
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class DVRServerRpcApiMixin(object):
"""Agent-side RPC (stub) for agent-to-plugin interaction."""
DVR_RPC_VERSION = "1.0"
@log.log
def get_dvr_mac_address_by_host(self, context, host):
return self.call(context,
self.make_msg('get_dvr_mac_address_by_host',
host=host),
version=self.DVR_RPC_VERSION,
topic=self.topic)
@log.log
def get_dvr_mac_address_list(self, context):
return self.call(context,
self.make_msg('get_dvr_mac_address_list'),
version=self.DVR_RPC_VERSION,
topic=self.topic)
@log.log
def get_compute_ports_on_host_by_subnet(self, context, host, subnet):
return self.call(context,
self.make_msg('get_compute_ports_on_host_by_subnet',
host=host,
subnet=subnet),
version=self.DVR_RPC_VERSION,
topic=self.topic)
@log.log
def get_subnet_for_dvr(self, context, subnet):
return self.call(context,
self.make_msg('get_subnet_for_dvr',
subnet=subnet),
version=self.DVR_RPC_VERSION,
topic=self.topic)
class DVRServerRpcCallbackMixin(object):
"""Plugin-side RPC (implementation) for agent-to-plugin interaction."""
@property
def plugin(self):
if not getattr(self, '_plugin', None):
self._plugin = manager.NeutronManager.get_plugin()
return self._plugin
def get_dvr_mac_address_list(self, context):
return self.plugin.get_dvr_mac_address_list(context)
def get_dvr_mac_address_by_host(self, context, host):
return self.plugin.get_dvr_mac_address_by_host(context, host)
def get_compute_ports_on_host_by_subnet(self, context, host, subnet):
return self.plugin.get_compute_ports_on_host_by_subnet(context,
host,
subnet)
def get_subnet_for_dvr(self, context, subnet):
return self.plugin.get_subnet_for_dvr(context, subnet)
class DVRAgentRpcApiMixin(object):
"""Plugin-side RPC (stub) for plugin-to-agent interaction."""
DVR_RPC_VERSION = "1.0"
def _get_dvr_update_topic(self):
return topics.get_topic_name(self.topic,
topics.DVR,
topics.UPDATE)
def dvr_mac_address_update(self, context, dvr_macs):
"""Notify dvr mac address updates."""
if not dvr_macs:
return
self.fanout_cast(context,
self.make_msg('dvr_mac_address_update',
dvr_macs=dvr_macs),
version=self.DVR_RPC_VERSION,
topic=self._get_dvr_update_topic())
class DVRAgentRpcCallbackMixin(object):
"""Agent-side RPC (implementation) for plugin-to-agent interaction."""
dvr_agent = None
def dvr_mac_address_update(self, context, **kwargs):
"""Callback for dvr_mac_addresses update.
:param dvr_macs: list of updated dvr_macs
"""
dvr_macs = kwargs.get('dvr_macs', [])
LOG.debug("dvr_macs updated on remote: %s", dvr_macs)
if not self.dvr_agent:
LOG.warn(_("DVR agent binding currently not set."))
return
self.dvr_agent.dvr_mac_address_update(dvr_macs)

View File

@ -18,6 +18,7 @@ SUBNET = 'subnet'
PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
DVR = 'dvr'
CREATE = 'create'
DELETE = 'delete'

View File

@ -70,7 +70,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
return dvrma
def _create_dvr_mac_address(self, context, host):
"""Create dvr mac address for a given host."""
"""Create DVR mac address for a given host."""
base_mac = cfg.CONF.dvr_base_mac.split(':')
max_retries = cfg.CONF.mac_generation_retries
for attempt in reversed(range(max_retries)):
@ -83,6 +83,10 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase):
LOG.debug("Generated DVR mac for host %(host)s "
"is %(mac_address)s",
{'host': host, 'mac_address': mac_address})
dvr_macs = self.get_dvr_mac_address_list(context)
# TODO(vivek): improve scalability of this fanout by
# sending a single mac address rather than the entire set
self.notifier.dvr_mac_address_update(context, dvr_macs)
return self._make_dvr_mac_address_dict(dvr_mac_binding)
except db_exc.DBDuplicateEntry:
LOG.debug("Generated DVR mac %(mac)s exists."

View File

@ -70,18 +70,36 @@ class L3RpcCallbackMixin(object):
LOG.debug(_("Checking router: %(id)s for host: %(host)s"),
{'id': router['id'], 'host': host})
self._ensure_host_set_on_port(context, plugin, host,
router.get('gw_port'))
router.get('gw_port'),
router['id'])
for interface in router.get(constants.INTERFACE_KEY, []):
self._ensure_host_set_on_port(context, plugin, host,
interface)
interface, router['id'])
def _ensure_host_set_on_port(self, context, plugin, host, port):
def _ensure_host_set_on_port(self, context, plugin, host, port,
router_id=None):
if (port and
(port.get(portbindings.HOST_ID) != host or
(port.get('device_owner') !=
constants.DEVICE_OWNER_DVR_INTERFACE and
port.get(portbindings.HOST_ID) != host or
port.get(portbindings.VIF_TYPE) ==
portbindings.VIF_TYPE_BINDING_FAILED)):
# All ports, including ports created for SNAT'ing for
# DVR are handled here
plugin.update_port(context, port['id'],
{'port': {portbindings.HOST_ID: host}})
elif (port and
port.get('device_owner') ==
constants.DEVICE_OWNER_DVR_INTERFACE):
# Ports that are DVR interfaces have multiple bindings (based on
# of hosts on which DVR router interfaces are spawned). Such
# bindings are created/updated here by invoking
# update_dvr_port_binding
plugin.update_dvr_port_binding(context, port['id'],
{'port':
{portbindings.HOST_ID: host,
'device_id': router_id}
})
def get_external_network_id(self, context, **kwargs):
"""Get one external network id for l3 agent.

View File

@ -938,7 +938,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return self._bind_port_if_needed(port_context)
def update_port_status(self, context, port_id, status):
def update_port_status(self, context, port_id, status, host=None):
updated = False
session = context.session
# REVISIT: Serialize this operation with a semaphore to
@ -969,6 +969,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return True
def port_bound_to_host(self, port_id, host):
def port_bound_to_host(self, context, port_id, host):
port_host = db.get_port_binding_host(port_id)
return (port_host == host)

View File

@ -14,6 +14,7 @@
# under the License.
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
@ -23,6 +24,7 @@ from neutron.extensions import portbindings
from neutron import manager
from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
@ -37,14 +39,16 @@ TAP_DEVICE_PREFIX_LENGTH = 3
class RpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
dvr_rpc.DVRServerRpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
type_tunnel.TunnelRpcCallbackMixin):
RPC_API_VERSION = '1.2'
RPC_API_VERSION = '1.3'
# history
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
# 1.3 Support Distributed Virtual Router (DVR)
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
@ -78,9 +82,10 @@ class RpcCallbacks(n_rpc.RpcCallback,
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested by agent "
"%(agent_id)s"),
{'device': device, 'agent_id': agent_id})
host = kwargs.get('host')
LOG.debug("Device %(device)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'device': device, 'agent_id': agent_id, 'host': host})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin()
@ -109,7 +114,8 @@ class RpcCallbacks(n_rpc.RpcCallback,
if port['status'] != new_status:
plugin.update_port_status(rpc_context,
port_id,
new_status)
new_status,
host)
entry = {'device': device,
'network_id': port['network_id'],
@ -117,7 +123,9 @@ class RpcCallbacks(n_rpc.RpcCallback,
'admin_state_up': port['admin_state_up'],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
'physical_network': segment[api.PHYSICAL_NETWORK]}
'physical_network': segment[api.PHYSICAL_NETWORK],
'fixed_ips': port['fixed_ips'],
'device_owner': port['device_owner']}
LOG.debug(_("Returning: %s"), entry)
return entry
@ -143,7 +151,8 @@ class RpcCallbacks(n_rpc.RpcCallback,
plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
port_exists = True
if (host and not plugin.port_bound_to_host(port_id, host)):
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
@ -151,7 +160,8 @@ class RpcCallbacks(n_rpc.RpcCallback,
'exists': port_exists}
port_exists = plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_DOWN)
q_const.PORT_STATUS_DOWN,
host)
return {'device': device,
'exists': port_exists}
@ -165,17 +175,42 @@ class RpcCallbacks(n_rpc.RpcCallback,
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
if (host and not plugin.port_bound_to_host(port_id, host)):
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_ACTIVE)
q_const.PORT_STATUS_ACTIVE,
host)
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if l3plugin:
l3plugin.dvr_vmarp_table_update(rpc_context, port_id, "add")
def get_dvr_mac_address_by_host(self, rpc_context, **kwargs):
host = kwargs.get('host')
LOG.debug("DVR Agent requests mac_address for host %s", host)
return super(RpcCallbacks, self).get_dvr_mac_address_by_host(
rpc_context, host)
def get_compute_ports_on_host_by_subnet(self, rpc_context, **kwargs):
host = kwargs.get('host')
subnet = kwargs.get('subnet')
LOG.debug("DVR Agent requests list of VM ports on host %s", host)
return super(RpcCallbacks, self).get_compute_ports_on_host_by_subnet(
rpc_context, host, subnet)
def get_subnet_for_dvr(self, rpc_context, **kwargs):
subnet = kwargs.get('subnet')
return super(RpcCallbacks, self).get_subnet_for_dvr(rpc_context,
subnet)
class AgentNotifierApi(n_rpc.RpcProxy,
dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.

View File

@ -109,7 +109,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@ -117,8 +118,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id',
version='1.2')
agent_id='fake_agent_id', host='fake_host',
version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -113,15 +113,16 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id',
version='1.2')
agent_id='fake_agent_id', host='fake_host',
version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -89,15 +89,16 @@ class RpcApiTestCase(base.BaseTestCase):
self._test_rpc_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id',
version='1.2')
agent_id='fake_agent_id', host='fake_host',
version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -136,15 +136,16 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device1'],
agent_id='fake_agent_id',
version='1.2')
agent_id='fake_agent_id', host='fake_host',
version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -97,15 +97,16 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_ovs_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
agent_id='fake_agent_id',
host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id',
version='1.2')
agent_id='fake_agent_id', host='fake_host',
version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)