diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index c240dae8c..dee9d2517 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -77,7 +77,9 @@ class PluginApi(n_rpc.RpcProxy): API version history: 1.0 - Initial version. - + 1.3 - get_device_details rpc signature upgrade to obtain 'host' and + return value to include fixed_ips and device_owner for + the device port ''' BASE_RPC_API_VERSION = '1.1' @@ -86,25 +88,31 @@ class PluginApi(n_rpc.RpcProxy): super(PluginApi, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) - def get_device_details(self, context, device, agent_id): + def get_device_details(self, context, device, agent_id, host=None): return self.call(context, self.make_msg('get_device_details', device=device, - agent_id=agent_id), + agent_id=agent_id, host=host), topic=self.topic) - def get_devices_details_list(self, context, devices, agent_id): + def get_devices_details_list(self, context, devices, agent_id, host=None): res = [] try: res = self.call(context, self.make_msg('get_devices_details_list', devices=devices, - agent_id=agent_id), - topic=self.topic, version='1.2') + agent_id=agent_id, + host=host), + topic=self.topic, version='1.3') except messaging.UnsupportedVersion: + # If the server has not been upgraded yet, a DVR-enabled agent + # may not work correctly, however it can function in 'degraded' + # mode, in that DVR routers may not be in the system yet, and + # it might be not necessary to retrieve info about the host. + LOG.warn(_('DVR functionality requires a server upgrade.')) res = [ self.call(context, self.make_msg('get_device_details', device=device, - agent_id=agent_id), + agent_id=agent_id, host=host), topic=self.topic) for device in devices ] diff --git a/neutron/api/rpc/handlers/__init__.py b/neutron/api/rpc/handlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/api/rpc/handlers/dvr_rpc.py b/neutron/api/rpc/handlers/dvr_rpc.py new file mode 100644 index 000000000..3a9623ead --- /dev/null +++ b/neutron/api/rpc/handlers/dvr_rpc.py @@ -0,0 +1,122 @@ +# Copyright 2014, Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import log +from neutron.common import topics +from neutron import manager +from neutron.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class DVRServerRpcApiMixin(object): + """Agent-side RPC (stub) for agent-to-plugin interaction.""" + + DVR_RPC_VERSION = "1.0" + + @log.log + def get_dvr_mac_address_by_host(self, context, host): + return self.call(context, + self.make_msg('get_dvr_mac_address_by_host', + host=host), + version=self.DVR_RPC_VERSION, + topic=self.topic) + + @log.log + def get_dvr_mac_address_list(self, context): + return self.call(context, + self.make_msg('get_dvr_mac_address_list'), + version=self.DVR_RPC_VERSION, + topic=self.topic) + + @log.log + def get_compute_ports_on_host_by_subnet(self, context, host, subnet): + return self.call(context, + self.make_msg('get_compute_ports_on_host_by_subnet', + host=host, + subnet=subnet), + version=self.DVR_RPC_VERSION, + topic=self.topic) + + @log.log + def get_subnet_for_dvr(self, context, subnet): + return self.call(context, + self.make_msg('get_subnet_for_dvr', + subnet=subnet), + version=self.DVR_RPC_VERSION, + topic=self.topic) + + +class DVRServerRpcCallbackMixin(object): + """Plugin-side RPC (implementation) for agent-to-plugin interaction.""" + + @property + def plugin(self): + if not getattr(self, '_plugin', None): + self._plugin = manager.NeutronManager.get_plugin() + return self._plugin + + def get_dvr_mac_address_list(self, context): + return self.plugin.get_dvr_mac_address_list(context) + + def get_dvr_mac_address_by_host(self, context, host): + return self.plugin.get_dvr_mac_address_by_host(context, host) + + def get_compute_ports_on_host_by_subnet(self, context, host, subnet): + return self.plugin.get_compute_ports_on_host_by_subnet(context, + host, + subnet) + + def get_subnet_for_dvr(self, context, subnet): + return self.plugin.get_subnet_for_dvr(context, subnet) + + +class DVRAgentRpcApiMixin(object): + """Plugin-side RPC (stub) for plugin-to-agent interaction.""" + + DVR_RPC_VERSION = "1.0" + + def _get_dvr_update_topic(self): + return topics.get_topic_name(self.topic, + topics.DVR, + topics.UPDATE) + + def dvr_mac_address_update(self, context, dvr_macs): + """Notify dvr mac address updates.""" + if not dvr_macs: + return + self.fanout_cast(context, + self.make_msg('dvr_mac_address_update', + dvr_macs=dvr_macs), + version=self.DVR_RPC_VERSION, + topic=self._get_dvr_update_topic()) + + +class DVRAgentRpcCallbackMixin(object): + """Agent-side RPC (implementation) for plugin-to-agent interaction.""" + + dvr_agent = None + + def dvr_mac_address_update(self, context, **kwargs): + """Callback for dvr_mac_addresses update. + + :param dvr_macs: list of updated dvr_macs + """ + dvr_macs = kwargs.get('dvr_macs', []) + LOG.debug("dvr_macs updated on remote: %s", dvr_macs) + if not self.dvr_agent: + LOG.warn(_("DVR agent binding currently not set.")) + return + self.dvr_agent.dvr_mac_address_update(dvr_macs) diff --git a/neutron/common/topics.py b/neutron/common/topics.py index 5e23bce25..9bb1956e7 100644 --- a/neutron/common/topics.py +++ b/neutron/common/topics.py @@ -18,6 +18,7 @@ SUBNET = 'subnet' PORT = 'port' SECURITY_GROUP = 'security_group' L2POPULATION = 'l2population' +DVR = 'dvr' CREATE = 'create' DELETE = 'delete' diff --git a/neutron/db/dvr_mac_db.py b/neutron/db/dvr_mac_db.py index 1e0913e1d..c590b3ac2 100644 --- a/neutron/db/dvr_mac_db.py +++ b/neutron/db/dvr_mac_db.py @@ -70,7 +70,7 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase): return dvrma def _create_dvr_mac_address(self, context, host): - """Create dvr mac address for a given host.""" + """Create DVR mac address for a given host.""" base_mac = cfg.CONF.dvr_base_mac.split(':') max_retries = cfg.CONF.mac_generation_retries for attempt in reversed(range(max_retries)): @@ -83,6 +83,10 @@ class DVRDbMixin(ext_dvr.DVRMacAddressPluginBase): LOG.debug("Generated DVR mac for host %(host)s " "is %(mac_address)s", {'host': host, 'mac_address': mac_address}) + dvr_macs = self.get_dvr_mac_address_list(context) + # TODO(vivek): improve scalability of this fanout by + # sending a single mac address rather than the entire set + self.notifier.dvr_mac_address_update(context, dvr_macs) return self._make_dvr_mac_address_dict(dvr_mac_binding) except db_exc.DBDuplicateEntry: LOG.debug("Generated DVR mac %(mac)s exists." diff --git a/neutron/db/l3_rpc_base.py b/neutron/db/l3_rpc_base.py index ef51523af..7de452262 100644 --- a/neutron/db/l3_rpc_base.py +++ b/neutron/db/l3_rpc_base.py @@ -70,18 +70,36 @@ class L3RpcCallbackMixin(object): LOG.debug(_("Checking router: %(id)s for host: %(host)s"), {'id': router['id'], 'host': host}) self._ensure_host_set_on_port(context, plugin, host, - router.get('gw_port')) + router.get('gw_port'), + router['id']) for interface in router.get(constants.INTERFACE_KEY, []): self._ensure_host_set_on_port(context, plugin, host, - interface) + interface, router['id']) - def _ensure_host_set_on_port(self, context, plugin, host, port): + def _ensure_host_set_on_port(self, context, plugin, host, port, + router_id=None): if (port and - (port.get(portbindings.HOST_ID) != host or + (port.get('device_owner') != + constants.DEVICE_OWNER_DVR_INTERFACE and + port.get(portbindings.HOST_ID) != host or port.get(portbindings.VIF_TYPE) == portbindings.VIF_TYPE_BINDING_FAILED)): + # All ports, including ports created for SNAT'ing for + # DVR are handled here plugin.update_port(context, port['id'], {'port': {portbindings.HOST_ID: host}}) + elif (port and + port.get('device_owner') == + constants.DEVICE_OWNER_DVR_INTERFACE): + # Ports that are DVR interfaces have multiple bindings (based on + # of hosts on which DVR router interfaces are spawned). Such + # bindings are created/updated here by invoking + # update_dvr_port_binding + plugin.update_dvr_port_binding(context, port['id'], + {'port': + {portbindings.HOST_ID: host, + 'device_id': router_id} + }) def get_external_network_id(self, context, **kwargs): """Get one external network id for l3 agent. diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 0eb7b809e..294deb668 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -938,7 +938,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return self._bind_port_if_needed(port_context) - def update_port_status(self, context, port_id, status): + def update_port_status(self, context, port_id, status, host=None): updated = False session = context.session # REVISIT: Serialize this operation with a semaphore to @@ -969,6 +969,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return True - def port_bound_to_host(self, port_id, host): + def port_bound_to_host(self, context, port_id, host): port_host = db.get_port_binding_host(port_id) return (port_host == host) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 2876231b8..553ba1f1f 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -14,6 +14,7 @@ # under the License. from neutron.agent import securitygroups_rpc as sg_rpc +from neutron.api.rpc.handlers import dvr_rpc from neutron.common import constants as q_const from neutron.common import rpc as n_rpc from neutron.common import topics @@ -23,6 +24,7 @@ from neutron.extensions import portbindings from neutron import manager from neutron.openstack.common import log from neutron.openstack.common import uuidutils +from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import db from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel @@ -37,14 +39,16 @@ TAP_DEVICE_PREFIX_LENGTH = 3 class RpcCallbacks(n_rpc.RpcCallback, dhcp_rpc_base.DhcpRpcCallbackMixin, + dvr_rpc.DVRServerRpcCallbackMixin, sg_db_rpc.SecurityGroupServerRpcCallbackMixin, type_tunnel.TunnelRpcCallbackMixin): - RPC_API_VERSION = '1.2' + RPC_API_VERSION = '1.3' # history # 1.0 Initial version (from openvswitch/linuxbridge) # 1.1 Support Security Group RPC # 1.2 Support get_devices_details_list + # 1.3 Support Distributed Virtual Router (DVR) def __init__(self, notifier, type_manager): self.setup_tunnel_callback_mixin(notifier, type_manager) @@ -78,9 +82,10 @@ class RpcCallbacks(n_rpc.RpcCallback, """Agent requests device details.""" agent_id = kwargs.get('agent_id') device = kwargs.get('device') - LOG.debug(_("Device %(device)s details requested by agent " - "%(agent_id)s"), - {'device': device, 'agent_id': agent_id}) + host = kwargs.get('host') + LOG.debug("Device %(device)s details requested by agent " + "%(agent_id)s with host %(host)s", + {'device': device, 'agent_id': agent_id, 'host': host}) port_id = self._device_to_port_id(device) plugin = manager.NeutronManager.get_plugin() @@ -109,7 +114,8 @@ class RpcCallbacks(n_rpc.RpcCallback, if port['status'] != new_status: plugin.update_port_status(rpc_context, port_id, - new_status) + new_status, + host) entry = {'device': device, 'network_id': port['network_id'], @@ -117,7 +123,9 @@ class RpcCallbacks(n_rpc.RpcCallback, 'admin_state_up': port['admin_state_up'], 'network_type': segment[api.NETWORK_TYPE], 'segmentation_id': segment[api.SEGMENTATION_ID], - 'physical_network': segment[api.PHYSICAL_NETWORK]} + 'physical_network': segment[api.PHYSICAL_NETWORK], + 'fixed_ips': port['fixed_ips'], + 'device_owner': port['device_owner']} LOG.debug(_("Returning: %s"), entry) return entry @@ -143,7 +151,8 @@ class RpcCallbacks(n_rpc.RpcCallback, plugin = manager.NeutronManager.get_plugin() port_id = self._device_to_port_id(device) port_exists = True - if (host and not plugin.port_bound_to_host(port_id, host)): + if (host and not plugin.port_bound_to_host(rpc_context, + port_id, host)): LOG.debug(_("Device %(device)s not bound to the" " agent host %(host)s"), {'device': device, 'host': host}) @@ -151,7 +160,8 @@ class RpcCallbacks(n_rpc.RpcCallback, 'exists': port_exists} port_exists = plugin.update_port_status(rpc_context, port_id, - q_const.PORT_STATUS_DOWN) + q_const.PORT_STATUS_DOWN, + host) return {'device': device, 'exists': port_exists} @@ -165,17 +175,42 @@ class RpcCallbacks(n_rpc.RpcCallback, {'device': device, 'agent_id': agent_id}) plugin = manager.NeutronManager.get_plugin() port_id = self._device_to_port_id(device) - if (host and not plugin.port_bound_to_host(port_id, host)): + if (host and not plugin.port_bound_to_host(rpc_context, + port_id, host)): LOG.debug(_("Device %(device)s not bound to the" " agent host %(host)s"), {'device': device, 'host': host}) return plugin.update_port_status(rpc_context, port_id, - q_const.PORT_STATUS_ACTIVE) + q_const.PORT_STATUS_ACTIVE, + host) + l3plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) + if l3plugin: + l3plugin.dvr_vmarp_table_update(rpc_context, port_id, "add") + + def get_dvr_mac_address_by_host(self, rpc_context, **kwargs): + host = kwargs.get('host') + LOG.debug("DVR Agent requests mac_address for host %s", host) + return super(RpcCallbacks, self).get_dvr_mac_address_by_host( + rpc_context, host) + + def get_compute_ports_on_host_by_subnet(self, rpc_context, **kwargs): + host = kwargs.get('host') + subnet = kwargs.get('subnet') + LOG.debug("DVR Agent requests list of VM ports on host %s", host) + return super(RpcCallbacks, self).get_compute_ports_on_host_by_subnet( + rpc_context, host, subnet) + + def get_subnet_for_dvr(self, rpc_context, **kwargs): + subnet = kwargs.get('subnet') + return super(RpcCallbacks, self).get_subnet_for_dvr(rpc_context, + subnet) class AgentNotifierApi(n_rpc.RpcProxy, + dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): """Agent side of the openvswitch rpc API. diff --git a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py index 4fba320b5..06372561c 100644 --- a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -109,7 +109,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase): rpcapi, topics.PLUGIN, 'get_device_details', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_devices_details_list(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) @@ -117,8 +118,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase): rpcapi, topics.PLUGIN, 'get_devices_details_list', rpc_method='call', devices=['fake_device1', 'fake_device2'], - agent_id='fake_agent_id', - version='1.2') + agent_id='fake_agent_id', host='fake_host', + version='1.3') def test_update_device_down(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) diff --git a/neutron/tests/unit/linuxbridge/test_rpcapi.py b/neutron/tests/unit/linuxbridge/test_rpcapi.py index 4d59b1a98..8b049e013 100644 --- a/neutron/tests/unit/linuxbridge/test_rpcapi.py +++ b/neutron/tests/unit/linuxbridge/test_rpcapi.py @@ -113,15 +113,16 @@ class rpcApiTestCase(base.BaseTestCase): self._test_lb_api(rpcapi, topics.PLUGIN, 'get_device_details', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_devices_details_list(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_lb_api(rpcapi, topics.PLUGIN, 'get_devices_details_list', rpc_method='call', devices=['fake_device1', 'fake_device2'], - agent_id='fake_agent_id', - version='1.2') + agent_id='fake_agent_id', host='fake_host', + version='1.3') def test_update_device_down(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index 763642d0f..9eed323bf 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -89,15 +89,16 @@ class RpcApiTestCase(base.BaseTestCase): self._test_rpc_api(rpcapi, topics.PLUGIN, 'get_device_details', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_devices_details_list(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_rpc_api(rpcapi, topics.PLUGIN, 'get_devices_details_list', rpc_method='call', devices=['fake_device1', 'fake_device2'], - agent_id='fake_agent_id', - version='1.2') + agent_id='fake_agent_id', host='fake_host', + version='1.3') def test_update_device_down(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) diff --git a/neutron/tests/unit/mlnx/test_rpcapi.py b/neutron/tests/unit/mlnx/test_rpcapi.py index 3f4ef9d1d..b094ffb16 100644 --- a/neutron/tests/unit/mlnx/test_rpcapi.py +++ b/neutron/tests/unit/mlnx/test_rpcapi.py @@ -136,15 +136,16 @@ class rpcApiTestCase(base.BaseTestCase): self._test_mlnx_api(rpcapi, topics.PLUGIN, 'get_device_details', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_devices_details_list(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_mlnx_api(rpcapi, topics.PLUGIN, 'get_devices_details_list', rpc_method='call', devices=['fake_device1', 'fake_device1'], - agent_id='fake_agent_id', - version='1.2') + agent_id='fake_agent_id', host='fake_host', + version='1.3') def test_update_device_down(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) diff --git a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py index 7ef620292..9d61dd375 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py +++ b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py @@ -97,15 +97,16 @@ class rpcApiTestCase(base.BaseTestCase): self._test_ovs_api(rpcapi, topics.PLUGIN, 'get_device_details', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_devices_details_list(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_ovs_api(rpcapi, topics.PLUGIN, 'get_devices_details_list', rpc_method='call', devices=['fake_device1', 'fake_device2'], - agent_id='fake_agent_id', - version='1.2') + agent_id='fake_agent_id', host='fake_host', + version='1.3') def test_update_device_down(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN)