Merge "[OVS] Handle added/removed ports in the same polling iteration" into stable/queens

This commit is contained in:
Zuul 2020-02-18 17:12:21 +00:00 committed by Gerrit Code Review
commit bc280c8dcd
2 changed files with 80 additions and 78 deletions

View File

@ -37,6 +37,7 @@ from oslo_service import loopingcall
from oslo_service import systemd
from oslo_utils import netutils
from osprofiler import profiler
import six
from six import moves
from neutron._i18n import _
@ -69,6 +70,11 @@ from neutron.plugins.ml2.drivers.openvswitch.agent \
import ovs_dvr_neutron_agent
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
if six.PY2:
abc_dict = dict
else:
abc_dict = collections.UserDict
LOG = logging.getLogger(__name__)
cfg.CONF.import_group('AGENT', 'neutron.plugins.ml2.drivers.openvswitch.'
@ -85,6 +91,17 @@ class OVSPluginApi(agent_rpc.CacheBackedPluginApi):
pass
class PortInfo(abc_dict):
def __init__(self, current=None, added=None, removed=None, updated=None,
re_added=None):
_dict = {'current': current or set(),
'added': added or set(),
'removed': removed or set(),
'updated': updated or set(),
're_added': re_added or set()}
super(PortInfo, self).__init__(_dict)
def has_zero_prefixlen_address(ip_addresses):
return any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in ip_addresses)
@ -1280,9 +1297,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def _get_port_info(self, registered_ports, cur_ports,
readd_registered_ports):
port_info = {'current': cur_ports,
'added': set(),
'removed': set()}
port_info = PortInfo(current=cur_ports)
# FIXME(salv-orlando): It's not really necessary to return early
# if nothing has changed.
if not readd_registered_ports and cur_ports == registered_ports:
@ -1316,15 +1331,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def process_ports_events(self, events, registered_ports, ancillary_ports,
old_ports_not_ready, failed_devices,
failed_ancillary_devices, updated_ports=None):
port_info = {}
port_info['added'] = set()
port_info['removed'] = set()
port_info['current'] = registered_ports
ancillary_port_info = {}
ancillary_port_info['added'] = set()
ancillary_port_info['removed'] = set()
ancillary_port_info['current'] = ancillary_ports
port_info = PortInfo(current=registered_ports)
ancillary_port_info = PortInfo(current=ancillary_ports)
ports_not_ready_yet = set()
# if a port was added and then removed or viceversa since the agent
@ -1332,9 +1340,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# to determine if the port was added or deleted
added_ports = {p['name'] for p in events['added']}
removed_ports = {p['name'] for p in events['removed']}
ports_removed_and_added = added_ports & removed_ports
for p in ports_removed_and_added:
ports_re_added = added_ports & removed_ports
for p in ports_re_added:
if ovs_lib.BaseOVS().port_exists(p):
events['re_added'] = [e for e in events['removed']
if e['name'] == p]
events['removed'] = [e for e in events['removed']
if e['name'] != p]
else:
@ -1382,12 +1393,10 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
ports_not_ready_yet |= old_ports_not_ready_yet
events['added'].extend(old_ports_not_ready_attrs)
for port in events['added']:
_process_port(port, port_info['added'],
ancillary_port_info['added'])
for port in events['removed']:
_process_port(port, port_info['removed'],
ancillary_port_info['removed'])
for event_type in ('added', 'removed', 're_added'):
for port in events.get(event_type, []):
_process_port(port, port_info[event_type],
ancillary_port_info[event_type])
self._update_port_info_failed_devices_stats(port_info, failed_devices)
self._update_port_info_failed_devices_stats(ancillary_port_info,
@ -1565,7 +1574,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
br.cleanup_tunnel_port(ofport)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def treat_devices_added_or_updated(self, devices, provisioning_needed):
def treat_devices_added_or_updated(self, devices, provisioning_needed,
re_added):
skipped_devices = []
need_binding_devices = []
devices_details_list = (
@ -1608,6 +1618,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
need_binding_devices.append(details)
self._update_port_network(details['port_id'],
details['network_id'])
if details['device'] in re_added:
self.ext_manager.delete_port(self.context, details)
self.ext_manager.handle_port(self.context, details)
else:
LOG.warning(
@ -1705,6 +1717,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# list at the same time; avoid processing it twice.
devices_added_updated = (port_info.get('added', set()) |
port_info.get('updated', set()))
re_added = port_info.get('re_added', set())
need_binding_devices = []
skipped_devices = set()
start = time.time()
@ -1712,7 +1725,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
(skipped_devices, need_binding_devices,
failed_devices['added']) = (
self.treat_devices_added_or_updated(
devices_added_updated, provisioning_needed))
devices_added_updated, provisioning_needed, re_added))
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "

View File

@ -399,24 +399,23 @@ class TestOvsNeutronAgent(object):
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 3])
expected = {'current': vif_port_set,
'added': set(),
'removed': set()}
expected = ovs_agent.PortInfo(current=vif_port_set)
actual = self.mock_scan_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_scan_ports_returns_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
expected = ovs_agent.PortInfo(current=vif_port_set, added=set([3]),
removed=set([2]))
actual = self.mock_scan_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_scan_ports_returns_port_changes_with_sync(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=vif_port_set,
removed=set([2]))
expected = ovs_agent.PortInfo(current=vif_port_set, added=vif_port_set,
removed=set([2]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
sync=True)
self.assertEqual(expected, actual)
@ -424,8 +423,8 @@ class TestOvsNeutronAgent(object):
def _test_scan_ports_with_updated_ports(self, updated_ports):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([4]))
expected = ovs_agent.PortInfo(current=vif_port_set, added={3},
removed={2}, updated={4})
actual = self.mock_scan_ports(vif_port_set, registered_ports,
updated_ports)
self.assertEqual(expected, actual)
@ -442,8 +441,8 @@ class TestOvsNeutronAgent(object):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
updated_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([1]))
expected = ovs_agent.PortInfo(current=vif_port_set, added=set([3]),
removed=set([2]), updated=set([1]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
updated_ports)
self.assertEqual(expected, actual)
@ -452,8 +451,7 @@ class TestOvsNeutronAgent(object):
vif_port_set = set([1, 2, 3])
registered_ports = set([1, 2, 3])
updated_ports = set([2])
expected = dict(current=vif_port_set, updated=set([2]),
added=set(), removed=set())
expected = ovs_agent.PortInfo(current=vif_port_set, updated=set([2]))
actual = self.mock_scan_ports(vif_port_set, registered_ports,
updated_ports)
self.assertEqual(expected, actual)
@ -491,12 +489,11 @@ class TestOvsNeutronAgent(object):
'name': 'qvof6f104bd-37'}]
}
registered_ports = {port_id}
expected_ancillary = dict(current=set(), added=set(), removed=set())
expected_ancillary = ovs_agent.PortInfo()
# port was removed and then added
expected_ports = dict(current={port_id},
added={port_id},
removed=set())
expected_ports = ovs_agent.PortInfo(current={port_id}, added={port_id},
re_added={port_id})
with mock.patch.object(ovs_lib.BaseOVS, "port_exists",
return_value=True):
self._test_process_ports_events(events.copy(), registered_ports,
@ -504,9 +501,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary)
# port was added and then removed
expected_ports = dict(current=set(),
added=set(),
removed={port_id})
expected_ports = ovs_agent.PortInfo(removed={port_id})
with mock.patch.object(ovs_lib.BaseOVS, "port_exists",
return_value=False):
self._test_process_ports_events(events.copy(), registered_ports,
@ -517,10 +512,8 @@ class TestOvsNeutronAgent(object):
events = {'added': [], 'removed': []}
registered_ports = {1, 3}
ancillary_ports = {2, 5}
expected_ports = {'current': registered_ports, 'added': set(),
'removed': set()}
expected_ancillary = {'current': ancillary_ports, 'added': set(),
'removed': set()}
expected_ports = ovs_agent.PortInfo(current=registered_ports)
expected_ancillary = ovs_agent.PortInfo(current=ancillary_ports)
self._test_process_ports_events(events, registered_ports,
ancillary_ports, expected_ports,
expected_ancillary)
@ -529,9 +522,9 @@ class TestOvsNeutronAgent(object):
events = {'added': [], 'removed': []}
registered_ports = {1, 2, 3}
updated_ports = {2}
expected_ports = dict(current=registered_ports, updated={2},
added=set(), removed=set())
expected_ancillary = dict(current=set(), added=set(), removed=set())
expected_ports = ovs_agent.PortInfo(current=registered_ports,
updated={2})
expected_ancillary = ovs_agent.PortInfo()
self._test_process_ports_events(events, registered_ports,
set(), expected_ports,
expected_ancillary, updated_ports)
@ -541,9 +534,8 @@ class TestOvsNeutronAgent(object):
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}}]}
registered_ports = {1}
expected_ports = dict(current=registered_ports, added=set(),
removed=set())
expected_ancillary = dict(current=set(), added=set(), removed=set())
expected_ports = ovs_agent.PortInfo(current=registered_ports)
expected_ancillary = ovs_agent.PortInfo()
devices_not_ready_yet = set()
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
side_effect=[2]), \
@ -565,11 +557,11 @@ class TestOvsNeutronAgent(object):
'external_ids': {'attached-mac': 'test-mac'}}],
'removed': []}
old_devices_not_ready = {'port4'}
registered_ports = set([1, 2, 3])
expected_ports = dict(current=set([1, 2, 3, 4]),
added=set([4]), removed=set())
registered_ports = {1, 2, 3}
expected_ports = ovs_agent.PortInfo(current={1, 2, 3, 4}, added={4},
removed=set())
self.agent.ancillary_brs = []
expected_ancillary = dict(current=set(), added=set(), removed=set())
expected_ancillary = ovs_agent.PortInfo()
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
side_effect=[5, 4]), \
mock.patch.object(self.agent, 'check_changed_vlans',
@ -600,11 +592,10 @@ class TestOvsNeutronAgent(object):
'external_ids': {'attached-mac': 'test-mac'}}]}
registered_ports = {1, 2, 4}
ancillary_ports = {5, 8}
expected_ports = dict(current={1, 3, 4}, added={3}, removed={2})
if updated_ports:
expected_ports['updated'] = updated_ports
expected_ancillary = dict(current={6, 8}, added={6},
removed={5})
expected_ports = ovs_agent.PortInfo(current={1, 3, 4}, added={3},
removed={2}, updated=updated_ports)
expected_ancillary = ovs_agent.PortInfo(current={6, 8}, added={6},
removed={5})
ancillary_bridge = mock.Mock()
ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
self.agent.ancillary_brs = [ancillary_bridge]
@ -648,10 +639,8 @@ class TestOvsNeutronAgent(object):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
port_tags_dict = {1: []}
expected = dict(
added=set([3]), current=vif_port_set,
removed=set([2]), updated=set([1])
)
expected = ovs_agent.PortInfo(added={3}, current=vif_port_set,
removed={2}, updated={1})
with mock.patch.object(self.agent, 'tun_br', autospec=True), \
mock.patch.object(self.agent.plugin_rpc,
'update_device_list') as upd_l:
@ -855,8 +844,8 @@ class TestOvsNeutronAgent(object):
'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices, _ = (
self.agent.treat_devices_added_or_updated([], False))
skip_devs, _, _ = (
self.agent.treat_devices_added_or_updated([], False, set()))
# The function should not raise
self.assertFalse(skip_devs)
return func.called
@ -909,7 +898,7 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent, 'treat_vif_port',
return_value=False):
self.agent.treat_devices_added_or_updated([], False)
self.agent.treat_devices_added_or_updated([], False, set())
def test_treat_devices_added_updated_skips_if_port_not_found(self):
dev_mock = mock.MagicMock()
@ -928,7 +917,8 @@ class TestOvsNeutronAgent(object):
"delete_port") as ext_mgr_delete_port,\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs = self.agent.treat_devices_added_or_updated([], False)
skip_devs = self.agent.treat_devices_added_or_updated(
[], False, set())
# The function should return False for resync and no device
# processed
self.assertEqual((['the_skipped_one'], [], set()), skip_devs)
@ -949,7 +939,7 @@ class TestOvsNeutronAgent(object):
'treat_vif_port') as treat_vif_port:
failed_devices = {'added': set(), 'removed': set()}
(_, _, failed_devices['added']) = (
self.agent.treat_devices_added_or_updated([], False))
self.agent.treat_devices_added_or_updated([], False, set()))
# The function should return False for resync and no device
# processed
self.assertEqual(set([dev_mock]), failed_devices.get('added'))
@ -979,8 +969,8 @@ class TestOvsNeutronAgent(object):
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs, need_bound_devices, _ = (
self.agent.treat_devices_added_or_updated([], False))
skip_devs, _, _ = (
self.agent.treat_devices_added_or_updated([], False, set()))
# The function should return False for resync
self.assertFalse(skip_devs)
self.assertTrue(treat_vif_port.called)
@ -1098,7 +1088,7 @@ class TestOvsNeutronAgent(object):
port_info.get('updated', set()))
if devices_added_updated:
device_added_updated.assert_called_once_with(
devices_added_updated, False)
devices_added_updated, False, set())
if port_info.get('removed', set()):
device_removed.assert_called_once_with(port_info['removed'])
if skipped_devices:
@ -2475,24 +2465,23 @@ class AncillaryBridgesTest(object):
def test_scan_ancillary_ports_returns_cur_only_for_unchanged_ports(self):
vif_port_set = set([1, 2])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set,
added=set(),
removed=set())
expected = ovs_agent.PortInfo(current=vif_port_set)
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_scan_ancillary_ports_returns_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
expected = ovs_agent.PortInfo(current=vif_port_set, added={3},
removed={2})
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def test_scan_ancillary_ports_returns_port_changes_with_sync(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=vif_port_set,
removed=set([2]))
expected = ovs_agent.PortInfo(current=vif_port_set, added=vif_port_set,
removed={2})
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports,
sync=True)
self.assertEqual(expected, actual)