diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 16f1cfb0293..3dbed74de72 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -94,16 +94,16 @@ class PluginApi(proxy.RpcProxy): agent_id=agent_id), topic=self.topic) - def update_device_down(self, context, device, agent_id): + def update_device_down(self, context, device, agent_id, host=None): return self.call(context, self.make_msg('update_device_down', device=device, - agent_id=agent_id), + agent_id=agent_id, host=host), topic=self.topic) - def update_device_up(self, context, device, agent_id): + def update_device_up(self, context, device, agent_id, host=None): return self.call(context, self.make_msg('update_device_up', device=device, - agent_id=agent_id), + agent_id=agent_id, host=host), topic=self.topic) def tunnel_sync(self, context, tunnel_ip, tunnel_type=None): diff --git a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py index c4421a3b3b7..209b9122c43 100644 --- a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py +++ b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py @@ -319,7 +319,8 @@ class HyperVNeutronAgent(object): try: self.plugin_rpc.update_device_down(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) except Exception as e: LOG.debug( _("Removing port failed for device %(device)s: %(e)s"), diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index 549a08c9e64..b25f56fb5fd 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -653,11 +653,13 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # update plugin about port status self.agent.plugin_rpc.update_device_up(self.context, tap_device_name, - self.agent.agent_id) + self.agent.agent_id, + cfg.CONF.host) else: self.plugin_rpc.update_device_down(self.context, tap_device_name, - self.agent.agent_id) + self.agent.agent_id, + cfg.CONF.host) else: bridge_name = self.agent.br_mgr.get_bridge_name( port['network_id']) @@ -666,7 +668,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # update plugin about port status self.agent.plugin_rpc.update_device_down(self.context, tap_device_name, - self.agent.agent_id) + self.agent.agent_id, + cfg.CONF.host) except rpc_common.Timeout: LOG.error(_("RPC timeout while updating port %s"), port['id']) @@ -855,11 +858,13 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): # update plugin about port status self.plugin_rpc.update_device_up(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) else: self.plugin_rpc.update_device_down(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) else: self.remove_port_binding(details['network_id'], details['port_id']) @@ -875,7 +880,8 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): try: details = self.plugin_rpc.update_device_down(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) except Exception as e: LOG.debug(_("port_removed failed for %(device)s: %(e)s"), {'device': device, 'e': e}) diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 588dd4ece94..1920d0e8d96 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -41,6 +41,7 @@ from neutron.db import quota_db # noqa from neutron.db import securitygroups_rpc_base as sg_db_rpc from neutron.extensions import portbindings from neutron.extensions import providernet as provider +from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import rpc @@ -116,13 +117,20 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, # TODO(garyk) - live migration and port status agent_id = kwargs.get('agent_id') device = kwargs.get('device') + host = kwargs.get('host') + port = self.get_port_from_device(device) LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"), {'device': device, 'agent_id': agent_id}) - port = self.get_port_from_device(device) + plugin = manager.NeutronManager.get_plugin() if port: entry = {'device': device, 'exists': True} - if port['status'] != q_const.PORT_STATUS_DOWN: + if (host and not + plugin.get_port_host(rpc_context, port['id']) == host): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + elif port['status'] != q_const.PORT_STATUS_DOWN: # Set port status to DOWN db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN) else: @@ -135,13 +143,21 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, """Device is up on agent.""" agent_id = kwargs.get('agent_id') device = kwargs.get('device') - LOG.debug(_("Device %(device)s up %(agent_id)s"), + host = kwargs.get('host') + port = self.get_port_from_device.get_port(device) + LOG.debug(_("Device %(device)s up on %(agent_id)s"), {'device': device, 'agent_id': agent_id}) - port = self.get_port_from_device(device) + plugin = manager.NeutronManager.get_plugin() if port: - if port['status'] != q_const.PORT_STATUS_ACTIVE: - # Set port status to ACTIVE - db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE) + if (host and + not plugin.get_port_host(rpc_context, port['id']) == host): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + return + elif port['status'] != q_const.PORT_STATUS_ACTIVE: + db.set_port_status(port['id'], + q_const.PORT_STATUS_ACTIVE) else: LOG.debug(_("%s can not be found in database"), device) diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py index c987d78602a..bf911d3d1a2 100644 --- a/neutron/plugins/ml2/db.py +++ b/neutron/plugins/ml2/db.py @@ -119,3 +119,17 @@ def get_port_and_sgs(port_id): port_dict['fixed_ips'] = [ip['ip_address'] for ip in port['fixed_ips']] return port_dict + + +def get_port_binding_host(port_id): + session = db_api.get_session() + with session.begin(subtransactions=True): + try: + query = (session.query(models.PortBinding). + filter(models.PortBinding.port_id.startswith(port_id)). + one()) + except exc.NoResultFound: + LOG.debug(_("No binding found for port %(port_id)s"), + {'port_id': port_id}) + return + return query.host diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 3c8a7d5f28e..ce4c8d635f1 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -591,7 +591,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.warning(_("Port %(port)s updated up by agent not found"), {'port': port_id}) return False - if port.status != status: original_port = self._make_port_dict(port) port.status = status @@ -608,3 +607,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.mechanism_manager.update_port_postcommit(mech_context) return True + + def port_bound_to_host(self, port_id, host): + port_host = db.get_port_binding_host(port_id) + return (port_host == host) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 4ead0e33942..f44a3eb1e32 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -153,12 +153,20 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, # TODO(garyk) - live migration and port status agent_id = kwargs.get('agent_id') device = kwargs.get('device') + host = kwargs.get('host') LOG.debug(_("Device %(device)s no longer exists at agent " "%(agent_id)s"), {'device': device, 'agent_id': agent_id}) - port_id = self._device_to_port_id(device) - plugin = manager.NeutronManager.get_plugin() + port_id = self._device_to_port_id(device) + port_exists = True + if (host and not plugin.port_bound_to_host(port_id, host)): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + return {'device': device, + 'exists': port_exists} + port_exists = plugin.update_port_status(rpc_context, port_id, q_const.PORT_STATUS_DOWN) @@ -169,11 +177,17 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, """Device is up on agent.""" agent_id = kwargs.get('agent_id') device = kwargs.get('device') + host = kwargs.get('host') LOG.debug(_("Device %(device)s up at agent %(agent_id)s"), {'device': device, 'agent_id': agent_id}) - port_id = self._device_to_port_id(device) - plugin = manager.NeutronManager.get_plugin() + port_id = self._device_to_port_id(device) + if (host and not plugin.port_bound_to_host(port_id, host)): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + return + plugin.update_port_status(rpc_context, port_id, q_const.PORT_STATUS_ACTIVE) diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index eefe384367f..d702b0b8ec2 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -297,11 +297,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, if port['admin_state_up']: # update plugin about port status self.plugin_rpc.update_device_up(self.context, port['id'], - self.agent_id) + self.agent_id, + cfg.CONF.host) else: # update plugin about port status self.plugin_rpc.update_device_down(self.context, port['id'], - self.agent_id) + self.agent_id, + cfg.CONF.host) except rpc_common.Timeout: LOG.error(_("RPC timeout while updating port %s"), port['id']) @@ -910,7 +912,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # update plugin about port status self.plugin_rpc.update_device_up(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) else: LOG.debug(_("Device %s not defined on plugin"), device) if (port and int(port.ofport) != -1): @@ -934,7 +937,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # update plugin about port status self.plugin_rpc.update_device_up(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) return resync def treat_devices_removed(self, devices): @@ -945,7 +949,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, try: details = self.plugin_rpc.update_device_down(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) except Exception as e: LOG.debug(_("port_removed failed for %(device)s: %(e)s"), {'device': device, 'e': e}) @@ -966,7 +971,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, try: details = self.plugin_rpc.update_device_down(self.context, device, - self.agent_id) + self.agent_id, + cfg.CONF.host) except Exception as e: LOG.debug(_("port_removed failed for %(device)s: %(e)s"), {'device': device, 'e': e}) diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index f7222660525..6377c073ba5 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -52,6 +52,7 @@ from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import extra_dhcp_opt as edo_ext from neutron.extensions import portbindings from neutron.extensions import providernet as provider +from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import rpc @@ -123,18 +124,25 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, def update_device_down(self, rpc_context, **kwargs): """Device no longer exists on agent.""" - # TODO(garyk) - live migration and port status agent_id = kwargs.get('agent_id') device = kwargs.get('device') + host = kwargs.get('host') + port = ovs_db_v2.get_port(device) LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"), {'device': device, 'agent_id': agent_id}) - port = ovs_db_v2.get_port(device) if port: entry = {'device': device, 'exists': True} - if port['status'] != q_const.PORT_STATUS_DOWN: + plugin = manager.NeutronManager.get_plugin() + if (host and + not plugin.get_port_host(rpc_context, port['id']) == host): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + elif port['status'] != q_const.PORT_STATUS_DOWN: # Set port status to DOWN - ovs_db_v2.set_port_status(port['id'], q_const.PORT_STATUS_DOWN) + ovs_db_v2.set_port_status(port['id'], + q_const.PORT_STATUS_DOWN) else: entry = {'device': device, 'exists': False} @@ -145,11 +153,19 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, """Device is up on agent.""" agent_id = kwargs.get('agent_id') device = kwargs.get('device') + host = kwargs.get('host') + port = ovs_db_v2.get_port(device) LOG.debug(_("Device %(device)s up on %(agent_id)s"), {'device': device, 'agent_id': agent_id}) - port = ovs_db_v2.get_port(device) + plugin = manager.NeutronManager.get_plugin() if port: - if port['status'] != q_const.PORT_STATUS_ACTIVE: + if (host and + not plugin.get_port_host(rpc_context, port['id']) == host): + LOG.debug(_("Device %(device)s not bound to the" + " agent host %(host)s"), + {'device': device, 'host': host}) + return + elif port['status'] != q_const.PORT_STATUS_ACTIVE: ovs_db_v2.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE) else: diff --git a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py index 70fe33da1cc..2765acf0437 100644 --- a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -116,7 +116,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase): rpcapi, topics.PLUGIN, 'update_device_down', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_tunnel_sync(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) diff --git a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py index fd40f74d211..b0846439cb1 100644 --- a/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py +++ b/neutron/tests/unit/linuxbridge/test_lb_neutron_agent.py @@ -753,7 +753,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase): rpc_obj.update_device_down.assert_called_with( self.lb_rpc.context, "tap123", - self.lb_rpc.agent.agent_id + self.lb_rpc.agent.agent_id, + cfg.CONF.host ) def test_port_update_plugin_rpc_failed(self): diff --git a/neutron/tests/unit/linuxbridge/test_rpcapi.py b/neutron/tests/unit/linuxbridge/test_rpcapi.py index 1db75a12bb6..e6e8587e0ec 100644 --- a/neutron/tests/unit/linuxbridge/test_rpcapi.py +++ b/neutron/tests/unit/linuxbridge/test_rpcapi.py @@ -118,11 +118,13 @@ class rpcApiTestCase(base.BaseTestCase): self._test_lb_api(rpcapi, topics.PLUGIN, 'update_device_down', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_update_device_up(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_lb_api(rpcapi, topics.PLUGIN, 'update_device_up', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a51c6a3c500..0f4a858b72b 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -93,7 +93,8 @@ class RpcApiTestCase(base.BaseTestCase): self._test_rpc_api(rpcapi, topics.PLUGIN, 'update_device_down', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_tunnel_sync(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) @@ -107,4 +108,5 @@ class RpcApiTestCase(base.BaseTestCase): self._test_rpc_api(rpcapi, topics.PLUGIN, 'update_device_up', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') diff --git a/neutron/tests/unit/mlnx/test_rpcapi.py b/neutron/tests/unit/mlnx/test_rpcapi.py index 820c33a6073..900a50f0fd8 100644 --- a/neutron/tests/unit/mlnx/test_rpcapi.py +++ b/neutron/tests/unit/mlnx/test_rpcapi.py @@ -141,11 +141,13 @@ class rpcApiTestCase(base.BaseTestCase): self._test_mlnx_api(rpcapi, topics.PLUGIN, 'update_device_down', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_update_device_up(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_mlnx_api(rpcapi, topics.PLUGIN, 'update_device_up', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index e74fc314958..c292eaf0c41 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -288,7 +288,8 @@ class TestOvsNeutronAgent(base.BaseTestCase): "124", "vlan", "physnet", "1", False) upddown_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id) + "123", self.agent.agent_id, + cfg.CONF.host) port["admin_state_up"] = True self.agent.port_update("unused_context", @@ -297,7 +298,8 @@ class TestOvsNeutronAgent(base.BaseTestCase): segmentation_id="1", physical_network="physnet") updup_fn.assert_called_with(self.agent.context, - "123", self.agent.agent_id) + "123", self.agent.agent_id, + cfg.CONF.host) def test_port_update_plugin_rpc_failed(self): port = {'id': 1, diff --git a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py index 868d21dff5d..1a480a44a42 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py +++ b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py @@ -102,7 +102,8 @@ class rpcApiTestCase(base.BaseTestCase): self._test_ovs_api(rpcapi, topics.PLUGIN, 'update_device_down', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host') def test_tunnel_sync(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) @@ -116,4 +117,5 @@ class rpcApiTestCase(base.BaseTestCase): self._test_ovs_api(rpcapi, topics.PLUGIN, 'update_device_up', rpc_method='call', device='fake_device', - agent_id='fake_agent_id') + agent_id='fake_agent_id', + host='fake_host')