Merge "ovsdb monitor: handle modified ports"

This commit is contained in:
Zuul 2019-10-09 09:27:47 +00:00 committed by Gerrit Code Review
commit 2ffaa40b43
6 changed files with 63 additions and 19 deletions

View File

@ -52,12 +52,12 @@ class OvsdbMonitor(async_process.AsyncProcess):
respawn_interval=respawn_interval,
log_output=True,
die_on_error=False)
self.new_events = {'added': [], 'removed': []}
self.new_events = {'added': [], 'removed': [], 'modified': []}
def get_events(self):
self.process_events()
events = self.new_events
self.new_events = {'added': [], 'removed': []}
self.new_events = {'added': [], 'removed': [], 'modified': []}
return events
def start(self, block=False, timeout=5):
@ -95,11 +95,14 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
LOG.error("%s monitor is not active", self.table_name)
else:
self.process_events()
return bool(self.new_events['added'] or self.new_events['removed'])
return bool(self.new_events['added'] or
self.new_events['removed'] or
self.new_events['modified'])
def process_events(self):
devices_added = []
devices_removed = []
devices_modified = []
dev_to_ofport = {}
for row in self.iter_stdout():
json = jsonutils.loads(row).get('data')
@ -116,10 +119,17 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
elif action == OVSDB_ACTION_DELETE:
devices_removed.append(device)
elif action == OVSDB_ACTION_NEW:
# We'll receive this event for "initial", "insert"
# and "modify" actions. If ever needed, the old state
# can also be included in the processed event as per
# https://tools.ietf.org/html/rfc7047#section-4.1.6
if device not in devices_added:
devices_modified.append(device)
dev_to_ofport[name] = ofport
self.new_events['added'].extend(devices_added)
self.new_events['removed'].extend(devices_removed)
self.new_events['modified'].extend(devices_modified)
# update any events with ofports received from 'new' action
for event in self.new_events['added']:
event['ofport'] = dev_to_ofport.get(event['name'], event['ofport'])

View File

@ -1571,11 +1571,16 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
ancillary_port_info['current'] = ancillary_ports
ports_not_ready_yet = set()
if updated_ports is None:
updated_ports = set()
# if a port was added and then removed or viceversa since the agent
# can't know the order of the operations, check the status of the port
# to determine if the port was added or deleted
added_ports = {p['name'] for p in events['added']}
removed_ports = {p['name'] for p in events['removed']}
updated_ports.update({p['name'] for p in events['modified']})
ports_removed_and_added = added_ports & removed_ports
for p in ports_removed_and_added:
if ovs_lib.BaseOVS().port_exists(p):
@ -1637,8 +1642,6 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self._update_port_info_failed_devices_stats(ancillary_port_info,
failed_ancillary_devices)
if updated_ports is None:
updated_ports = set()
updated_ports.update(self.check_changed_vlans())
if updated_ports:

View File

@ -206,7 +206,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
# because it keeps finding not ready ports that are created
# by other tests)
filtered_ports.append(dev)
return {'added': filtered_ports, 'removed': events['removed']}
return {'added': filtered_ports, 'removed': events['removed'],
'modified': []}
polling_manager.get_events = mock.Mock(side_effect=filter_events)
def stop_agent(self, agent, rpc_loop_thread):

View File

@ -100,7 +100,8 @@ class OVSDBHandlerTestCase(base.OVSAgentTestFramework):
events = get_events()
filtered_events = {
'added': [],
'removed': []
'removed': [],
'modified': []
}
for event_type in filtered_events:
for dev in events[event_type]:

View File

@ -13,6 +13,7 @@
# under the License.
import mock
from oslo_serialization import jsonutils
from neutron.agent.common import async_process
from neutron.agent.common import ovs_lib
@ -70,18 +71,42 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
def test_has_updates_after_calling_get_events_is_false(self):
with mock.patch.object(
self.monitor, 'process_events') as process_events:
self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1']}
self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1'],
'modified': []}
self.assertTrue(self.monitor.has_updates)
self.monitor.get_events()
self.assertTrue(process_events.called)
self.assertFalse(self.monitor.has_updates)
def _get_event(self, ovs_id='e040fbec-0579-4990-8324-d338da33ae88',
action="insert", name="fake_dev", ofport=10,
external_ids=None, as_string=True):
event = {"data": [[ovs_id, action, name, ["set", [ofport]],
["map", external_ids or []]]]}
if as_string:
event = jsonutils.dumps(event)
return event
def process_event_unassigned_of_port(self):
output = '{"data":[["e040fbec-0579-4990-8324-d338da33ae88","insert",'
output += '"m50",["set",[]],["map",[]]]],"headings":["row","action",'
output += '"name","ofport","external_ids"]}'
output = self._get_event()
with mock.patch.object(
self.monitor, 'iter_stdout', return_value=[output]):
self.monitor.process_events()
self.assertEqual(self.monitor.new_events['added'][0]['ofport'],
ovs_lib.UNASSIGNED_OFPORT)
def test_process_changed_of_port(self):
event0 = self._get_event(action="old", ofport=-1)
event1 = self._get_event(action="new", ofport=10)
expected_dev = {
'name': 'fake_dev',
'ofport': [10],
'external_ids': {}
}
with mock.patch.object(
self.monitor, 'iter_stdout', return_value=[event0, event1]):
self.monitor.process_events()
self.assertIn(expected_dev,
self.monitor.new_events['modified'])

View File

@ -503,7 +503,8 @@ class TestOvsNeutronAgent(object):
[{'ofport': 2,
'external_ids': {'iface-id': port_id,
'attached-mac': 'fa:16:3e:f6:1b:fb'},
'name': 'qvof6f104bd-37'}]
'name': 'qvof6f104bd-37'}],
'modified': []
}
registered_ports = {port_id}
expected_ancillary = dict(current=set(), added=set(), removed=set())
@ -529,7 +530,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary)
def test_process_ports_events_returns_current_for_unchanged_ports(self):
events = {'added': [], 'removed': []}
events = {'added': [], 'removed': [], 'modified': []}
registered_ports = {1, 3}
ancillary_ports = {2, 5}
expected_ports = {'current': registered_ports, 'added': set(),
@ -541,7 +542,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary)
def test_process_port_events_no_vif_changes_return_updated_port_only(self):
events = {'added': [], 'removed': []}
events = {'added': [], 'removed': [], 'modified': []}
registered_ports = {1, 2, 3}
updated_ports = {2}
expected_ports = dict(current=registered_ports, updated={2},
@ -552,7 +553,7 @@ class TestOvsNeutronAgent(object):
expected_ancillary, updated_ports)
def test_process_port_events_ignores_removed_port_if_never_added(self):
events = {'added': [],
events = {'added': [], 'modified': [],
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}}]}
registered_ports = {1}
@ -578,7 +579,7 @@ class TestOvsNeutronAgent(object):
def test_process_port_events_port_not_ready_yet(self):
events = {'added': [{'name': 'port5', 'ofport': [],
'external_ids': {'attached-mac': 'test-mac'}}],
'removed': []}
'removed': [], 'modified': []}
old_devices_not_ready = {'port4'}
registered_ports = set([1, 2, 3])
expected_ports = dict(current=set([1, 2, 3, 4]),
@ -612,7 +613,8 @@ class TestOvsNeutronAgent(object):
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}},
{'name': 'qg-port1', 'ofport': 5,
'external_ids': {'attached-mac': 'test-mac'}}]}
'external_ids': {'attached-mac': 'test-mac'}}],
'modified': []}
registered_ports = {1, 2, 4}
ancillary_ports = {5, 8}
expected_ports = dict(current={1, 3, 4}, added={3}, removed={2})
@ -2210,10 +2212,12 @@ class TestOvsNeutronAgent(object):
pass
process_p_events.assert_has_calls([
mock.call({'removed': [], 'added': []}, set(), set(), set(),
mock.call({'removed': [], 'added': [], 'modified': []},
set(), set(), set(),
failed_devices, failed_ancillary_devices,
set()),
mock.call({'removed': [], 'added': []}, set(['tap0']), set(),
mock.call({'removed': [], 'added': [], 'modified': []},
set(['tap0']), set(),
set(), failed_devices, failed_ancillary_devices,
set())
])