diff --git a/neutron/agent/linux/ovsdb_monitor.py b/neutron/agent/linux/ovsdb_monitor.py index ed4dff56a11..b066ebfc659 100644 --- a/neutron/agent/linux/ovsdb_monitor.py +++ b/neutron/agent/linux/ovsdb_monitor.py @@ -12,6 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + +import eventlet +from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils @@ -19,6 +23,7 @@ from neutron.agent.linux import async_process from neutron.agent.ovsdb import api as ovsdb from neutron.agent.ovsdb.native import helpers from neutron.common import utils +from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants LOG = logging.getLogger(__name__) @@ -29,11 +34,28 @@ OVSDB_ACTION_DELETE = 'delete' OVSDB_ACTION_NEW = 'new' +@contextlib.contextmanager +def get_bridges_monitor( + bridges, ovsdb_monitor_respawn_interval=( + constants.DEFAULT_OVSDBMON_RESPAWN)): + + mon = SimpleBridgesMonitor( + bridges, + respawn_interval=ovsdb_monitor_respawn_interval, + ovsdb_connection=cfg.CONF.OVS.ovsdb_connection) + mon.start() + try: + yield mon + finally: + mon.stop() + + class OvsdbMonitor(async_process.AsyncProcess): """Manages an invocation of 'ovsdb-client monitor'.""" def __init__(self, table_name, columns=None, format=None, respawn_interval=None, ovsdb_connection=None): + self.table_name = table_name if ovsdb_connection: # if ovsdb connection is configured (e.g. tcp:ip:port), use it, # and there is no need to run as root @@ -51,6 +73,18 @@ class OvsdbMonitor(async_process.AsyncProcess): respawn_interval=respawn_interval, log_output=True, die_on_error=True) + self.new_events = {'added': [], 'removed': []} + + def get_events(self): + self.process_events() + events = self.new_events + self.new_events = {'added': [], 'removed': []} + return events + + def start(self, block=False, timeout=5): + super(OvsdbMonitor, self).start() + if block: + utils.wait_until_true(self.is_active) class SimpleInterfaceMonitor(OvsdbMonitor): @@ -69,7 +103,6 @@ class SimpleInterfaceMonitor(OvsdbMonitor): respawn_interval=respawn_interval, ovsdb_connection=ovsdb_connection ) - self.new_events = {'added': [], 'removed': []} @property def has_updates(self): @@ -80,17 +113,11 @@ class SimpleInterfaceMonitor(OvsdbMonitor): temporary if respawn_interval is set. """ if not self.is_active(): - LOG.error("Interface monitor is not active") + LOG.error("%s monitor is not active", self.table_name) else: self.process_events() return bool(self.new_events['added'] or self.new_events['removed']) - def get_events(self): - self.process_events() - events = self.new_events - self.new_events = {'added': [], 'removed': []} - return events - def process_events(self): devices_added = [] devices_removed = [] @@ -118,7 +145,35 @@ class SimpleInterfaceMonitor(OvsdbMonitor): for event in self.new_events['added']: event['ofport'] = dev_to_ofport.get(event['name'], event['ofport']) - def start(self, block=False, timeout=5): - super(SimpleInterfaceMonitor, self).start() - if block: - utils.wait_until_true(self.is_active) + +class SimpleBridgesMonitor(OvsdbMonitor): + """Monitors the Bridge table of the local host's ovsdb for changes. + + The bridges_added() method returns all newly created bridges in ovsdb + since the monitor started or since the previous access. + """ + + def __init__(self, bridges, respawn_interval=None, ovsdb_connection=None): + super(SimpleBridgesMonitor, self).__init__( + 'Bridge', + columns=['name'], + format='json', + respawn_interval=respawn_interval, + ovsdb_connection=ovsdb_connection + ) + self.bridges = bridges + + @property + def bridges_added(self): + eventlet.sleep() + return self.get_events()['added'] + + def process_events(self): + bridges_added = [] + for row in self.iter_stdout(): + json = jsonutils.loads(row).get('data') + for ovs_id, action, name in json: + if name in self.bridges and action == OVSDB_ACTION_INSERT: + bridges_added.append(name) + + self.new_events['added'].extend(bridges_added) diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index a132ef41816..988bf802266 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -47,6 +47,7 @@ from neutron.agent.common import ovs_lib from neutron.agent.common import polling from neutron.agent.common import utils from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager +from neutron.agent.linux import ovsdb_monitor from neutron.agent.linux import xenapi_root_helper from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as agent_sg_rpc @@ -750,7 +751,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, def port_bound(self, port, net_uuid, network_type, physical_network, segmentation_id, fixed_ips, device_owner, - ovs_restarted): + provisioning_needed): '''Bind port to net_uuid/lsw_id and install flow for inbound traffic to vm. @@ -761,9 +762,11 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel' :param fixed_ips: the ip addresses assigned to this port :param device_owner: the string indicative of owner of this port - :param ovs_restarted: indicates if this is called for an OVS restart. + :param provisioning_needed: indicates if this is called for an OVS + restart or recreated physical bridges + and requires to do local vlan provisioning ''' - if net_uuid not in self.vlan_manager or ovs_restarted: + if net_uuid not in self.vlan_manager or provisioning_needed: self.provision_local_vlan(net_uuid, network_type, physical_network, segmentation_id) lvm = self.vlan_manager.get(net_uuid) @@ -1062,6 +1065,19 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, self.tun_br.setup_default_table(self.patch_int_ofport, self.arp_responder_enabled) + def _reconfigure_physical_bridges(self, bridges): + sync = False + bridge_mappings = {} + for bridge in bridges: + LOG.info("Physical bridge %s was just re-created.", bridge) + for phys_net, phys_br in self.bridge_mappings.items(): + if bridge == phys_br: + bridge_mappings[phys_net] = bridge + if bridge_mappings: + sync = True + self.setup_physical_bridges(bridge_mappings) + return sync + def setup_physical_bridges(self, bridge_mappings): '''Setup the physical network bridges. @@ -1393,7 +1409,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, def treat_vif_port(self, vif_port, port_id, network_id, network_type, physical_network, segmentation_id, admin_state_up, - fixed_ips, device_owner, ovs_restarted): + fixed_ips, device_owner, provisioning_needed): # When this function is called for a port, the port should have # an OVS ofport configured, as only these ports were considered # for being treated. If that does not happen, it is a potential @@ -1408,7 +1424,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, port_needs_binding = self.port_bound( vif_port, network_id, network_type, physical_network, segmentation_id, - fixed_ips, device_owner, ovs_restarted) + fixed_ips, device_owner, provisioning_needed) else: LOG.info("VIF port: %s admin state up disabled, " "putting on the dead VLAN", vif_port.vif_id) @@ -1492,7 +1508,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, br.cleanup_tunnel_port(ofport) self.tun_br_ofports[tunnel_type].pop(remote_ip, None) - def treat_devices_added_or_updated(self, devices, ovs_restarted): + def treat_devices_added_or_updated(self, devices, provisioning_needed): skipped_devices = [] need_binding_devices = [] devices_details_list = ( @@ -1530,7 +1546,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, details['admin_state_up'], details['fixed_ips'], details['device_owner'], - ovs_restarted) + provisioning_needed) if need_binding: need_binding_devices.append(details) self._update_port_network(details['port_id'], @@ -1617,7 +1633,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, if failed_devices: LOG.debug("Port down failed for %s", failed_devices) - def process_network_ports(self, port_info, ovs_restarted): + def process_network_ports(self, port_info, provisioning_needed): failed_devices = {'added': set(), 'removed': set()} # TODO(salv-orlando): consider a solution for ensuring notifications # are processed exactly in the same order in which they were @@ -1639,7 +1655,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, (skipped_devices, need_binding_devices, failed_devices['added']) = ( self.treat_devices_added_or_updated( - devices_added_updated, ovs_restarted)) + devices_added_updated, provisioning_needed)) LOG.debug("process_network_ports - iteration:%(iter_num)d - " "treat_devices_added_or_updated completed. " "Skipped %(num_skipped)d devices of " @@ -1983,7 +1999,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, polling_manager.stop() polling_manager.start() - def rpc_loop(self, polling_manager=None): + def rpc_loop(self, polling_manager=None, bridges_monitor=None): if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) @@ -2005,6 +2021,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, LOG.info("rpc_loop doing a full sync.") sync = True self.fullsync = False + bridges_recreated = False port_info = {} ancillary_port_info = {} start = time.time() @@ -2021,6 +2038,11 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, port_stats = self.get_port_stats({}, {}) self.loop_count_and_wait(start, port_stats) continue + # Check if any physical bridge wasn't recreated recently + if bridges_monitor: + bridges_recreated = self._reconfigure_physical_bridges( + bridges_monitor.bridges_added) + sync |= bridges_recreated # Notify the plugin of tunnel IP if self.enable_tunneling and tunnel_sync: try: @@ -2069,8 +2091,10 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, ovs_restarted): LOG.debug("Starting to process devices in:%s", port_info) + provisioning_needed = ( + ovs_restarted or bridges_recreated) failed_devices = self.process_network_ports( - port_info, ovs_restarted) + port_info, provisioning_needed) if need_clean_stale_flow: self.cleanup_stale_flows() need_clean_stale_flow = False @@ -2115,11 +2139,15 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, signal.signal(signal.SIGTERM, self._handle_sigterm) if hasattr(signal, 'SIGHUP'): signal.signal(signal.SIGHUP, self._handle_sighup) + br_names = [br.br_name for br in self.phys_brs.values()] with polling.get_polling_manager( - self.minimize_polling, - self.ovsdb_monitor_respawn_interval) as pm: + self.minimize_polling, + self.ovsdb_monitor_respawn_interval) as pm,\ + ovsdb_monitor.get_bridges_monitor( + br_names, + self.ovsdb_monitor_respawn_interval) as bm: - self.rpc_loop(polling_manager=pm) + self.rpc_loop(polling_manager=pm, bridges_monitor=bm) def _handle_sigterm(self, signum, frame): self.catch_sigterm = True diff --git a/neutron/tests/fullstack/resources/environment.py b/neutron/tests/fullstack/resources/environment.py index 174c0cc58c0..3f58dc92f66 100644 --- a/neutron/tests/fullstack/resources/environment.py +++ b/neutron/tests/fullstack/resources/environment.py @@ -141,10 +141,10 @@ class Host(fixtures.Fixture): agent_cfg_fixture.get_br_tun_name())).bridge self.connect_to_internal_network_via_tunneling() else: - br_phys = self.useFixture( + self.br_phys = self.useFixture( net_helpers.OVSBridgeFixture( agent_cfg_fixture.get_br_phys_name())).bridge - self.connect_to_internal_network_via_vlans(br_phys) + self.connect_to_internal_network_via_vlans(self.br_phys) self.ovs_agent = self.useFixture( process.OVSAgentFixture( diff --git a/neutron/tests/fullstack/test_l3_agent.py b/neutron/tests/fullstack/test_l3_agent.py index c6f1a8394a6..f67be5953d5 100644 --- a/neutron/tests/fullstack/test_l3_agent.py +++ b/neutron/tests/fullstack/test_l3_agent.py @@ -208,6 +208,22 @@ class TestLegacyL3Agent(TestL3Agent): # Verify north-south connectivity using ping6 to external_vm. vm.block_until_ping(external_vm.ipv6) + # Now let's remove and create again phys bridge and check connectivity + # once again + br_phys = self.environment.hosts[0].br_phys + br_phys.destroy() + br_phys.create() + self.environment.hosts[0].connect_to_internal_network_via_vlans( + br_phys) + + # ping floating ip from external vm + external_vm.block_until_ping(fip['floating_ip_address']) + + # Verify VM is able to reach the router interface. + vm.block_until_ping(vm.gateway_ipv6) + # Verify north-south connectivity using ping6 to external_vm. + vm.block_until_ping(external_vm.ipv6) + class TestHAL3Agent(TestL3Agent): diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 94fba6f67a5..f9f915b2cdd 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -1672,14 +1672,59 @@ class TestOvsNeutronAgent(object): self.agent.reclaim_local_vlan('net2') tun_br.delete_port.assert_called_once_with('gre-02020202') - def test_daemon_loop_uses_polling_manager(self): + def test_ext_br_recreated(self): + bridge_mappings = {'physnet0': 'br-ex0', + 'physnet1': 'br-ex1'} + ex_br_mocks = [mock.Mock(br_name='br-ex0'), + mock.Mock(br_name='br-ex1')] + phys_bridges = {'physnet0': ex_br_mocks[0], + 'physnet1': ex_br_mocks[1]}, + bm_mock = mock.Mock() with mock.patch( - 'neutron.agent.common.polling.get_polling_manager') as mock_get_pm: - with mock.patch.object(self.agent, 'rpc_loop') as mock_loop: - self.agent.daemon_loop() + 'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor', + return_value=bm_mock),\ + mock.patch.object( + self.agent, + 'check_ovs_status', + return_value=constants.OVS_NORMAL),\ + mock.patch.object( + self.agent, + '_agent_has_updates', + side_effect=TypeError('loop exit')),\ + mock.patch.dict( + self.agent.bridge_mappings, bridge_mappings, clear=True),\ + mock.patch.dict( + self.agent.phys_brs, phys_bridges, clear=True),\ + mock.patch.object( + self.agent, + 'setup_physical_bridges') as setup_physical_bridges: + bm_mock.bridges_added = ['br-ex0'] + try: + self.agent.rpc_loop(polling_manager=mock.Mock(), + bridges_monitor=bm_mock) + except TypeError: + pass + setup_physical_bridges.assert_called_once_with( + {'physnet0': 'br-ex0'}) + + def test_daemon_loop_uses_polling_manager(self): + ex_br_mock = mock.Mock(br_name="br-ex0") + with mock.patch( + 'neutron.agent.common.polling.get_polling_manager' + ) as mock_get_pm, mock.patch( + 'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor' + ) as mock_get_bm, mock.patch.object( + self.agent, 'rpc_loop' + ) as mock_loop, mock.patch.dict( + self.agent.phys_brs, {'physnet0': ex_br_mock}, clear=True): + + self.agent.daemon_loop() mock_get_pm.assert_called_with(True, constants.DEFAULT_OVSDBMON_RESPAWN) - mock_loop.assert_called_once_with(polling_manager=mock.ANY) + mock_get_bm.assert_called_once_with( + ['br-ex0'], constants.DEFAULT_OVSDBMON_RESPAWN) + mock_loop.assert_called_once_with( + polling_manager=mock.ANY, bridges_monitor=mock.ANY) def test_setup_tunnel_port_invalid_ofport(self): remote_ip = '1.2.3.4'