Monitor phys_bridges to reconfigured it if created again

In case when external bridge configured in OVS agent's bridge_mappings
will be destroyed and created again (for example by running ifup-ovs
script on Centos) bridge wasn't configured by OVS agent.
That might cause broken connectivity for OpenStack's dataplane if
dataplane network also uses same bridge.

This patch adds additional ovsdb-monitor to monitor if any
of physical bridges configured in bridge_mappings was created.
If so, agent will reconfigure it to restore proper openflow rules
on it.

Change-Id: I9c0dc587e70327e03be5a64522d0c679665f79bd
Closes-Bug: #1768990
This commit is contained in:
Sławek Kapłoński 2018-05-04 00:12:03 +02:00
parent 892dab2df4
commit 85b46cd51e
5 changed files with 177 additions and 33 deletions

View File

@ -12,6 +12,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
import eventlet
from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
@ -19,6 +23,7 @@ from neutron.agent.linux import async_process
from neutron.agent.ovsdb import api as ovsdb from neutron.agent.ovsdb import api as ovsdb
from neutron.agent.ovsdb.native import helpers from neutron.agent.ovsdb.native import helpers
from neutron.common import utils from neutron.common import utils
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -29,11 +34,28 @@ OVSDB_ACTION_DELETE = 'delete'
OVSDB_ACTION_NEW = 'new' OVSDB_ACTION_NEW = 'new'
@contextlib.contextmanager
def get_bridges_monitor(
bridges, ovsdb_monitor_respawn_interval=(
constants.DEFAULT_OVSDBMON_RESPAWN)):
mon = SimpleBridgesMonitor(
bridges,
respawn_interval=ovsdb_monitor_respawn_interval,
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection)
mon.start()
try:
yield mon
finally:
mon.stop()
class OvsdbMonitor(async_process.AsyncProcess): class OvsdbMonitor(async_process.AsyncProcess):
"""Manages an invocation of 'ovsdb-client monitor'.""" """Manages an invocation of 'ovsdb-client monitor'."""
def __init__(self, table_name, columns=None, format=None, def __init__(self, table_name, columns=None, format=None,
respawn_interval=None, ovsdb_connection=None): respawn_interval=None, ovsdb_connection=None):
self.table_name = table_name
if ovsdb_connection: if ovsdb_connection:
# if ovsdb connection is configured (e.g. tcp:ip:port), use it, # if ovsdb connection is configured (e.g. tcp:ip:port), use it,
# and there is no need to run as root # and there is no need to run as root
@ -51,6 +73,18 @@ class OvsdbMonitor(async_process.AsyncProcess):
respawn_interval=respawn_interval, respawn_interval=respawn_interval,
log_output=True, log_output=True,
die_on_error=True) die_on_error=True)
self.new_events = {'added': [], 'removed': []}
def get_events(self):
self.process_events()
events = self.new_events
self.new_events = {'added': [], 'removed': []}
return events
def start(self, block=False, timeout=5):
super(OvsdbMonitor, self).start()
if block:
utils.wait_until_true(self.is_active)
class SimpleInterfaceMonitor(OvsdbMonitor): class SimpleInterfaceMonitor(OvsdbMonitor):
@ -69,7 +103,6 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
respawn_interval=respawn_interval, respawn_interval=respawn_interval,
ovsdb_connection=ovsdb_connection ovsdb_connection=ovsdb_connection
) )
self.new_events = {'added': [], 'removed': []}
@property @property
def has_updates(self): def has_updates(self):
@ -80,17 +113,11 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
temporary if respawn_interval is set. temporary if respawn_interval is set.
""" """
if not self.is_active(): if not self.is_active():
LOG.error("Interface monitor is not active") LOG.error("%s monitor is not active", self.table_name)
else: else:
self.process_events() self.process_events()
return bool(self.new_events['added'] or self.new_events['removed']) return bool(self.new_events['added'] or self.new_events['removed'])
def get_events(self):
self.process_events()
events = self.new_events
self.new_events = {'added': [], 'removed': []}
return events
def process_events(self): def process_events(self):
devices_added = [] devices_added = []
devices_removed = [] devices_removed = []
@ -118,7 +145,35 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
for event in self.new_events['added']: for event in self.new_events['added']:
event['ofport'] = dev_to_ofport.get(event['name'], event['ofport']) event['ofport'] = dev_to_ofport.get(event['name'], event['ofport'])
def start(self, block=False, timeout=5):
super(SimpleInterfaceMonitor, self).start() class SimpleBridgesMonitor(OvsdbMonitor):
if block: """Monitors the Bridge table of the local host's ovsdb for changes.
utils.wait_until_true(self.is_active)
The bridges_added() method returns all newly created bridges in ovsdb
since the monitor started or since the previous access.
"""
def __init__(self, bridges, respawn_interval=None, ovsdb_connection=None):
super(SimpleBridgesMonitor, self).__init__(
'Bridge',
columns=['name'],
format='json',
respawn_interval=respawn_interval,
ovsdb_connection=ovsdb_connection
)
self.bridges = bridges
@property
def bridges_added(self):
eventlet.sleep()
return self.get_events()['added']
def process_events(self):
bridges_added = []
for row in self.iter_stdout():
json = jsonutils.loads(row).get('data')
for ovs_id, action, name in json:
if name in self.bridges and action == OVSDB_ACTION_INSERT:
bridges_added.append(name)
self.new_events['added'].extend(bridges_added)

View File

@ -47,6 +47,7 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import polling from neutron.agent.common import polling
from neutron.agent.common import utils from neutron.agent.common import utils
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
from neutron.agent.linux import ovsdb_monitor
from neutron.agent.linux import xenapi_root_helper from neutron.agent.linux import xenapi_root_helper
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as agent_sg_rpc from neutron.agent import securitygroups_rpc as agent_sg_rpc
@ -750,7 +751,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def port_bound(self, port, net_uuid, def port_bound(self, port, net_uuid,
network_type, physical_network, network_type, physical_network,
segmentation_id, fixed_ips, device_owner, segmentation_id, fixed_ips, device_owner,
ovs_restarted): provisioning_needed):
'''Bind port to net_uuid/lsw_id and install flow for inbound traffic '''Bind port to net_uuid/lsw_id and install flow for inbound traffic
to vm. to vm.
@ -761,9 +762,11 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel' :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
:param fixed_ips: the ip addresses assigned to this port :param fixed_ips: the ip addresses assigned to this port
:param device_owner: the string indicative of owner of this port :param device_owner: the string indicative of owner of this port
:param ovs_restarted: indicates if this is called for an OVS restart. :param provisioning_needed: indicates if this is called for an OVS
restart or recreated physical bridges
and requires to do local vlan provisioning
''' '''
if net_uuid not in self.vlan_manager or ovs_restarted: if net_uuid not in self.vlan_manager or provisioning_needed:
self.provision_local_vlan(net_uuid, network_type, self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id) physical_network, segmentation_id)
lvm = self.vlan_manager.get(net_uuid) lvm = self.vlan_manager.get(net_uuid)
@ -1062,6 +1065,19 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
self.tun_br.setup_default_table(self.patch_int_ofport, self.tun_br.setup_default_table(self.patch_int_ofport,
self.arp_responder_enabled) self.arp_responder_enabled)
def _reconfigure_physical_bridges(self, bridges):
sync = False
bridge_mappings = {}
for bridge in bridges:
LOG.info("Physical bridge %s was just re-created.", bridge)
for phys_net, phys_br in self.bridge_mappings.items():
if bridge == phys_br:
bridge_mappings[phys_net] = bridge
if bridge_mappings:
sync = True
self.setup_physical_bridges(bridge_mappings)
return sync
def setup_physical_bridges(self, bridge_mappings): def setup_physical_bridges(self, bridge_mappings):
'''Setup the physical network bridges. '''Setup the physical network bridges.
@ -1393,7 +1409,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def treat_vif_port(self, vif_port, port_id, network_id, network_type, def treat_vif_port(self, vif_port, port_id, network_id, network_type,
physical_network, segmentation_id, admin_state_up, physical_network, segmentation_id, admin_state_up,
fixed_ips, device_owner, ovs_restarted): fixed_ips, device_owner, provisioning_needed):
# When this function is called for a port, the port should have # When this function is called for a port, the port should have
# an OVS ofport configured, as only these ports were considered # an OVS ofport configured, as only these ports were considered
# for being treated. If that does not happen, it is a potential # for being treated. If that does not happen, it is a potential
@ -1408,7 +1424,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
port_needs_binding = self.port_bound( port_needs_binding = self.port_bound(
vif_port, network_id, network_type, vif_port, network_id, network_type,
physical_network, segmentation_id, physical_network, segmentation_id,
fixed_ips, device_owner, ovs_restarted) fixed_ips, device_owner, provisioning_needed)
else: else:
LOG.info("VIF port: %s admin state up disabled, " LOG.info("VIF port: %s admin state up disabled, "
"putting on the dead VLAN", vif_port.vif_id) "putting on the dead VLAN", vif_port.vif_id)
@ -1492,7 +1508,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
br.cleanup_tunnel_port(ofport) br.cleanup_tunnel_port(ofport)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None) self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def treat_devices_added_or_updated(self, devices, ovs_restarted): def treat_devices_added_or_updated(self, devices, provisioning_needed):
skipped_devices = [] skipped_devices = []
need_binding_devices = [] need_binding_devices = []
devices_details_list = ( devices_details_list = (
@ -1530,7 +1546,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
details['admin_state_up'], details['admin_state_up'],
details['fixed_ips'], details['fixed_ips'],
details['device_owner'], details['device_owner'],
ovs_restarted) provisioning_needed)
if need_binding: if need_binding:
need_binding_devices.append(details) need_binding_devices.append(details)
self._update_port_network(details['port_id'], self._update_port_network(details['port_id'],
@ -1617,7 +1633,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
if failed_devices: if failed_devices:
LOG.debug("Port down failed for %s", failed_devices) LOG.debug("Port down failed for %s", failed_devices)
def process_network_ports(self, port_info, ovs_restarted): def process_network_ports(self, port_info, provisioning_needed):
failed_devices = {'added': set(), 'removed': set()} failed_devices = {'added': set(), 'removed': set()}
# TODO(salv-orlando): consider a solution for ensuring notifications # TODO(salv-orlando): consider a solution for ensuring notifications
# are processed exactly in the same order in which they were # are processed exactly in the same order in which they were
@ -1639,7 +1655,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
(skipped_devices, need_binding_devices, (skipped_devices, need_binding_devices,
failed_devices['added']) = ( failed_devices['added']) = (
self.treat_devices_added_or_updated( self.treat_devices_added_or_updated(
devices_added_updated, ovs_restarted)) devices_added_updated, provisioning_needed))
LOG.debug("process_network_ports - iteration:%(iter_num)d - " LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. " "treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of " "Skipped %(num_skipped)d devices of "
@ -1983,7 +1999,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
polling_manager.stop() polling_manager.stop()
polling_manager.start() polling_manager.start()
def rpc_loop(self, polling_manager=None): def rpc_loop(self, polling_manager=None, bridges_monitor=None):
if not polling_manager: if not polling_manager:
polling_manager = polling.get_polling_manager( polling_manager = polling.get_polling_manager(
minimize_polling=False) minimize_polling=False)
@ -2005,6 +2021,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
LOG.info("rpc_loop doing a full sync.") LOG.info("rpc_loop doing a full sync.")
sync = True sync = True
self.fullsync = False self.fullsync = False
bridges_recreated = False
port_info = {} port_info = {}
ancillary_port_info = {} ancillary_port_info = {}
start = time.time() start = time.time()
@ -2021,6 +2038,11 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
port_stats = self.get_port_stats({}, {}) port_stats = self.get_port_stats({}, {})
self.loop_count_and_wait(start, port_stats) self.loop_count_and_wait(start, port_stats)
continue continue
# Check if any physical bridge wasn't recreated recently
if bridges_monitor:
bridges_recreated = self._reconfigure_physical_bridges(
bridges_monitor.bridges_added)
sync |= bridges_recreated
# Notify the plugin of tunnel IP # Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync: if self.enable_tunneling and tunnel_sync:
try: try:
@ -2069,8 +2091,10 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
ovs_restarted): ovs_restarted):
LOG.debug("Starting to process devices in:%s", LOG.debug("Starting to process devices in:%s",
port_info) port_info)
provisioning_needed = (
ovs_restarted or bridges_recreated)
failed_devices = self.process_network_ports( failed_devices = self.process_network_ports(
port_info, ovs_restarted) port_info, provisioning_needed)
if need_clean_stale_flow: if need_clean_stale_flow:
self.cleanup_stale_flows() self.cleanup_stale_flows()
need_clean_stale_flow = False need_clean_stale_flow = False
@ -2115,11 +2139,15 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
signal.signal(signal.SIGTERM, self._handle_sigterm) signal.signal(signal.SIGTERM, self._handle_sigterm)
if hasattr(signal, 'SIGHUP'): if hasattr(signal, 'SIGHUP'):
signal.signal(signal.SIGHUP, self._handle_sighup) signal.signal(signal.SIGHUP, self._handle_sighup)
br_names = [br.br_name for br in self.phys_brs.values()]
with polling.get_polling_manager( with polling.get_polling_manager(
self.minimize_polling, self.minimize_polling,
self.ovsdb_monitor_respawn_interval) as pm: self.ovsdb_monitor_respawn_interval) as pm,\
ovsdb_monitor.get_bridges_monitor(
br_names,
self.ovsdb_monitor_respawn_interval) as bm:
self.rpc_loop(polling_manager=pm) self.rpc_loop(polling_manager=pm, bridges_monitor=bm)
def _handle_sigterm(self, signum, frame): def _handle_sigterm(self, signum, frame):
self.catch_sigterm = True self.catch_sigterm = True

View File

@ -141,10 +141,10 @@ class Host(fixtures.Fixture):
agent_cfg_fixture.get_br_tun_name())).bridge agent_cfg_fixture.get_br_tun_name())).bridge
self.connect_to_internal_network_via_tunneling() self.connect_to_internal_network_via_tunneling()
else: else:
br_phys = self.useFixture( self.br_phys = self.useFixture(
net_helpers.OVSBridgeFixture( net_helpers.OVSBridgeFixture(
agent_cfg_fixture.get_br_phys_name())).bridge agent_cfg_fixture.get_br_phys_name())).bridge
self.connect_to_internal_network_via_vlans(br_phys) self.connect_to_internal_network_via_vlans(self.br_phys)
self.ovs_agent = self.useFixture( self.ovs_agent = self.useFixture(
process.OVSAgentFixture( process.OVSAgentFixture(

View File

@ -208,6 +208,22 @@ class TestLegacyL3Agent(TestL3Agent):
# Verify north-south connectivity using ping6 to external_vm. # Verify north-south connectivity using ping6 to external_vm.
vm.block_until_ping(external_vm.ipv6) vm.block_until_ping(external_vm.ipv6)
# Now let's remove and create again phys bridge and check connectivity
# once again
br_phys = self.environment.hosts[0].br_phys
br_phys.destroy()
br_phys.create()
self.environment.hosts[0].connect_to_internal_network_via_vlans(
br_phys)
# ping floating ip from external vm
external_vm.block_until_ping(fip['floating_ip_address'])
# Verify VM is able to reach the router interface.
vm.block_until_ping(vm.gateway_ipv6)
# Verify north-south connectivity using ping6 to external_vm.
vm.block_until_ping(external_vm.ipv6)
class TestHAL3Agent(TestL3Agent): class TestHAL3Agent(TestL3Agent):

View File

@ -1672,14 +1672,59 @@ class TestOvsNeutronAgent(object):
self.agent.reclaim_local_vlan('net2') self.agent.reclaim_local_vlan('net2')
tun_br.delete_port.assert_called_once_with('gre-02020202') tun_br.delete_port.assert_called_once_with('gre-02020202')
def test_daemon_loop_uses_polling_manager(self): def test_ext_br_recreated(self):
bridge_mappings = {'physnet0': 'br-ex0',
'physnet1': 'br-ex1'}
ex_br_mocks = [mock.Mock(br_name='br-ex0'),
mock.Mock(br_name='br-ex1')]
phys_bridges = {'physnet0': ex_br_mocks[0],
'physnet1': ex_br_mocks[1]},
bm_mock = mock.Mock()
with mock.patch( with mock.patch(
'neutron.agent.common.polling.get_polling_manager') as mock_get_pm: 'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor',
with mock.patch.object(self.agent, 'rpc_loop') as mock_loop: return_value=bm_mock),\
self.agent.daemon_loop() mock.patch.object(
self.agent,
'check_ovs_status',
return_value=constants.OVS_NORMAL),\
mock.patch.object(
self.agent,
'_agent_has_updates',
side_effect=TypeError('loop exit')),\
mock.patch.dict(
self.agent.bridge_mappings, bridge_mappings, clear=True),\
mock.patch.dict(
self.agent.phys_brs, phys_bridges, clear=True),\
mock.patch.object(
self.agent,
'setup_physical_bridges') as setup_physical_bridges:
bm_mock.bridges_added = ['br-ex0']
try:
self.agent.rpc_loop(polling_manager=mock.Mock(),
bridges_monitor=bm_mock)
except TypeError:
pass
setup_physical_bridges.assert_called_once_with(
{'physnet0': 'br-ex0'})
def test_daemon_loop_uses_polling_manager(self):
ex_br_mock = mock.Mock(br_name="br-ex0")
with mock.patch(
'neutron.agent.common.polling.get_polling_manager'
) as mock_get_pm, mock.patch(
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor'
) as mock_get_bm, mock.patch.object(
self.agent, 'rpc_loop'
) as mock_loop, mock.patch.dict(
self.agent.phys_brs, {'physnet0': ex_br_mock}, clear=True):
self.agent.daemon_loop()
mock_get_pm.assert_called_with(True, mock_get_pm.assert_called_with(True,
constants.DEFAULT_OVSDBMON_RESPAWN) constants.DEFAULT_OVSDBMON_RESPAWN)
mock_loop.assert_called_once_with(polling_manager=mock.ANY) mock_get_bm.assert_called_once_with(
['br-ex0'], constants.DEFAULT_OVSDBMON_RESPAWN)
mock_loop.assert_called_once_with(
polling_manager=mock.ANY, bridges_monitor=mock.ANY)
def test_setup_tunnel_port_invalid_ofport(self): def test_setup_tunnel_port_invalid_ofport(self):
remote_ip = '1.2.3.4' remote_ip = '1.2.3.4'