ofagent: Stop monitoring ovsdb for port changes

Perform dumb polling instead.
Now ofagent port monitoring is mostly ovsdb-free.
(except _find_lost_vlan_port stuff, which is planned to be retired
by blueprint ofagent-merge-bridges)

Partially-Implements: blueprint ofagent-port-monitor
Change-Id: Ib4701f7c8ea0ee03229d207c0fbbf6f42a55aecb
This commit is contained in:
YAMAMOTO Takashi 2014-07-31 15:31:16 +09:00
parent 5a63085d89
commit f151caea9b
2 changed files with 50 additions and 102 deletions

View File

@ -33,7 +33,6 @@ from ryu.ofproto import ofproto_v1_3 as ryu_ofp13
from neutron.agent import l2population_rpc
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import polling
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
@ -200,9 +199,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
def __init__(self, ryuapp, integ_br, tun_br, local_ip,
bridge_mappings, root_helper,
polling_interval, tunnel_types=None,
veth_mtu=None, minimize_polling=False,
ovsdb_monitor_respawn_interval=(
constants.DEFAULT_OVSDBMON_RESPAWN)):
veth_mtu=None):
"""Constructor.
:param ryuapp: object of the ryu app.
@ -216,11 +213,6 @@ class OFANeutronAgent(n_rpc.RpcCallback,
the agent. If set, will automatically set enable_tunneling to
True.
:param veth_mtu: MTU size for veth interfaces.
:param minimize_polling: Optional, whether to minimize polling by
monitoring ovsdb for interface changes.
:param ovsdb_monitor_respawn_interval: Optional, when using polling
minimization, the number of seconds to wait before respawning
the ovsdb monitor.
"""
super(OFANeutronAgent, self).__init__()
self.ryuapp = ryuapp
@ -254,8 +246,6 @@ class OFANeutronAgent(n_rpc.RpcCallback,
p_const.TYPE_VXLAN: {}}
self.polling_interval = polling_interval
self.minimize_polling = minimize_polling
self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval
self.enable_tunneling = bool(self.tunnel_types)
self.local_ip = local_ip
@ -1260,35 +1250,27 @@ class OFANeutronAgent(n_rpc.RpcCallback,
resync = True
return resync
def _agent_has_updates(self, polling_manager):
return (polling_manager.is_polling_required or
self.updated_ports or
self.sg_agent.firewall_refresh_needed())
def _port_info_has_changes(self, port_info):
return (port_info.get('added') or
port_info.get('removed') or
port_info.get('updated'))
def ovsdb_monitor_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
def daemon_loop(self):
# TODO(yamamoto):
# It might be better to monitor port status async messages
sync = True
ports = set()
updated_ports_copy = set()
tunnel_sync = True
while True:
start = time.time()
port_stats = {'regular': {'added': 0, 'updated': 0, 'removed': 0}}
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%d started"),
LOG.debug("Agent daemon_loop - iteration:%d started",
self.iter_num)
if sync:
LOG.info(_("Agent out of sync with plugin!"))
ports.clear()
sync = False
polling_manager.force_polling()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
@ -1297,82 +1279,66 @@ class OFANeutronAgent(n_rpc.RpcCallback,
except Exception:
LOG.exception(_("Error while synchronizing tunnels"))
tunnel_sync = True
if self._agent_has_updates(polling_manager):
try:
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f"),
LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
try:
# Save updated ports dict to perform rollback in
# case resync would be needed, and then clear
# self.updated_ports. As the greenthread should not yield
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
ports = port_info['current']
LOG.debug("Agent daemon_loop - iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
self.sg_agent.firewall_refresh_needed()):
LOG.debug("Starting to process devices in:%s",
port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
LOG.debug("Agent daemon_loop - "
"iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Save updated ports dict to perform rollback in
# case resync would be needed, and then clear
# self.updated_ports. As the greenthread should not yield
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
ports = port_info['current']
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
self.sg_agent.firewall_refresh_needed()):
LOG.debug(_("Starting to process devices in:%s"),
port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
LOG.debug(_("Agent ovsdb_monitor_loop - "
"iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
port_stats['regular']['added'] = (
len(port_info.get('added', [])))
port_stats['regular']['updated'] = (
len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
polling_manager.polling_completed()
except Exception:
LOG.exception(_("Error while processing VIF ports"))
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
sync = True
port_stats['regular']['added'] = (
len(port_info.get('added', [])))
port_stats['regular']['updated'] = (
len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
except Exception:
LOG.exception(_("Error while processing VIF ports"))
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
LOG.debug(_("Agent ovsdb_monitor_loop - iteration:%(iter_num)d "
"completed. Processed ports statistics:"
"%(port_stats)s. Elapsed:%(elapsed).3f"),
LOG.debug("Agent daemon_loop - iteration:%(iter_num)d "
"completed. Processed ports statistics:"
"%(port_stats)s. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'port_stats': port_stats,
'elapsed': elapsed})
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug(_("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!"),
LOG.debug("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!",
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
self.iter_num = self.iter_num + 1
def daemon_loop(self):
# TODO(yamamoto): make polling logic stop using ovsdb monitor
# - make it a dumb periodic polling
# - or, monitor port status async messages
with polling.get_polling_manager(
self.minimize_polling,
self.root_helper,
self.ovsdb_monitor_respawn_interval) as pm:
self.ovsdb_monitor_loop(polling_manager=pm)
def create_agent_config_map(config):
"""Create a map of agent config parameters.
@ -1392,10 +1358,8 @@ def create_agent_config_map(config):
bridge_mappings=bridge_mappings,
root_helper=config.AGENT.root_helper,
polling_interval=config.AGENT.polling_interval,
minimize_polling=config.AGENT.minimize_polling,
tunnel_types=config.AGENT.tunnel_types,
veth_mtu=config.AGENT.veth_mtu,
ovsdb_monitor_respawn_interval=constants.DEFAULT_OVSDBMON_RESPAWN,
)
# If enable_tunneling is TRUE, set tunnel_type to default to GRE

View File

@ -808,22 +808,6 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
self.agent.reclaim_local_vlan(self.lvms[1].net)
del_port_fn.assert_called_once_with(self.tun_name2)
def test_daemon_loop_uses_polling_manager(self):
with mock.patch(
'neutron.agent.linux.polling.get_polling_manager'
) as mock_get_pm:
fake_pm = mock.Mock()
mock_get_pm.return_value = fake_pm
fake_pm.__enter__ = mock.Mock()
fake_pm.__exit__ = mock.Mock()
with mock.patch.object(
self.agent, 'ovsdb_monitor_loop'
) as mock_loop:
self.agent.daemon_loop()
mock_get_pm.assert_called_once_with(True, 'fake_helper',
constants.DEFAULT_OVSDBMON_RESPAWN)
mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__())
def test__setup_tunnel_port_error_negative(self):
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',