ovsdb monitor: handle modified ports

This patch ensures that the ovsdb monitor propagates the events
received for modified ports.

We'll use a new list for the modified ports, which the neutron ovs
agent can handle.

In particular, this will cover the situation in which the ofport
changes. When using recent OVS Windows versions, VM ofports change
to -1 (invalid) when the VMs are shut down, receiving a valid
ofport when the VMs are powered back on (different than the initial
one). With this patch applied, "modify" events will be propagated
to the ovs agent, which will then update the OpenFlow rules.
The old rules are cleaned up by "update_stale_ofport_rules"
once the invalid ofport is detected.

Closes-Bug: #1843870
Co-authored-by: Alin Serdean <aserdean@cloudbasesolutions.com>

Change-Id: I0c3a570cbb3fbb03b4224744b32e034e9e255f8e
This commit is contained in:
Lucian Petrut 2019-09-12 17:34:04 +03:00
parent 8887343782
commit 451c21571f
6 changed files with 63 additions and 19 deletions

View File

@ -52,12 +52,12 @@ class OvsdbMonitor(async_process.AsyncProcess):
respawn_interval=respawn_interval,
log_output=True,
die_on_error=False)
self.new_events = {'added': [], 'removed': []}
self.new_events = {'added': [], 'removed': [], 'modified': []}
def get_events(self):
self.process_events()
events = self.new_events
self.new_events = {'added': [], 'removed': []}
self.new_events = {'added': [], 'removed': [], 'modified': []}
return events
def start(self, block=False, timeout=5):
@ -95,11 +95,14 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
LOG.error("%s monitor is not active", self.table_name)
else:
self.process_events()
return bool(self.new_events['added'] or self.new_events['removed'])
return bool(self.new_events['added'] or
self.new_events['removed'] or
self.new_events['modified'])
def process_events(self):
devices_added = []
devices_removed = []
devices_modified = []
dev_to_ofport = {}
for row in self.iter_stdout():
json = jsonutils.loads(row).get('data')
@ -116,10 +119,17 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
elif action == OVSDB_ACTION_DELETE:
devices_removed.append(device)
elif action == OVSDB_ACTION_NEW:
# We'll receive this event for "initial", "insert"
# and "modify" actions. If ever needed, the old state
# can also be included in the processed event as per
# https://tools.ietf.org/html/rfc7047#section-4.1.6
if device not in devices_added:
devices_modified.append(device)
dev_to_ofport[name] = ofport
self.new_events['added'].extend(devices_added)
self.new_events['removed'].extend(devices_removed)
self.new_events['modified'].extend(devices_modified)
# update any events with ofports received from 'new' action
for event in self.new_events['added']:
event['ofport'] = dev_to_ofport.get(event['name'], event['ofport'])

View File

@ -1436,11 +1436,16 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
ancillary_port_info['current'] = ancillary_ports
ports_not_ready_yet = set()
if updated_ports is None:
updated_ports = set()
# if a port was added and then removed or viceversa since the agent
# can't know the order of the operations, check the status of the port
# to determine if the port was added or deleted
added_ports = {p['name'] for p in events['added']}
removed_ports = {p['name'] for p in events['removed']}
updated_ports.update({p['name'] for p in events['modified']})
ports_removed_and_added = added_ports & removed_ports
for p in ports_removed_and_added:
if ovs_lib.BaseOVS().port_exists(p):
@ -1502,8 +1507,6 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self._update_port_info_failed_devices_stats(ancillary_port_info,
failed_ancillary_devices)
if updated_ports is None:
updated_ports = set()
updated_ports.update(self.check_changed_vlans())
if updated_ports:

View File

@ -146,7 +146,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
# because it keeps finding not ready ports that are created
# by other tests)
filtered_ports.append(dev)
return {'added': filtered_ports, 'removed': events['removed']}
return {'added': filtered_ports, 'removed': events['removed'],
'modified': []}
polling_manager.get_events = mock.Mock(side_effect=filter_events)
def stop_agent(self, agent, rpc_loop_thread):

View File

@ -100,7 +100,8 @@ class OVSDBHandlerTestCase(base.OVSAgentTestFramework):
events = get_events()
filtered_events = {
'added': [],
'removed': []
'removed': [],
'modified': []
}
for event_type in filtered_events:
for dev in events[event_type]:

View File

@ -13,6 +13,7 @@
# under the License.
import mock
from oslo_serialization import jsonutils
from neutron.agent.common import async_process
from neutron.agent.common import ovs_lib
@ -70,18 +71,42 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
def test_has_updates_after_calling_get_events_is_false(self):
with mock.patch.object(
self.monitor, 'process_events') as process_events:
self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1']}
self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1'],
'modified': []}
self.assertTrue(self.monitor.has_updates)
self.monitor.get_events()
self.assertTrue(process_events.called)
self.assertFalse(self.monitor.has_updates)
def _get_event(self, ovs_id='e040fbec-0579-4990-8324-d338da33ae88',
action="insert", name="fake_dev", ofport=10,
external_ids=None, as_string=True):
event = {"data": [[ovs_id, action, name, ["set", [ofport]],
["map", external_ids or []]]]}
if as_string:
event = jsonutils.dumps(event)
return event
def process_event_unassigned_of_port(self):
output = '{"data":[["e040fbec-0579-4990-8324-d338da33ae88","insert",'
output += '"m50",["set",[]],["map",[]]]],"headings":["row","action",'
output += '"name","ofport","external_ids"]}'
output = self._get_event()
with mock.patch.object(
self.monitor, 'iter_stdout', return_value=[output]):
self.monitor.process_events()
self.assertEqual(self.monitor.new_events['added'][0]['ofport'],
ovs_lib.UNASSIGNED_OFPORT)
def test_process_changed_of_port(self):
event0 = self._get_event(action="old", ofport=-1)
event1 = self._get_event(action="new", ofport=10)
expected_dev = {
'name': 'fake_dev',
'ofport': [10],
'external_ids': {}
}
with mock.patch.object(
self.monitor, 'iter_stdout', return_value=[event0, event1]):
self.monitor.process_events()
self.assertIn(expected_dev,
self.monitor.new_events['modified'])

View File

@ -483,7 +483,8 @@ class TestOvsNeutronAgent(object):
[{'ofport': 2,
'external_ids': {'iface-id': port_id,
'attached-mac': 'fa:16:3e:f6:1b:fb'},
'name': 'qvof6f104bd-37'}]
'name': 'qvof6f104bd-37'}],
'modified': []
}
registered_ports = {port_id}
expected_ancillary = dict(current=set(), added=set(), removed=set())
@ -509,7 +510,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary)
def test_process_ports_events_returns_current_for_unchanged_ports(self):
events = {'added': [], 'removed': []}
events = {'added': [], 'removed': [], 'modified': []}
registered_ports = {1, 3}
ancillary_ports = {2, 5}
expected_ports = {'current': registered_ports, 'added': set(),
@ -521,7 +522,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary)
def test_process_port_events_no_vif_changes_return_updated_port_only(self):
events = {'added': [], 'removed': []}
events = {'added': [], 'removed': [], 'modified': []}
registered_ports = {1, 2, 3}
updated_ports = {2}
expected_ports = dict(current=registered_ports, updated={2},
@ -532,7 +533,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary, updated_ports)
def test_process_port_events_ignores_removed_port_if_never_added(self):
events = {'added': [],
events = {'added': [], 'modified': [],
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}}]}
registered_ports = {1}
@ -558,7 +559,7 @@ class TestOvsNeutronAgent(object):
def test_process_port_events_port_not_ready_yet(self):
events = {'added': [{'name': 'port5', 'ofport': [],
'external_ids': {'attached-mac': 'test-mac'}}],
'removed': []}
'removed': [], 'modified': []}
old_devices_not_ready = {'port4'}
registered_ports = set([1, 2, 3])
expected_ports = dict(current=set([1, 2, 3, 4]),
@ -592,7 +593,8 @@ class TestOvsNeutronAgent(object):
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}},
{'name': 'qg-port1', 'ofport': 5,
'external_ids': {'attached-mac': 'test-mac'}}]}
'external_ids': {'attached-mac': 'test-mac'}}],
'modified': []}
registered_ports = {1, 2, 4}
ancillary_ports = {5, 8}
expected_ports = dict(current={1, 3, 4}, added={3}, removed={2})
@ -2086,10 +2088,12 @@ class TestOvsNeutronAgent(object):
pass
process_p_events.assert_has_calls([
mock.call({'removed': [], 'added': []}, set(), set(), set(),
mock.call({'removed': [], 'added': [], 'modified': []},
set(), set(), set(),
failed_devices, failed_ancillary_devices,
set()),
mock.call({'removed': [], 'added': []}, set(['tap0']), set(),
mock.call({'removed': [], 'added': [], 'modified': []},
set(['tap0']), set(),
set(), failed_devices, failed_ancillary_devices,
set())
])