Do not mark device as processed if it wasn't
Currently treat_devices_added_or_updated in the OVS agent skips processing devices which disappeared from the integration bridge during the agent loop. This is fine, however the agent should not mark these devices as processed. Otherwise they won't be processed, should they appear again on the bridge. This patch ensures these devices are not added to the current device set. The patch also changes treat_devices_added_or_updated. The function now will return the list of skipped devices and not anymore a flag signalling whether a resync is required. With the current logic a resync would be required if retrieval of device details fails. With this change, the function treat_devices_added_or_updated will raise in this case and the exception will be handled in process_network_ports. For the sake of consistency, this patch also updates the similar function treat_ancillary_devices_added in order to use the same logic. Finally, this patch amends an innaccurate related comment. Closes-Bug: #1329546 Change-Id: Icc744f32494c7a76004ff161536316924594fbdb
This commit is contained in:
parent
6cd7c57aa0
commit
aa4c24c516
@ -35,6 +35,7 @@ from neutron.agent import rpc as agent_rpc
|
|||||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||||
from neutron.common import config as common_config
|
from neutron.common import config as common_config
|
||||||
from neutron.common import constants as q_const
|
from neutron.common import constants as q_const
|
||||||
|
from neutron.common import exceptions
|
||||||
from neutron.common import rpc as n_rpc
|
from neutron.common import rpc as n_rpc
|
||||||
from neutron.common import topics
|
from neutron.common import topics
|
||||||
from neutron.common import utils as q_utils
|
from neutron.common import utils as q_utils
|
||||||
@ -52,6 +53,11 @@ LOG = logging.getLogger(__name__)
|
|||||||
DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
|
DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
|
||||||
|
|
||||||
|
|
||||||
|
class DeviceListRetrievalError(exceptions.NeutronException):
|
||||||
|
message = _("Unable to retrieve port details for devices: %(devices)s "
|
||||||
|
"because of error: %(error)s")
|
||||||
|
|
||||||
|
|
||||||
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
|
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
|
||||||
# attributes set).
|
# attributes set).
|
||||||
class LocalVLANMapping:
|
class LocalVLANMapping:
|
||||||
@ -1106,26 +1112,23 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
||||||
|
|
||||||
def treat_devices_added_or_updated(self, devices, ovs_restarted):
|
def treat_devices_added_or_updated(self, devices, ovs_restarted):
|
||||||
|
skipped_devices = []
|
||||||
try:
|
try:
|
||||||
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
devices_details_list = self.plugin_rpc.get_devices_details_list(
|
||||||
self.context,
|
self.context,
|
||||||
devices,
|
devices,
|
||||||
self.agent_id)
|
self.agent_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.debug("Unable to get port details for %(devices)s: %(e)s",
|
raise DeviceListRetrievalError(devices=devices, error=e)
|
||||||
{'devices': devices, 'e': e})
|
|
||||||
# resync is needed
|
|
||||||
return True
|
|
||||||
for details in devices_details_list:
|
for details in devices_details_list:
|
||||||
device = details['device']
|
device = details['device']
|
||||||
LOG.debug("Processing port: %s", device)
|
LOG.debug("Processing port: %s", device)
|
||||||
port = self.int_br.get_vif_port_by_id(device)
|
port = self.int_br.get_vif_port_by_id(device)
|
||||||
if not port:
|
if not port:
|
||||||
# The port has disappeared and should not be processed
|
# The port disappeared and cannot be processed
|
||||||
# There is no need to put the port DOWN in the plugin as
|
|
||||||
# it never went up in the first place
|
|
||||||
LOG.info(_("Port %s was not found on the integration bridge "
|
LOG.info(_("Port %s was not found on the integration bridge "
|
||||||
"and will therefore not be processed"), device)
|
"and will therefore not be processed"), device)
|
||||||
|
skipped_devices.append(device)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if 'port_id' in details:
|
if 'port_id' in details:
|
||||||
@ -1139,6 +1142,10 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
details['admin_state_up'],
|
details['admin_state_up'],
|
||||||
ovs_restarted)
|
ovs_restarted)
|
||||||
# update plugin about port status
|
# update plugin about port status
|
||||||
|
# FIXME(salv-orlando): Failures while updating device status
|
||||||
|
# must be handled appropriately. Otherwise this might prevent
|
||||||
|
# neutron server from sending network-vif-* events to the nova
|
||||||
|
# API server, thus possibly preventing instance spawn.
|
||||||
if details.get('admin_state_up'):
|
if details.get('admin_state_up'):
|
||||||
LOG.debug(_("Setting status for %s to UP"), device)
|
LOG.debug(_("Setting status for %s to UP"), device)
|
||||||
self.plugin_rpc.update_device_up(
|
self.plugin_rpc.update_device_up(
|
||||||
@ -1152,7 +1159,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
LOG.warn(_("Device %s not defined on plugin"), device)
|
LOG.warn(_("Device %s not defined on plugin"), device)
|
||||||
if (port and port.ofport != -1):
|
if (port and port.ofport != -1):
|
||||||
self.port_dead(port)
|
self.port_dead(port)
|
||||||
return False
|
return skipped_devices
|
||||||
|
|
||||||
def treat_ancillary_devices_added(self, devices):
|
def treat_ancillary_devices_added(self, devices):
|
||||||
try:
|
try:
|
||||||
@ -1161,10 +1168,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
devices,
|
devices,
|
||||||
self.agent_id)
|
self.agent_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.debug("Unable to get port details for "
|
raise DeviceListRetrievalError(devices=devices, error=e)
|
||||||
"%(devices)s: %(e)s", {'devices': devices, 'e': e})
|
|
||||||
# resync is needed
|
|
||||||
return True
|
|
||||||
|
|
||||||
for details in devices_details_list:
|
for details in devices_details_list:
|
||||||
device = details['device']
|
device = details['device']
|
||||||
@ -1175,7 +1179,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
device,
|
device,
|
||||||
self.agent_id,
|
self.agent_id,
|
||||||
cfg.CONF.host)
|
cfg.CONF.host)
|
||||||
return False
|
|
||||||
|
|
||||||
def treat_devices_removed(self, devices):
|
def treat_devices_removed(self, devices):
|
||||||
resync = False
|
resync = False
|
||||||
@ -1238,13 +1241,29 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
port_info.get('updated', set()))
|
port_info.get('updated', set()))
|
||||||
if devices_added_updated:
|
if devices_added_updated:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
resync_a = self.treat_devices_added_or_updated(
|
try:
|
||||||
devices_added_updated, ovs_restarted)
|
skipped_devices = self.treat_devices_added_or_updated(
|
||||||
LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
|
devices_added_updated, ovs_restarted)
|
||||||
"treat_devices_added_or_updated completed "
|
LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
|
||||||
"in %(elapsed).3f"),
|
"treat_devices_added_or_updated completed. "
|
||||||
{'iter_num': self.iter_num,
|
"Skipped %(num_skipped)d devices of "
|
||||||
'elapsed': time.time() - start})
|
"%(num_current)d devices currently available. "
|
||||||
|
"Time elapsed: %(elapsed).3f"),
|
||||||
|
{'iter_num': self.iter_num,
|
||||||
|
'num_skipped': len(skipped_devices),
|
||||||
|
'num_current': len(port_info['current']),
|
||||||
|
'elapsed': time.time() - start})
|
||||||
|
# Update the list of current ports storing only those which
|
||||||
|
# have been actually processed.
|
||||||
|
port_info['current'] = (port_info['current'] -
|
||||||
|
set(skipped_devices))
|
||||||
|
except DeviceListRetrievalError:
|
||||||
|
# Need to resync as there was an error with server
|
||||||
|
# communication.
|
||||||
|
LOG.exception(_("process_network_ports - iteration:%d - "
|
||||||
|
"failure while retrieving port details "
|
||||||
|
"from server"), self.iter_num)
|
||||||
|
resync_a = True
|
||||||
if 'removed' in port_info:
|
if 'removed' in port_info:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
resync_b = self.treat_devices_removed(port_info['removed'])
|
resync_b = self.treat_devices_removed(port_info['removed'])
|
||||||
@ -1260,12 +1279,20 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
resync_b = False
|
resync_b = False
|
||||||
if 'added' in port_info:
|
if 'added' in port_info:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
resync_a = self.treat_ancillary_devices_added(port_info['added'])
|
try:
|
||||||
LOG.debug(_("process_ancillary_network_ports - iteration: "
|
self.treat_ancillary_devices_added(port_info['added'])
|
||||||
"%(iter_num)d - treat_ancillary_devices_added "
|
LOG.debug(_("process_ancillary_network_ports - iteration: "
|
||||||
"completed in %(elapsed).3f"),
|
"%(iter_num)d - treat_ancillary_devices_added "
|
||||||
{'iter_num': self.iter_num,
|
"completed in %(elapsed).3f"),
|
||||||
'elapsed': time.time() - start})
|
{'iter_num': self.iter_num,
|
||||||
|
'elapsed': time.time() - start})
|
||||||
|
except DeviceListRetrievalError:
|
||||||
|
# Need to resync as there was an error with server
|
||||||
|
# communication.
|
||||||
|
LOG.exception(_("process_ancillary_network_ports - "
|
||||||
|
"iteration:%d - failure while retrieving "
|
||||||
|
"port details from server"), self.iter_num)
|
||||||
|
resync_a = True
|
||||||
if 'removed' in port_info:
|
if 'removed' in port_info:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
resync_b = self.treat_ancillary_devices_removed(
|
resync_b = self.treat_ancillary_devices_removed(
|
||||||
@ -1386,7 +1413,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
self.updated_ports = set()
|
self.updated_ports = set()
|
||||||
reg_ports = (set() if ovs_restarted else ports)
|
reg_ports = (set() if ovs_restarted else ports)
|
||||||
port_info = self.scan_ports(reg_ports, updated_ports_copy)
|
port_info = self.scan_ports(reg_ports, updated_ports_copy)
|
||||||
ports = port_info['current']
|
|
||||||
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
|
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||||
"port information retrieved. "
|
"port information retrieved. "
|
||||||
"Elapsed:%(elapsed).3f"),
|
"Elapsed:%(elapsed).3f"),
|
||||||
@ -1412,6 +1438,7 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
len(port_info.get('updated', [])))
|
len(port_info.get('updated', [])))
|
||||||
port_stats['regular']['removed'] = (
|
port_stats['regular']['removed'] = (
|
||||||
len(port_info.get('removed', [])))
|
len(port_info.get('removed', [])))
|
||||||
|
ports = port_info['current']
|
||||||
# Treat ancillary devices if they exist
|
# Treat ancillary devices if they exist
|
||||||
if self.ancillary_brs:
|
if self.ancillary_brs:
|
||||||
port_info = self.update_ancillary_ports(
|
port_info = self.update_ancillary_ports(
|
||||||
|
@ -283,15 +283,16 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||||||
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
|
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
|
||||||
self.assertEqual(expected, actual)
|
self.assertEqual(expected, actual)
|
||||||
|
|
||||||
def test_treat_devices_added_returns_true_for_missing_device(self):
|
def test_treat_devices_added_returns_raises_for_missing_device(self):
|
||||||
with contextlib.nested(
|
with contextlib.nested(
|
||||||
mock.patch.object(self.agent.plugin_rpc,
|
mock.patch.object(self.agent.plugin_rpc,
|
||||||
'get_devices_details_list',
|
'get_devices_details_list',
|
||||||
side_effect=Exception()),
|
side_effect=Exception()),
|
||||||
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||||
return_value=mock.Mock())):
|
return_value=mock.Mock())):
|
||||||
self.assertTrue(self.agent.treat_devices_added_or_updated([{}],
|
self.assertRaises(
|
||||||
False))
|
ovs_neutron_agent.DeviceListRetrievalError,
|
||||||
|
self.agent.treat_devices_added_or_updated, [{}], False)
|
||||||
|
|
||||||
def _mock_treat_devices_added_updated(self, details, port, func_name):
|
def _mock_treat_devices_added_updated(self, details, port, func_name):
|
||||||
"""Mock treat devices added or updated.
|
"""Mock treat devices added or updated.
|
||||||
@ -311,8 +312,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||||||
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||||
mock.patch.object(self.agent, func_name)
|
mock.patch.object(self.agent, func_name)
|
||||||
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
|
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
|
||||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
|
skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
|
||||||
False))
|
# The function should not raise
|
||||||
|
self.assertFalse(skip_devs)
|
||||||
return func.called
|
return func.called
|
||||||
|
|
||||||
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
|
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
|
||||||
@ -335,12 +337,34 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||||||
) as (get_dev_fn, get_vif_func):
|
) as (get_dev_fn, get_vif_func):
|
||||||
self.assertFalse(get_dev_fn.called)
|
self.assertFalse(get_dev_fn.called)
|
||||||
|
|
||||||
def test_treat_devices_added__updated_updates_known_port(self):
|
def test_treat_devices_added_updated_updates_known_port(self):
|
||||||
details = mock.MagicMock()
|
details = mock.MagicMock()
|
||||||
details.__contains__.side_effect = lambda x: True
|
details.__contains__.side_effect = lambda x: True
|
||||||
self.assertTrue(self._mock_treat_devices_added_updated(
|
self.assertTrue(self._mock_treat_devices_added_updated(
|
||||||
details, mock.Mock(), 'treat_vif_port'))
|
details, mock.Mock(), 'treat_vif_port'))
|
||||||
|
|
||||||
|
def test_treat_devices_added_updated_skips_if_port_not_found(self):
|
||||||
|
dev_mock = mock.MagicMock()
|
||||||
|
dev_mock.__getitem__.return_value = 'the_skipped_one'
|
||||||
|
with contextlib.nested(
|
||||||
|
mock.patch.object(self.agent.plugin_rpc,
|
||||||
|
'get_devices_details_list',
|
||||||
|
return_value=[dev_mock]),
|
||||||
|
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
||||||
|
return_value=None),
|
||||||
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
||||||
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
||||||
|
mock.patch.object(self.agent, 'treat_vif_port')
|
||||||
|
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
||||||
|
upd_dev_down, treat_vif_port):
|
||||||
|
skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
|
||||||
|
# The function should return False for resync and no device
|
||||||
|
# processed
|
||||||
|
self.assertEqual(['the_skipped_one'], skip_devs)
|
||||||
|
self.assertFalse(treat_vif_port.called)
|
||||||
|
self.assertFalse(upd_dev_down.called)
|
||||||
|
self.assertFalse(upd_dev_up.called)
|
||||||
|
|
||||||
def test_treat_devices_added_updated_put_port_down(self):
|
def test_treat_devices_added_updated_put_port_down(self):
|
||||||
fake_details_dict = {'admin_state_up': False,
|
fake_details_dict = {'admin_state_up': False,
|
||||||
'port_id': 'xxx',
|
'port_id': 'xxx',
|
||||||
@ -360,8 +384,9 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||||||
mock.patch.object(self.agent, 'treat_vif_port')
|
mock.patch.object(self.agent, 'treat_vif_port')
|
||||||
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
) as (get_dev_fn, get_vif_func, upd_dev_up,
|
||||||
upd_dev_down, treat_vif_port):
|
upd_dev_down, treat_vif_port):
|
||||||
self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
|
skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
|
||||||
False))
|
# The function should return False for resync
|
||||||
|
self.assertFalse(skip_devs)
|
||||||
self.assertTrue(treat_vif_port.called)
|
self.assertTrue(treat_vif_port.called)
|
||||||
self.assertTrue(upd_dev_down.called)
|
self.assertTrue(upd_dev_down.called)
|
||||||
|
|
||||||
@ -388,7 +413,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||||||
with contextlib.nested(
|
with contextlib.nested(
|
||||||
mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
|
mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
|
||||||
mock.patch.object(self.agent, "treat_devices_added_or_updated",
|
mock.patch.object(self.agent, "treat_devices_added_or_updated",
|
||||||
return_value=False),
|
return_value=[]),
|
||||||
mock.patch.object(self.agent, "treat_devices_removed",
|
mock.patch.object(self.agent, "treat_devices_removed",
|
||||||
return_value=False)
|
return_value=False)
|
||||||
) as (setup_port_filters, device_added_updated, device_removed):
|
) as (setup_port_filters, device_added_updated, device_removed):
|
||||||
|
Loading…
Reference in New Issue
Block a user