Merge "[OVS] Handle added/removed ports in the same polling iteration" into stable/rocky
This commit is contained in:
commit
020d69f6a2
|
@ -39,6 +39,7 @@ from oslo_service import loopingcall
|
|||
from oslo_service import systemd
|
||||
from oslo_utils import netutils
|
||||
from osprofiler import profiler
|
||||
import six
|
||||
from six import moves
|
||||
|
||||
from neutron._i18n import _
|
||||
|
@ -70,6 +71,11 @@ from neutron.plugins.ml2.drivers.openvswitch.agent \
|
|||
import ovs_dvr_neutron_agent
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent import vlanmanager
|
||||
|
||||
if six.PY2:
|
||||
abc_dict = dict
|
||||
else:
|
||||
abc_dict = collections.UserDict
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
cfg.CONF.import_group('AGENT', 'neutron.plugins.ml2.drivers.openvswitch.'
|
||||
|
@ -86,6 +92,17 @@ class OVSPluginApi(agent_rpc.CacheBackedPluginApi):
|
|||
pass
|
||||
|
||||
|
||||
class PortInfo(abc_dict):
|
||||
def __init__(self, current=None, added=None, removed=None, updated=None,
|
||||
re_added=None):
|
||||
_dict = {'current': current or set(),
|
||||
'added': added or set(),
|
||||
'removed': removed or set(),
|
||||
'updated': updated or set(),
|
||||
're_added': re_added or set()}
|
||||
super(PortInfo, self).__init__(_dict)
|
||||
|
||||
|
||||
def has_zero_prefixlen_address(ip_addresses):
|
||||
return any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in ip_addresses)
|
||||
|
||||
|
@ -1328,9 +1345,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
|
||||
def _get_port_info(self, registered_ports, cur_ports,
|
||||
readd_registered_ports):
|
||||
port_info = {'current': cur_ports,
|
||||
'added': set(),
|
||||
'removed': set()}
|
||||
port_info = PortInfo(current=cur_ports)
|
||||
# FIXME(salv-orlando): It's not really necessary to return early
|
||||
# if nothing has changed.
|
||||
if not readd_registered_ports and cur_ports == registered_ports:
|
||||
|
@ -1364,15 +1379,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
def process_ports_events(self, events, registered_ports, ancillary_ports,
|
||||
old_ports_not_ready, failed_devices,
|
||||
failed_ancillary_devices, updated_ports=None):
|
||||
port_info = {}
|
||||
port_info['added'] = set()
|
||||
port_info['removed'] = set()
|
||||
port_info['current'] = registered_ports
|
||||
|
||||
ancillary_port_info = {}
|
||||
ancillary_port_info['added'] = set()
|
||||
ancillary_port_info['removed'] = set()
|
||||
ancillary_port_info['current'] = ancillary_ports
|
||||
port_info = PortInfo(current=registered_ports)
|
||||
ancillary_port_info = PortInfo(current=ancillary_ports)
|
||||
|
||||
ports_not_ready_yet = set()
|
||||
# if a port was added and then removed or viceversa since the agent
|
||||
|
@ -1380,9 +1388,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
# to determine if the port was added or deleted
|
||||
added_ports = {p['name'] for p in events['added']}
|
||||
removed_ports = {p['name'] for p in events['removed']}
|
||||
ports_removed_and_added = added_ports & removed_ports
|
||||
for p in ports_removed_and_added:
|
||||
|
||||
ports_re_added = added_ports & removed_ports
|
||||
for p in ports_re_added:
|
||||
if ovs_lib.BaseOVS().port_exists(p):
|
||||
events['re_added'] = [e for e in events['removed']
|
||||
if e['name'] == p]
|
||||
events['removed'] = [e for e in events['removed']
|
||||
if e['name'] != p]
|
||||
else:
|
||||
|
@ -1430,12 +1441,10 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
ports_not_ready_yet |= old_ports_not_ready_yet
|
||||
events['added'].extend(old_ports_not_ready_attrs)
|
||||
|
||||
for port in events['added']:
|
||||
_process_port(port, port_info['added'],
|
||||
ancillary_port_info['added'])
|
||||
for port in events['removed']:
|
||||
_process_port(port, port_info['removed'],
|
||||
ancillary_port_info['removed'])
|
||||
for event_type in ('added', 'removed', 're_added'):
|
||||
for port in events.get(event_type, []):
|
||||
_process_port(port, port_info[event_type],
|
||||
ancillary_port_info[event_type])
|
||||
|
||||
self._update_port_info_failed_devices_stats(port_info, failed_devices)
|
||||
self._update_port_info_failed_devices_stats(ancillary_port_info,
|
||||
|
@ -1613,7 +1622,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
br.cleanup_tunnel_port(ofport)
|
||||
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
||||
|
||||
def treat_devices_added_or_updated(self, devices, provisioning_needed):
|
||||
def treat_devices_added_or_updated(self, devices, provisioning_needed,
|
||||
re_added):
|
||||
skipped_devices = []
|
||||
need_binding_devices = []
|
||||
binding_no_activated_devices = set()
|
||||
|
@ -1659,6 +1669,8 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
need_binding_devices.append(details)
|
||||
self._update_port_network(details['port_id'],
|
||||
details['network_id'])
|
||||
if details['device'] in re_added:
|
||||
self.ext_manager.delete_port(self.context, details)
|
||||
self.ext_manager.handle_port(self.context, details)
|
||||
else:
|
||||
if c_const.NO_ACTIVE_BINDING in details:
|
||||
|
@ -1764,6 +1776,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
# list at the same time; avoid processing it twice.
|
||||
devices_added_updated = (port_info.get('added', set()) |
|
||||
port_info.get('updated', set()))
|
||||
re_added = port_info.get('re_added', set())
|
||||
need_binding_devices = []
|
||||
skipped_devices = set()
|
||||
binding_no_activated_devices = set()
|
||||
|
@ -1772,7 +1785,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
(skipped_devices, binding_no_activated_devices,
|
||||
need_binding_devices, failed_devices['added']) = (
|
||||
self.treat_devices_added_or_updated(
|
||||
devices_added_updated, provisioning_needed))
|
||||
devices_added_updated, provisioning_needed, re_added))
|
||||
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
|
||||
"treat_devices_added_or_updated completed. "
|
||||
"Skipped %(num_skipped)d and no activated binding "
|
||||
|
|
|
@ -401,24 +401,23 @@ class TestOvsNeutronAgent(object):
|
|||
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 3])
|
||||
expected = {'current': vif_port_set,
|
||||
'added': set(),
|
||||
'removed': set()}
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set)
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ports_returns_port_changes(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]))
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ports_returns_port_changes_with_sync(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=vif_port_set,
|
||||
removed=set([2]))
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, added=vif_port_set,
|
||||
removed=set([2]))
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
||||
sync=True)
|
||||
self.assertEqual(expected, actual)
|
||||
|
@ -426,8 +425,8 @@ class TestOvsNeutronAgent(object):
|
|||
def _test_scan_ports_with_updated_ports(self, updated_ports):
|
||||
vif_port_set = set([1, 3, 4])
|
||||
registered_ports = set([1, 2, 4])
|
||||
expected = dict(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]), updated=set([4]))
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, added={3},
|
||||
removed={2}, updated={4})
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
||||
updated_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
@ -444,8 +443,8 @@ class TestOvsNeutronAgent(object):
|
|||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
updated_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]), updated=set([1]))
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, added=set([3]),
|
||||
removed=set([2]), updated=set([1]))
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
||||
updated_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
@ -454,8 +453,7 @@ class TestOvsNeutronAgent(object):
|
|||
vif_port_set = set([1, 2, 3])
|
||||
registered_ports = set([1, 2, 3])
|
||||
updated_ports = set([2])
|
||||
expected = dict(current=vif_port_set, updated=set([2]),
|
||||
added=set(), removed=set())
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, updated=set([2]))
|
||||
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
||||
updated_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
@ -493,12 +491,11 @@ class TestOvsNeutronAgent(object):
|
|||
'name': 'qvof6f104bd-37'}]
|
||||
}
|
||||
registered_ports = {port_id}
|
||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
||||
expected_ancillary = ovs_agent.PortInfo()
|
||||
|
||||
# port was removed and then added
|
||||
expected_ports = dict(current={port_id},
|
||||
added={port_id},
|
||||
removed=set())
|
||||
expected_ports = ovs_agent.PortInfo(current={port_id}, added={port_id},
|
||||
re_added={port_id})
|
||||
with mock.patch.object(ovs_lib.BaseOVS, "port_exists",
|
||||
return_value=True):
|
||||
self._test_process_ports_events(events.copy(), registered_ports,
|
||||
|
@ -506,9 +503,7 @@ class TestOvsNeutronAgent(object):
|
|||
expected_ancillary)
|
||||
|
||||
# port was added and then removed
|
||||
expected_ports = dict(current=set(),
|
||||
added=set(),
|
||||
removed={port_id})
|
||||
expected_ports = ovs_agent.PortInfo(removed={port_id})
|
||||
with mock.patch.object(ovs_lib.BaseOVS, "port_exists",
|
||||
return_value=False):
|
||||
self._test_process_ports_events(events.copy(), registered_ports,
|
||||
|
@ -519,10 +514,8 @@ class TestOvsNeutronAgent(object):
|
|||
events = {'added': [], 'removed': []}
|
||||
registered_ports = {1, 3}
|
||||
ancillary_ports = {2, 5}
|
||||
expected_ports = {'current': registered_ports, 'added': set(),
|
||||
'removed': set()}
|
||||
expected_ancillary = {'current': ancillary_ports, 'added': set(),
|
||||
'removed': set()}
|
||||
expected_ports = ovs_agent.PortInfo(current=registered_ports)
|
||||
expected_ancillary = ovs_agent.PortInfo(current=ancillary_ports)
|
||||
self._test_process_ports_events(events, registered_ports,
|
||||
ancillary_ports, expected_ports,
|
||||
expected_ancillary)
|
||||
|
@ -531,9 +524,9 @@ class TestOvsNeutronAgent(object):
|
|||
events = {'added': [], 'removed': []}
|
||||
registered_ports = {1, 2, 3}
|
||||
updated_ports = {2}
|
||||
expected_ports = dict(current=registered_ports, updated={2},
|
||||
added=set(), removed=set())
|
||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
||||
expected_ports = ovs_agent.PortInfo(current=registered_ports,
|
||||
updated={2})
|
||||
expected_ancillary = ovs_agent.PortInfo()
|
||||
self._test_process_ports_events(events, registered_ports,
|
||||
set(), expected_ports,
|
||||
expected_ancillary, updated_ports)
|
||||
|
@ -543,9 +536,8 @@ class TestOvsNeutronAgent(object):
|
|||
'removed': [{'name': 'port2', 'ofport': 2,
|
||||
'external_ids': {'attached-mac': 'test-mac'}}]}
|
||||
registered_ports = {1}
|
||||
expected_ports = dict(current=registered_ports, added=set(),
|
||||
removed=set())
|
||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
||||
expected_ports = ovs_agent.PortInfo(current=registered_ports)
|
||||
expected_ancillary = ovs_agent.PortInfo()
|
||||
devices_not_ready_yet = set()
|
||||
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
|
||||
side_effect=[2]), \
|
||||
|
@ -567,11 +559,11 @@ class TestOvsNeutronAgent(object):
|
|||
'external_ids': {'attached-mac': 'test-mac'}}],
|
||||
'removed': []}
|
||||
old_devices_not_ready = {'port4'}
|
||||
registered_ports = set([1, 2, 3])
|
||||
expected_ports = dict(current=set([1, 2, 3, 4]),
|
||||
added=set([4]), removed=set())
|
||||
registered_ports = {1, 2, 3}
|
||||
expected_ports = ovs_agent.PortInfo(current={1, 2, 3, 4}, added={4},
|
||||
removed=set())
|
||||
self.agent.ancillary_brs = []
|
||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
||||
expected_ancillary = ovs_agent.PortInfo()
|
||||
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
|
||||
side_effect=[5, 4]), \
|
||||
mock.patch.object(self.agent, 'check_changed_vlans',
|
||||
|
@ -602,11 +594,10 @@ class TestOvsNeutronAgent(object):
|
|||
'external_ids': {'attached-mac': 'test-mac'}}]}
|
||||
registered_ports = {1, 2, 4}
|
||||
ancillary_ports = {5, 8}
|
||||
expected_ports = dict(current={1, 3, 4}, added={3}, removed={2})
|
||||
if updated_ports:
|
||||
expected_ports['updated'] = updated_ports
|
||||
expected_ancillary = dict(current={6, 8}, added={6},
|
||||
removed={5})
|
||||
expected_ports = ovs_agent.PortInfo(current={1, 3, 4}, added={3},
|
||||
removed={2}, updated=updated_ports)
|
||||
expected_ancillary = ovs_agent.PortInfo(current={6, 8}, added={6},
|
||||
removed={5})
|
||||
ancillary_bridge = mock.Mock()
|
||||
ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
|
||||
self.agent.ancillary_brs = [ancillary_bridge]
|
||||
|
@ -650,10 +641,8 @@ class TestOvsNeutronAgent(object):
|
|||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
port_tags_dict = {1: []}
|
||||
expected = dict(
|
||||
added=set([3]), current=vif_port_set,
|
||||
removed=set([2]), updated=set([1])
|
||||
)
|
||||
expected = ovs_agent.PortInfo(added={3}, current=vif_port_set,
|
||||
removed={2}, updated={1})
|
||||
with mock.patch.object(self.agent, 'tun_br', autospec=True), \
|
||||
mock.patch.object(self.agent.plugin_rpc,
|
||||
'update_device_list') as upd_l:
|
||||
|
@ -858,7 +847,7 @@ class TestOvsNeutronAgent(object):
|
|||
return_value={}),\
|
||||
mock.patch.object(self.agent, func_name) as func:
|
||||
skip_devs, _, need_bound_devices, _ = (
|
||||
self.agent.treat_devices_added_or_updated([], False))
|
||||
self.agent.treat_devices_added_or_updated([], False, set()))
|
||||
# The function should not raise
|
||||
self.assertFalse(skip_devs)
|
||||
return func.called
|
||||
|
@ -876,7 +865,7 @@ class TestOvsNeutronAgent(object):
|
|||
return_value={details['device']: port}),\
|
||||
mock.patch.object(self.agent, 'port_dead') as func:
|
||||
skip_devs, binding_no_activated_devices, _, _ = (
|
||||
self.agent.treat_devices_added_or_updated([], False))
|
||||
self.agent.treat_devices_added_or_updated([], False, set()))
|
||||
self.assertFalse(skip_devs)
|
||||
self.assertTrue(func.called)
|
||||
self.assertIn('id', binding_no_activated_devices)
|
||||
|
@ -929,7 +918,7 @@ class TestOvsNeutronAgent(object):
|
|||
mock.patch.object(self.agent, 'treat_vif_port',
|
||||
return_value=False):
|
||||
|
||||
self.agent.treat_devices_added_or_updated([], False)
|
||||
self.agent.treat_devices_added_or_updated([], False, set())
|
||||
|
||||
def test_treat_devices_added_updated_skips_if_port_not_found(self):
|
||||
dev_mock = mock.MagicMock()
|
||||
|
@ -948,7 +937,8 @@ class TestOvsNeutronAgent(object):
|
|||
"delete_port") as ext_mgr_delete_port,\
|
||||
mock.patch.object(self.agent,
|
||||
'treat_vif_port') as treat_vif_port:
|
||||
skip_devs = self.agent.treat_devices_added_or_updated([], False)
|
||||
skip_devs = self.agent.treat_devices_added_or_updated(
|
||||
[], False, set())
|
||||
# The function should return False for resync and no device
|
||||
# processed
|
||||
self.assertEqual((['the_skipped_one'], set(), [], set()),
|
||||
|
@ -970,7 +960,7 @@ class TestOvsNeutronAgent(object):
|
|||
'treat_vif_port') as treat_vif_port:
|
||||
failed_devices = {'added': set(), 'removed': set()}
|
||||
(_, _, _, failed_devices['added']) = (
|
||||
self.agent.treat_devices_added_or_updated([], False))
|
||||
self.agent.treat_devices_added_or_updated([], False, set()))
|
||||
# The function should return False for resync and no device
|
||||
# processed
|
||||
self.assertEqual(set([dev_mock]), failed_devices.get('added'))
|
||||
|
@ -1001,7 +991,7 @@ class TestOvsNeutronAgent(object):
|
|||
mock.patch.object(self.agent,
|
||||
'treat_vif_port') as treat_vif_port:
|
||||
skip_devs, _, need_bound_devices, _ = (
|
||||
self.agent.treat_devices_added_or_updated([], False))
|
||||
self.agent.treat_devices_added_or_updated([], False, set()))
|
||||
# The function should return False for resync
|
||||
self.assertFalse(skip_devs)
|
||||
self.assertTrue(treat_vif_port.called)
|
||||
|
@ -1122,7 +1112,7 @@ class TestOvsNeutronAgent(object):
|
|||
port_info.get('updated', set()))
|
||||
if devices_added_updated:
|
||||
device_added_updated.assert_called_once_with(
|
||||
devices_added_updated, False)
|
||||
devices_added_updated, False, set())
|
||||
if port_info.get('removed', set()):
|
||||
device_removed.assert_called_once_with(port_info['removed'])
|
||||
if skipped_devices:
|
||||
|
@ -2561,24 +2551,23 @@ class AncillaryBridgesTest(object):
|
|||
def test_scan_ancillary_ports_returns_cur_only_for_unchanged_ports(self):
|
||||
vif_port_set = set([1, 2])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set,
|
||||
added=set(),
|
||||
removed=set())
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set)
|
||||
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ancillary_ports_returns_port_changes(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, added={3},
|
||||
removed={2})
|
||||
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_scan_ancillary_ports_returns_port_changes_with_sync(self):
|
||||
vif_port_set = set([1, 3])
|
||||
registered_ports = set([1, 2])
|
||||
expected = dict(current=vif_port_set, added=vif_port_set,
|
||||
removed=set([2]))
|
||||
expected = ovs_agent.PortInfo(current=vif_port_set, added=vif_port_set,
|
||||
removed={2})
|
||||
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports,
|
||||
sync=True)
|
||||
self.assertEqual(expected, actual)
|
||||
|
|
Loading…
Reference in New Issue