change port status only if port is bound to the good host

if host is set in the rpc message update_device_up/down sent by the agent,
the port status will be changed only if the port is bound to the host.

Change-Id: I0e607c734fbebf0b69f83c3bbd3e25a9783672dc
Closes-Bug: #1224967
This commit is contained in:
mathieu-rohon 2013-10-02 14:13:36 +02:00
parent 7da3d24f8a
commit f0f87ca56b
16 changed files with 135 additions and 47 deletions

View File

@ -94,16 +94,16 @@ class PluginApi(proxy.RpcProxy):
agent_id=agent_id), agent_id=agent_id),
topic=self.topic) topic=self.topic)
def update_device_down(self, context, device, agent_id): def update_device_down(self, context, device, agent_id, host=None):
return self.call(context, return self.call(context,
self.make_msg('update_device_down', device=device, self.make_msg('update_device_down', device=device,
agent_id=agent_id), agent_id=agent_id, host=host),
topic=self.topic) topic=self.topic)
def update_device_up(self, context, device, agent_id): def update_device_up(self, context, device, agent_id, host=None):
return self.call(context, return self.call(context,
self.make_msg('update_device_up', device=device, self.make_msg('update_device_up', device=device,
agent_id=agent_id), agent_id=agent_id, host=host),
topic=self.topic) topic=self.topic)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None): def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):

View File

@ -319,7 +319,8 @@ class HyperVNeutronAgent(object):
try: try:
self.plugin_rpc.update_device_down(self.context, self.plugin_rpc.update_device_down(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
except Exception as e: except Exception as e:
LOG.debug( LOG.debug(
_("Removing port failed for device %(device)s: %(e)s"), _("Removing port failed for device %(device)s: %(e)s"),

View File

@ -653,11 +653,13 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status # update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context, self.agent.plugin_rpc.update_device_up(self.context,
tap_device_name, tap_device_name,
self.agent.agent_id) self.agent.agent_id,
cfg.CONF.host)
else: else:
self.plugin_rpc.update_device_down(self.context, self.plugin_rpc.update_device_down(self.context,
tap_device_name, tap_device_name,
self.agent.agent_id) self.agent.agent_id,
cfg.CONF.host)
else: else:
bridge_name = self.agent.br_mgr.get_bridge_name( bridge_name = self.agent.br_mgr.get_bridge_name(
port['network_id']) port['network_id'])
@ -666,7 +668,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status # update plugin about port status
self.agent.plugin_rpc.update_device_down(self.context, self.agent.plugin_rpc.update_device_down(self.context,
tap_device_name, tap_device_name,
self.agent.agent_id) self.agent.agent_id,
cfg.CONF.host)
except rpc_common.Timeout: except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id']) LOG.error(_("RPC timeout while updating port %s"), port['id'])
@ -855,11 +858,13 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
# update plugin about port status # update plugin about port status
self.plugin_rpc.update_device_up(self.context, self.plugin_rpc.update_device_up(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
else: else:
self.plugin_rpc.update_device_down(self.context, self.plugin_rpc.update_device_down(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
else: else:
self.remove_port_binding(details['network_id'], self.remove_port_binding(details['network_id'],
details['port_id']) details['port_id'])
@ -875,7 +880,8 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
try: try:
details = self.plugin_rpc.update_device_down(self.context, details = self.plugin_rpc.update_device_down(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
except Exception as e: except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"), LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e}) {'device': device, 'e': e})

View File

@ -41,6 +41,7 @@ from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings from neutron.extensions import portbindings
from neutron.extensions import providernet as provider from neutron.extensions import providernet as provider
from neutron import manager
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc from neutron.openstack.common import rpc
@ -116,13 +117,20 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# TODO(garyk) - live migration and port status # TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host')
port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"), LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device) plugin = manager.NeutronManager.get_plugin()
if port: if port:
entry = {'device': device, entry = {'device': device,
'exists': True} 'exists': True}
if port['status'] != q_const.PORT_STATUS_DOWN: if (host and not
plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
elif port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN # Set port status to DOWN
db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN) db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
else: else:
@ -135,13 +143,21 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
"""Device is up on agent.""" """Device is up on agent."""
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
LOG.debug(_("Device %(device)s up %(agent_id)s"), host = kwargs.get('host')
port = self.get_port_from_device.get_port(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device) plugin = manager.NeutronManager.get_plugin()
if port: if port:
if port['status'] != q_const.PORT_STATUS_ACTIVE: if (host and
# Set port status to ACTIVE not plugin.get_port_host(rpc_context, port['id']) == host):
db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE) LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
elif port['status'] != q_const.PORT_STATUS_ACTIVE:
db.set_port_status(port['id'],
q_const.PORT_STATUS_ACTIVE)
else: else:
LOG.debug(_("%s can not be found in database"), device) LOG.debug(_("%s can not be found in database"), device)

View File

@ -119,3 +119,17 @@ def get_port_and_sgs(port_id):
port_dict['fixed_ips'] = [ip['ip_address'] port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']] for ip in port['fixed_ips']]
return port_dict return port_dict
def get_port_binding_host(port_id):
session = db_api.get_session()
with session.begin(subtransactions=True):
try:
query = (session.query(models.PortBinding).
filter(models.PortBinding.port_id.startswith(port_id)).
one())
except exc.NoResultFound:
LOG.debug(_("No binding found for port %(port_id)s"),
{'port_id': port_id})
return
return query.host

View File

@ -591,7 +591,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.warning(_("Port %(port)s updated up by agent not found"), LOG.warning(_("Port %(port)s updated up by agent not found"),
{'port': port_id}) {'port': port_id})
return False return False
if port.status != status: if port.status != status:
original_port = self._make_port_dict(port) original_port = self._make_port_dict(port)
port.status = status port.status = status
@ -608,3 +607,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.update_port_postcommit(mech_context) self.mechanism_manager.update_port_postcommit(mech_context)
return True return True
def port_bound_to_host(self, port_id, host):
port_host = db.get_port_binding_host(port_id)
return (port_host == host)

View File

@ -153,12 +153,20 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
# TODO(garyk) - live migration and port status # TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host')
LOG.debug(_("Device %(device)s no longer exists at agent " LOG.debug(_("Device %(device)s no longer exists at agent "
"%(agent_id)s"), "%(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
port_exists = True
if (host and not plugin.port_bound_to_host(port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return {'device': device,
'exists': port_exists}
port_exists = plugin.update_port_status(rpc_context, port_id, port_exists = plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_DOWN) q_const.PORT_STATUS_DOWN)
@ -169,11 +177,17 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
"""Device is up on agent.""" """Device is up on agent."""
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host')
LOG.debug(_("Device %(device)s up at agent %(agent_id)s"), LOG.debug(_("Device %(device)s up at agent %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
if (host and not plugin.port_bound_to_host(port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
plugin.update_port_status(rpc_context, port_id, plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_ACTIVE) q_const.PORT_STATUS_ACTIVE)

View File

@ -297,11 +297,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
if port['admin_state_up']: if port['admin_state_up']:
# update plugin about port status # update plugin about port status
self.plugin_rpc.update_device_up(self.context, port['id'], self.plugin_rpc.update_device_up(self.context, port['id'],
self.agent_id) self.agent_id,
cfg.CONF.host)
else: else:
# update plugin about port status # update plugin about port status
self.plugin_rpc.update_device_down(self.context, port['id'], self.plugin_rpc.update_device_down(self.context, port['id'],
self.agent_id) self.agent_id,
cfg.CONF.host)
except rpc_common.Timeout: except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id']) LOG.error(_("RPC timeout while updating port %s"), port['id'])
@ -910,7 +912,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status # update plugin about port status
self.plugin_rpc.update_device_up(self.context, self.plugin_rpc.update_device_up(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
else: else:
LOG.debug(_("Device %s not defined on plugin"), device) LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1): if (port and int(port.ofport) != -1):
@ -934,7 +937,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# update plugin about port status # update plugin about port status
self.plugin_rpc.update_device_up(self.context, self.plugin_rpc.update_device_up(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
return resync return resync
def treat_devices_removed(self, devices): def treat_devices_removed(self, devices):
@ -945,7 +949,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
try: try:
details = self.plugin_rpc.update_device_down(self.context, details = self.plugin_rpc.update_device_down(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
except Exception as e: except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"), LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e}) {'device': device, 'e': e})
@ -966,7 +971,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
try: try:
details = self.plugin_rpc.update_device_down(self.context, details = self.plugin_rpc.update_device_down(self.context,
device, device,
self.agent_id) self.agent_id,
cfg.CONF.host)
except Exception as e: except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"), LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e}) {'device': device, 'e': e})

View File

@ -52,6 +52,7 @@ from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import extra_dhcp_opt as edo_ext from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import portbindings from neutron.extensions import portbindings
from neutron.extensions import providernet as provider from neutron.extensions import providernet as provider
from neutron import manager
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc from neutron.openstack.common import rpc
@ -123,18 +124,25 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
def update_device_down(self, rpc_context, **kwargs): def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent.""" """Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host')
port = ovs_db_v2.get_port(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"), LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port = ovs_db_v2.get_port(device)
if port: if port:
entry = {'device': device, entry = {'device': device,
'exists': True} 'exists': True}
if port['status'] != q_const.PORT_STATUS_DOWN: plugin = manager.NeutronManager.get_plugin()
if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
elif port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN # Set port status to DOWN
ovs_db_v2.set_port_status(port['id'], q_const.PORT_STATUS_DOWN) ovs_db_v2.set_port_status(port['id'],
q_const.PORT_STATUS_DOWN)
else: else:
entry = {'device': device, entry = {'device': device,
'exists': False} 'exists': False}
@ -145,11 +153,19 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
"""Device is up on agent.""" """Device is up on agent."""
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host')
port = ovs_db_v2.get_port(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"), LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port = ovs_db_v2.get_port(device) plugin = manager.NeutronManager.get_plugin()
if port: if port:
if port['status'] != q_const.PORT_STATUS_ACTIVE: if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
elif port['status'] != q_const.PORT_STATUS_ACTIVE:
ovs_db_v2.set_port_status(port['id'], ovs_db_v2.set_port_status(port['id'],
q_const.PORT_STATUS_ACTIVE) q_const.PORT_STATUS_ACTIVE)
else: else:

View File

@ -116,7 +116,8 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
rpcapi, topics.PLUGIN, rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call', 'update_device_down', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')
def test_tunnel_sync(self): def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -753,7 +753,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
rpc_obj.update_device_down.assert_called_with( rpc_obj.update_device_down.assert_called_with(
self.lb_rpc.context, self.lb_rpc.context,
"tap123", "tap123",
self.lb_rpc.agent.agent_id self.lb_rpc.agent.agent_id,
cfg.CONF.host
) )
def test_port_update_plugin_rpc_failed(self): def test_port_update_plugin_rpc_failed(self):

View File

@ -118,11 +118,13 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_lb_api(rpcapi, topics.PLUGIN, self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call', 'update_device_down', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')
def test_update_device_up(self): def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN, self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call', 'update_device_up', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')

View File

@ -93,7 +93,8 @@ class RpcApiTestCase(base.BaseTestCase):
self._test_rpc_api(rpcapi, topics.PLUGIN, self._test_rpc_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call', 'update_device_down', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')
def test_tunnel_sync(self): def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@ -107,4 +108,5 @@ class RpcApiTestCase(base.BaseTestCase):
self._test_rpc_api(rpcapi, topics.PLUGIN, self._test_rpc_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call', 'update_device_up', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')

View File

@ -141,11 +141,13 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_mlnx_api(rpcapi, topics.PLUGIN, self._test_mlnx_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call', 'update_device_down', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')
def test_update_device_up(self): def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_mlnx_api(rpcapi, topics.PLUGIN, self._test_mlnx_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call', 'update_device_up', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')

View File

@ -288,7 +288,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
"124", "vlan", "physnet", "124", "vlan", "physnet",
"1", False) "1", False)
upddown_fn.assert_called_with(self.agent.context, upddown_fn.assert_called_with(self.agent.context,
"123", self.agent.agent_id) "123", self.agent.agent_id,
cfg.CONF.host)
port["admin_state_up"] = True port["admin_state_up"] = True
self.agent.port_update("unused_context", self.agent.port_update("unused_context",
@ -297,7 +298,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
segmentation_id="1", segmentation_id="1",
physical_network="physnet") physical_network="physnet")
updup_fn.assert_called_with(self.agent.context, updup_fn.assert_called_with(self.agent.context,
"123", self.agent.agent_id) "123", self.agent.agent_id,
cfg.CONF.host)
def test_port_update_plugin_rpc_failed(self): def test_port_update_plugin_rpc_failed(self):
port = {'id': 1, port = {'id': 1,

View File

@ -102,7 +102,8 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_ovs_api(rpcapi, topics.PLUGIN, self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call', 'update_device_down', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')
def test_tunnel_sync(self): def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN) rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
@ -116,4 +117,5 @@ class rpcApiTestCase(base.BaseTestCase):
self._test_ovs_api(rpcapi, topics.PLUGIN, self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call', 'update_device_up', rpc_method='call',
device='fake_device', device='fake_device',
agent_id='fake_agent_id') agent_id='fake_agent_id',
host='fake_host')