OVS agent reacts to events instead of polling

OVSDB monitor generates the events that the OVS agent
needs to process (device added or updated). Instead of
polling, the agent processes the queue of events.

Change-Id: I168a3cc3aa96a809153a30635ad7bda29e8ee47c
Partially-Implements: blueprint restructure-l2-agent
This commit is contained in:
rossella 2015-03-05 09:24:10 +00:00
parent 3ed7371332
commit 1992d52d63
5 changed files with 346 additions and 54 deletions

View File

@ -34,6 +34,7 @@ from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import ip_lib
from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
@ -1212,6 +1213,88 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info['removed'] = registered_ports - cur_ports
return port_info
def process_ports_events(self, events, registered_ports, ancillary_ports,
                         updated_ports=None):
    """Translate queued port events into port-info dictionaries.

    :param events: dict with 'added' and 'removed' lists of device dicts.
        Each device is read for 'name', 'ofport' and 'external_ids'. A
        device present in both lists is removed in place from one of
        them, depending on whether the port currently exists.
    :param registered_ports: set of port ids already registered for the
        integration bridge. It is copied, never modified.
    :param ancillary_ports: set of port ids already registered for the
        ancillary bridges, or None. It is copied, never modified.
    :param updated_ports: optional set of port ids flagged as updated.
        It is extended in place with self.check_changed_vlans() and then
        restricted in place to the resulting current ports.
    :returns: tuple (port_info, ancillary_port_info). Both dicts hold
        'current', 'added' and 'removed' sets; port_info also holds
        'updated' when the updated set was non-empty before it was
        restricted to the current ports.
    :raises Exception: when a device the agent cares about still has
        ofport ovs_lib.UNASSIGNED_OFPORT.
    """
    # Work on copies: the 'current' sets are modified below and must not
    # alias the sets owned by the caller.
    port_info = {
        'added': set(),
        'removed': set(),
        'current': set(registered_ports),
    }
    ancillary_port_info = {
        'added': set(),
        'removed': set(),
        'current': set(ancillary_ports or ()),
    }

    # If a port was added and then removed, or vice versa, the agent
    # cannot know the order of the operations. Check the actual status of
    # the port to decide whether it was added or deleted.
    device_removed_or_added = [
        dev for dev in events['added'] if dev in events['removed']]
    if device_removed_or_added:
        # One helper object is enough for all the existence checks.
        ovs = ovs_lib.BaseOVS()
        for device in device_removed_or_added:
            if ovs.port_exists(device['name']):
                events['removed'].remove(device)
            else:
                events['added'].remove(device)

    #TODO(rossella_s): scanning the ancillary bridge won't be needed
    # anymore when https://review.openstack.org/#/c/203381 since the bridge
    # id stored in external_ids will be used to identify the bridge the
    # port belongs to
    cur_ancillary_ports = set()
    # ancillary_brs may be None or empty when there is no ancillary bridge.
    for bridge in self.ancillary_brs or ():
        cur_ancillary_ports |= bridge.get_vif_port_set()
    cur_ancillary_ports |= ancillary_port_info['current']

    def _process_device(device, devices, ancillary_devices):
        # A device without 'attached-mac' in its external_ids is not a
        # port the agent should care about.
        if 'attached-mac' not in device.get('external_ids', []):
            return
        iface_id = self.int_br.portid_from_external_ids(
            device['external_ids'])
        if not iface_id:
            return
        if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
            #TODO(rossella_s) it's extreme to trigger a full resync
            # if a port is not ready, resync only the device that
            # is not ready
            raise Exception(
                _("Port %s is not ready, resync needed") % device[
                    'name'])
        # Sort the port id by the kind of bridge it belongs to.
        if iface_id in cur_ancillary_ports:
            ancillary_devices.add(iface_id)
        else:
            devices.add(iface_id)

    for device in events['added']:
        _process_device(device, port_info['added'],
                        ancillary_port_info['added'])
    for device in events['removed']:
        _process_device(device, port_info['removed'],
                        ancillary_port_info['removed'])

    if updated_ports is None:
        updated_ports = set()
    updated_ports.update(self.check_changed_vlans())

    # Disregard devices that were never noticed by the agent
    port_info['removed'] &= port_info['current']
    port_info['current'] |= port_info['added']
    port_info['current'] -= port_info['removed']

    ancillary_port_info['removed'] &= ancillary_port_info['current']
    ancillary_port_info['current'] |= ancillary_port_info['added']
    ancillary_port_info['current'] -= ancillary_port_info['removed']

    if updated_ports:
        # Some updated ports might have been removed in the
        # meanwhile, and therefore should not be processed.
        # In this case the updated port won't be found among
        # current ports.
        updated_ports &= port_info['current']
        port_info['updated'] = updated_ports
    return port_info, ancillary_port_info
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
@ -1656,12 +1739,64 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
def process_port_info(self, start, polling_manager, sync, ovs_restarted,
                      ports, ancillary_ports, updated_ports_copy,
                      consecutive_resyncs):
    """Gather port changes, by full scan or from the queued events.

    A full scan (scan_ports, plus scan_ancillary_ports when ancillary
    bridges exist) is used when ``sync`` is set or when the polling
    manager has no ``get_events`` attribute. Otherwise the events
    returned by polling_manager.get_events() are handed to
    process_ports_events.

    :param start: start time of the current iteration, used only to log
        the elapsed time.
    :param polling_manager: polling manager of the rpc loop.
    :param sync: whether a resync was requested.
    :param ovs_restarted: when true, the scan runs against an empty set
        of registered ports instead of ``ports``.
    :param ports: set of registered ports; cleared in place once the
        consecutive resyncs reach constants.MAX_DEVICE_RETRIES.
    :param ancillary_ports: set of registered ancillary ports; cleared
        in place under the same condition.
    :param updated_ports_copy: set of updated ports, passed through to
        scan_ports or process_ports_events.
    :param consecutive_resyncs: number of resyncs in a row so far.
    :returns: tuple (port_info, ancillary_port_info, consecutive_resyncs)
        with the new value of the resync counter.
    """
    # There are polling managers that don't have get_events, e.g.
    # AlwaysPoll used by windows implementations
    # REVISIT (rossella_s) This needs to be reworked to hide implementation
    # details regarding polling in BasePollingManager subclasses
    if not sync and hasattr(polling_manager, 'get_events'):
        consecutive_resyncs = 0
        events = polling_manager.get_events()
        # Without ancillary bridges there are no ancillary ports to track.
        ancillary_ports = (
            ancillary_ports if self.ancillary_brs else None)
        port_info, ancillary_port_info = self.process_ports_events(
            events, ports, ancillary_ports, updated_ports_copy)
        return port_info, ancillary_port_info, consecutive_resyncs

    if sync:
        LOG.info(_LI("Agent out of sync with plugin!"))
        consecutive_resyncs += 1
        if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
            LOG.warning(_LW(
                "Clearing cache of registered ports,"
                " retries to resync were > %s"),
                constants.MAX_DEVICE_RETRIES)
            ports.clear()
            ancillary_ports.clear()
            consecutive_resyncs = 0
    else:
        consecutive_resyncs = 0

    # NOTE(rossella_s) don't empty the queue of events
    # calling polling_manager.get_events() since
    # the agent might miss some event (for example a port
    # deletion)
    reg_ports = set() if ovs_restarted else ports
    port_info = self.scan_ports(reg_ports, sync, updated_ports_copy)

    # Treat ancillary devices if they exist
    if self.ancillary_brs:
        ancillary_port_info = self.scan_ancillary_ports(
            ancillary_ports, sync)
        LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
                  " - ancillary port info retrieved. "
                  "Elapsed:%(elapsed).3f",
                  {'iter_num': self.iter_num,
                   'elapsed': time.time() - start})
    else:
        ancillary_port_info = {}
    return port_info, ancillary_port_info, consecutive_resyncs
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
sync = True
sync = False
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
@ -1674,20 +1809,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
polling_manager.force_polling()
consecutive_resyncs = consecutive_resyncs + 1
if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
LOG.warn(_LW("Clearing cache of registered ports, retrials"
" to resync were > %s"),
constants.MAX_DEVICE_RETRIES)
ports.clear()
ancillary_ports.clear()
sync = False
consecutive_resyncs = 0
else:
consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
@ -1703,6 +1824,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows()
# restart the polling manager so that it will signal as added
# all the current ports
# REVISIT (rossella_s) Define a method "reset" in
# BasePollingManager that will be implemented by AlwaysPoll as
# no action and by InterfacePollingMinimizer as start/stop
if isinstance(
polling_manager, linux_polling.InterfacePollingMinimizer):
polling_manager.stop()
polling_manager.start()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
@ -1719,7 +1849,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
if self._agent_has_updates(polling_manager) or ovs_restarted:
if self._agent_has_updates(polling_manager) or sync:
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
@ -1731,9 +1861,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, sync,
updated_ports_copy)
port_info, ancillary_port_info, consecutive_resyncs = (
self.process_port_info(
start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs)
)
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
@ -1744,16 +1878,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Treat ancillary devices if they exist
if self.ancillary_brs:
ancillary_port_info = self.scan_ancillary_ports(
ancillary_ports, sync)
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ancillary port info retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or

View File

@ -118,13 +118,38 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
agent.ancillary_brs = None
return agent
def start_agent(self, agent, unplug_ports=None):
def _mock_get_events(self, agent, polling_manager, ports):
    """Wrap polling_manager.get_events so unrelated 'added' events vanish.

    Only 'added' devices whose port id, as resolved by
    agent.int_br.portid_from_external_ids, matches the 'id' of one of
    *ports* are kept. 'removed' events are passed through untouched.
    """
    original_get_events = polling_manager.get_events
    p_ids = [port['id'] for port in ports]

    def _created_by_this_test(dev):
        iface_id = agent.int_br.portid_from_external_ids(
            dev.get('external_ids', []))
        return iface_id in p_ids

    def filter_events():
        events = original_get_events()
        # If the event is not about a port that was created by this test,
        # it is filtered out. Since these tests are not run in isolation,
        # processing all the events might make some tests fail (e.g. the
        # agent might keep resyncing because it keeps finding not ready
        # ports that are created by other tests).
        kept = [dev for dev in events['added'] if _created_by_this_test(dev)]
        return {'added': kept, 'removed': events['removed']}

    polling_manager.get_events = mock.Mock(side_effect=filter_events)
def start_agent(self, agent, ports=None, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
if ports is None:
ports = []
self.setup_agent_rpc_mocks(agent, unplug_ports)
polling_manager = polling.InterfacePollingMinimizer()
self._mock_get_events(agent, polling_manager, ports)
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
@ -138,6 +163,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
return polling_manager
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
@ -280,10 +306,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.start_agent(self.agent)
self.network = self._create_test_network_dict()
self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = self._create_test_network_dict()
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)

View File

@ -73,7 +73,6 @@ class TestOVSAgent(base.OVSAgentTestFramework):
port_dicts=self.create_test_ports())
self.wait_until_ports_state(self.ports, up=True)
self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
# OVS restarted, the agent should reprocess all the ports
self.agent.plugin_rpc.update_device_list.reset_mock()
self.wait_until_ports_state(self.ports, up=True)

View File

@ -401,6 +401,130 @@ class TestOvsNeutronAgent(object):
updated_ports)
self.assertEqual(expected, actual)
def test_process_ports_events_returns_current_for_unchanged_ports(self):
    """Without events both 'current' sets are returned with no changes."""
    registered_ports = {1, 3}
    ancillary_ports = {2, 5}
    no_events = {'added': [], 'removed': []}
    expected = (
        {'current': registered_ports, 'added': set(), 'removed': set()},
        {'current': ancillary_ports, 'added': set(), 'removed': set()},
    )
    vlans_patch = mock.patch.object(self.agent, 'check_changed_vlans',
                                    return_value=set())
    with vlans_patch:
        actual = self.agent.process_ports_events(
            no_events, registered_ports, ancillary_ports)
        self.assertEqual(expected, actual)
def test_process_port_events_returns_port_changes(self):
    """Added and removed devices are split between VIF and ancillary."""
    def _device(name, ofport):
        return {'name': name, 'ofport': ofport,
                'external_ids': {'attached-mac': 'test-mac'}}

    events = {
        'added': [_device('port3', 3), _device('qg-port2', 5)],
        'removed': [_device('port2', 2), _device('qg-port1', 4)],
    }
    registered_ports = {1, 2}
    ancillary_ports = {4}
    expected_ports = {'current': {1, 3}, 'added': {3}, 'removed': {2}}
    expected_ancillary_ports = {
        'current': {5}, 'added': {5}, 'removed': {4}}

    # Ports 4 and 5 live on the (mocked) ancillary bridge.
    ancillary_bridge = mock.Mock()
    ancillary_bridge.get_vif_port_set.return_value = {4, 5}
    self.agent.ancillary_brs = [ancillary_bridge]

    # Port ids are handed out in the order the devices are processed:
    # the added ones first, then the removed ones.
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[3, 5, 2, 4]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, ancillary_ports)
            self.assertEqual(
                (expected_ports, expected_ancillary_ports), actual)
def _test_process_port_events_with_updated_ports(self, updated_ports):
    """Check that only port 4 is reported as updated for *updated_ports*.

    The events add port 3 and ancillary port 6, and remove port 2 and
    ancillary port 5.
    """
    def _device(name, ofport):
        return {'name': name, 'ofport': ofport,
                'external_ids': {'attached-mac': 'test-mac'}}

    events = {
        'added': [_device('port3', 3), _device('qg-port2', 6)],
        'removed': [_device('port2', 2), _device('qg-port1', 5)],
    }
    registered_ports = {1, 2, 4}
    ancillary_ports = {5, 8}
    expected_ports = {'current': {1, 3, 4}, 'added': {3},
                      'removed': {2}, 'updated': {4}}
    expected_ancillary = {'current': {6, 8}, 'added': {6},
                          'removed': {5}}

    ancillary_bridge = mock.Mock()
    ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
    self.agent.ancillary_brs = [ancillary_bridge]

    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[3, 6, 2, 5]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, ancillary_ports, updated_ports)
            self.assertEqual((expected_ports, expected_ancillary), actual)
def test_process_port_events_finds_known_updated_ports(self):
    """A registered port flagged as updated is reported as updated."""
    known_updated = {4}
    self._test_process_port_events_with_updated_ports(known_updated)
def test_process_port_events_ignores_unknown_updated_ports(self):
    """An updated port missing from the current ports is dropped."""
    # the port '5' was not seen on current ports. Hence it has either
    # never been wired or already removed and should be ignored
    updated_with_unknown = {4, 5}
    self._test_process_port_events_with_updated_ports(updated_with_unknown)
def test_process_port_events_ignores_updated_port_if_removed(self):
    """A port removed in the same batch is not reported as updated."""
    mac_ids = {'attached-mac': 'test-mac'}
    events = {
        'added': [{'name': 'port3', 'ofport': 3,
                   'external_ids': dict(mac_ids)}],
        'removed': [{'name': 'port2', 'ofport': 2,
                     'external_ids': dict(mac_ids)}],
    }
    registered_ports = {1, 2}
    updated_ports = {1, 2}
    expected_ports = {'current': {1, 3}, 'added': {3},
                      'removed': {2}, 'updated': {1}}
    expected_ancillary = {'current': set(), 'added': set(),
                          'removed': set()}
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[3, 2]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, None, updated_ports)
            self.assertEqual((expected_ports, expected_ancillary), actual)
def test_process_port_events_no_vif_changes_return_updated_port_only(self):
    """With no events only the updated port shows up in the result."""
    registered_ports = {1, 2, 3}
    updated_ports = {2}
    no_events = {'added': [], 'removed': []}
    expected = (
        {'current': registered_ports, 'updated': {2},
         'added': set(), 'removed': set()},
        {'current': set(), 'added': set(), 'removed': set()},
    )
    vlans_patch = mock.patch.object(self.agent, 'check_changed_vlans',
                                    return_value=set())
    with vlans_patch:
        actual = self.agent.process_ports_events(
            no_events, registered_ports, None, updated_ports)
        self.assertEqual(expected, actual)
def test_process_port_events_ignores_removed_port_if_never_added(self):
    """Removing a port the agent never registered changes nothing."""
    unknown_port = {'name': 'port2', 'ofport': 2,
                    'external_ids': {'attached-mac': 'test-mac'}}
    events = {'added': [], 'removed': [unknown_port]}
    registered_ports = {1}
    expected_ports = {'current': registered_ports, 'added': set(),
                      'removed': set()}
    expected_ancillary = {'current': set(), 'added': set(),
                          'removed': set()}
    with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
                           side_effect=[2]):
        with mock.patch.object(self.agent, 'check_changed_vlans',
                               return_value=set()):
            actual = self.agent.process_ports_events(
                events, registered_ports, None)
            self.assertEqual((expected_ports, expected_ancillary), actual)
def test_update_ports_returns_changed_vlan(self):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
@ -1371,11 +1495,17 @@ class TestOvsNeutronAgent(object):
'added': set([]),
'removed': set(['tap0'])}
reply_ancillary = {'current': set([]),
'added': set([]),
'removed': set([])}
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
mock.patch.object(async_process.AsyncProcess, "start"),\
mock.patch.object(async_process.AsyncProcess, "stop"),\
mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'scan_ports') as scan_ports,\
'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
@ -1393,7 +1523,8 @@ class TestOvsNeutronAgent(object):
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
process_p_events.side_effect = [(reply2, reply_ancillary),
(reply3, reply_ancillary)]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
@ -1402,10 +1533,12 @@ class TestOvsNeutronAgent(object):
except Exception:
pass
scan_ports.assert_has_calls([
mock.call(set(), True, set()),
mock.call(set(), False, set())
process_p_events.assert_has_calls([
mock.call({'removed': [], 'added': []}, set(), None, set()),
mock.call({'removed': [], 'added': []}, set(['tap0']), None,
set())
])
process_network_ports.assert_has_calls([
mock.call(reply2, False),
mock.call(reply3, True)

View File

@ -495,14 +495,24 @@ class TunnelTest(object):
self._verify_mock_calls()
def test_daemon_loop(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),
'removed': set([])}
reply_ge_1 = {'added': set(['tap0']),
'removed': set([])}
reply3 = {'current': set(['tap2']),
'added': set([]),
reply_ge_2 = {'added': set([]),
'removed': set(['tap0'])}
reply_pe_1 = {'current': set(['tap0']),
'added': set(['tap0']),
'removed': set([])}
reply_pe_2 = {'current': set([]),
'added': set([]),
'removed': set(['tap0'])}
reply_ancillary = {'current': set([]),
'added': set([]),
'removed': set([])}
self.mock_int_bridge_expected += [
mock.call.check_canary_table(),
mock.call.check_canary_table()
@ -513,7 +523,7 @@ class TunnelTest(object):
with mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'scan_ports') as scan_ports,\
'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
@ -528,8 +538,11 @@ class TunnelTest(object):
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
update_stale.return_value = []
process_p_events.side_effect = [
(reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
interface_polling = mock.Mock()
interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
@ -539,7 +552,7 @@ class TunnelTest(object):
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
n_agent.daemon_loop()
n_agent.rpc_loop(interface_polling)
except Exception:
pass
@ -547,17 +560,14 @@ class TunnelTest(object):
# messages
log_exception.assert_called_once_with(
"Error while processing VIF ports")
scan_ports.assert_has_calls([
mock.call(set(), True, set()),
mock.call(set(['tap0']), False, set())
process_p_events.assert_has_calls([
mock.call(reply_ge_1, set(), None, set()),
mock.call(reply_ge_2, set(['tap0']), None, set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
'added': set(['tap2'])}, False),
mock.call({'current': set(['tap2']),
'removed': set(['tap0']),
'added': set([])}, False)
'added': set(['tap0'])}, False),
])
cleanup.assert_called_once_with()