Merge "Monitor phys_bridges to reconfigured it if created again"
This commit is contained in:
commit
06b9603f0c
@ -12,6 +12,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
@ -19,6 +23,7 @@ from neutron.agent.linux import async_process
|
||||
from neutron.agent.ovsdb import api as ovsdb
|
||||
from neutron.agent.ovsdb.native import helpers
|
||||
from neutron.common import utils
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -29,11 +34,28 @@ OVSDB_ACTION_DELETE = 'delete'
|
||||
OVSDB_ACTION_NEW = 'new'
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def get_bridges_monitor(
|
||||
bridges, ovsdb_monitor_respawn_interval=(
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN)):
|
||||
|
||||
mon = SimpleBridgesMonitor(
|
||||
bridges,
|
||||
respawn_interval=ovsdb_monitor_respawn_interval,
|
||||
ovsdb_connection=cfg.CONF.OVS.ovsdb_connection)
|
||||
mon.start()
|
||||
try:
|
||||
yield mon
|
||||
finally:
|
||||
mon.stop()
|
||||
|
||||
|
||||
class OvsdbMonitor(async_process.AsyncProcess):
|
||||
"""Manages an invocation of 'ovsdb-client monitor'."""
|
||||
|
||||
def __init__(self, table_name, columns=None, format=None,
|
||||
respawn_interval=None, ovsdb_connection=None):
|
||||
self.table_name = table_name
|
||||
if ovsdb_connection:
|
||||
# if ovsdb connection is configured (e.g. tcp:ip:port), use it,
|
||||
# and there is no need to run as root
|
||||
@ -51,6 +73,18 @@ class OvsdbMonitor(async_process.AsyncProcess):
|
||||
respawn_interval=respawn_interval,
|
||||
log_output=True,
|
||||
die_on_error=True)
|
||||
self.new_events = {'added': [], 'removed': []}
|
||||
|
||||
def get_events(self):
|
||||
self.process_events()
|
||||
events = self.new_events
|
||||
self.new_events = {'added': [], 'removed': []}
|
||||
return events
|
||||
|
||||
def start(self, block=False, timeout=5):
|
||||
super(OvsdbMonitor, self).start()
|
||||
if block:
|
||||
utils.wait_until_true(self.is_active)
|
||||
|
||||
|
||||
class SimpleInterfaceMonitor(OvsdbMonitor):
|
||||
@ -69,7 +103,6 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
||||
respawn_interval=respawn_interval,
|
||||
ovsdb_connection=ovsdb_connection
|
||||
)
|
||||
self.new_events = {'added': [], 'removed': []}
|
||||
|
||||
@property
|
||||
def has_updates(self):
|
||||
@ -80,17 +113,11 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
||||
temporary if respawn_interval is set.
|
||||
"""
|
||||
if not self.is_active():
|
||||
LOG.error("Interface monitor is not active")
|
||||
LOG.error("%s monitor is not active", self.table_name)
|
||||
else:
|
||||
self.process_events()
|
||||
return bool(self.new_events['added'] or self.new_events['removed'])
|
||||
|
||||
def get_events(self):
|
||||
self.process_events()
|
||||
events = self.new_events
|
||||
self.new_events = {'added': [], 'removed': []}
|
||||
return events
|
||||
|
||||
def process_events(self):
|
||||
devices_added = []
|
||||
devices_removed = []
|
||||
@ -118,7 +145,35 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
||||
for event in self.new_events['added']:
|
||||
event['ofport'] = dev_to_ofport.get(event['name'], event['ofport'])
|
||||
|
||||
def start(self, block=False, timeout=5):
|
||||
super(SimpleInterfaceMonitor, self).start()
|
||||
if block:
|
||||
utils.wait_until_true(self.is_active)
|
||||
|
||||
class SimpleBridgesMonitor(OvsdbMonitor):
|
||||
"""Monitors the Bridge table of the local host's ovsdb for changes.
|
||||
|
||||
The bridges_added() method returns all newly created bridges in ovsdb
|
||||
since the monitor started or since the previous access.
|
||||
"""
|
||||
|
||||
def __init__(self, bridges, respawn_interval=None, ovsdb_connection=None):
|
||||
super(SimpleBridgesMonitor, self).__init__(
|
||||
'Bridge',
|
||||
columns=['name'],
|
||||
format='json',
|
||||
respawn_interval=respawn_interval,
|
||||
ovsdb_connection=ovsdb_connection
|
||||
)
|
||||
self.bridges = bridges
|
||||
|
||||
@property
|
||||
def bridges_added(self):
|
||||
eventlet.sleep()
|
||||
return self.get_events()['added']
|
||||
|
||||
def process_events(self):
|
||||
bridges_added = []
|
||||
for row in self.iter_stdout():
|
||||
json = jsonutils.loads(row).get('data')
|
||||
for ovs_id, action, name in json:
|
||||
if name in self.bridges and action == OVSDB_ACTION_INSERT:
|
||||
bridges_added.append(name)
|
||||
|
||||
self.new_events['added'].extend(bridges_added)
|
||||
|
@ -47,6 +47,7 @@ from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.common import polling
|
||||
from neutron.agent.common import utils
|
||||
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.agent.linux import xenapi_root_helper
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as agent_sg_rpc
|
||||
@ -750,7 +751,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
def port_bound(self, port, net_uuid,
|
||||
network_type, physical_network,
|
||||
segmentation_id, fixed_ips, device_owner,
|
||||
ovs_restarted):
|
||||
provisioning_needed):
|
||||
'''Bind port to net_uuid/lsw_id and install flow for inbound traffic
|
||||
to vm.
|
||||
|
||||
@ -761,9 +762,11 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
|
||||
:param fixed_ips: the ip addresses assigned to this port
|
||||
:param device_owner: the string indicative of owner of this port
|
||||
:param ovs_restarted: indicates if this is called for an OVS restart.
|
||||
:param provisioning_needed: indicates if this is called for an OVS
|
||||
restart or recreated physical bridges
|
||||
and requires to do local vlan provisioning
|
||||
'''
|
||||
if net_uuid not in self.vlan_manager or ovs_restarted:
|
||||
if net_uuid not in self.vlan_manager or provisioning_needed:
|
||||
self.provision_local_vlan(net_uuid, network_type,
|
||||
physical_network, segmentation_id)
|
||||
lvm = self.vlan_manager.get(net_uuid)
|
||||
@ -1062,6 +1065,19 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
self.tun_br.setup_default_table(self.patch_int_ofport,
|
||||
self.arp_responder_enabled)
|
||||
|
||||
def _reconfigure_physical_bridges(self, bridges):
|
||||
sync = False
|
||||
bridge_mappings = {}
|
||||
for bridge in bridges:
|
||||
LOG.info("Physical bridge %s was just re-created.", bridge)
|
||||
for phys_net, phys_br in self.bridge_mappings.items():
|
||||
if bridge == phys_br:
|
||||
bridge_mappings[phys_net] = bridge
|
||||
if bridge_mappings:
|
||||
sync = True
|
||||
self.setup_physical_bridges(bridge_mappings)
|
||||
return sync
|
||||
|
||||
def setup_physical_bridges(self, bridge_mappings):
|
||||
'''Setup the physical network bridges.
|
||||
|
||||
@ -1393,7 +1409,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
|
||||
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
|
||||
physical_network, segmentation_id, admin_state_up,
|
||||
fixed_ips, device_owner, ovs_restarted):
|
||||
fixed_ips, device_owner, provisioning_needed):
|
||||
# When this function is called for a port, the port should have
|
||||
# an OVS ofport configured, as only these ports were considered
|
||||
# for being treated. If that does not happen, it is a potential
|
||||
@ -1408,7 +1424,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
port_needs_binding = self.port_bound(
|
||||
vif_port, network_id, network_type,
|
||||
physical_network, segmentation_id,
|
||||
fixed_ips, device_owner, ovs_restarted)
|
||||
fixed_ips, device_owner, provisioning_needed)
|
||||
else:
|
||||
LOG.info("VIF port: %s admin state up disabled, "
|
||||
"putting on the dead VLAN", vif_port.vif_id)
|
||||
@ -1492,7 +1508,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
br.cleanup_tunnel_port(ofport)
|
||||
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
|
||||
|
||||
def treat_devices_added_or_updated(self, devices, ovs_restarted):
|
||||
def treat_devices_added_or_updated(self, devices, provisioning_needed):
|
||||
skipped_devices = []
|
||||
need_binding_devices = []
|
||||
devices_details_list = (
|
||||
@ -1530,7 +1546,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
details['admin_state_up'],
|
||||
details['fixed_ips'],
|
||||
details['device_owner'],
|
||||
ovs_restarted)
|
||||
provisioning_needed)
|
||||
if need_binding:
|
||||
need_binding_devices.append(details)
|
||||
self._update_port_network(details['port_id'],
|
||||
@ -1617,7 +1633,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
if failed_devices:
|
||||
LOG.debug("Port down failed for %s", failed_devices)
|
||||
|
||||
def process_network_ports(self, port_info, ovs_restarted):
|
||||
def process_network_ports(self, port_info, provisioning_needed):
|
||||
failed_devices = {'added': set(), 'removed': set()}
|
||||
# TODO(salv-orlando): consider a solution for ensuring notifications
|
||||
# are processed exactly in the same order in which they were
|
||||
@ -1639,7 +1655,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
(skipped_devices, need_binding_devices,
|
||||
failed_devices['added']) = (
|
||||
self.treat_devices_added_or_updated(
|
||||
devices_added_updated, ovs_restarted))
|
||||
devices_added_updated, provisioning_needed))
|
||||
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
|
||||
"treat_devices_added_or_updated completed. "
|
||||
"Skipped %(num_skipped)d devices of "
|
||||
@ -1983,7 +1999,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
polling_manager.stop()
|
||||
polling_manager.start()
|
||||
|
||||
def rpc_loop(self, polling_manager=None):
|
||||
def rpc_loop(self, polling_manager=None, bridges_monitor=None):
|
||||
if not polling_manager:
|
||||
polling_manager = polling.get_polling_manager(
|
||||
minimize_polling=False)
|
||||
@ -2005,6 +2021,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
LOG.info("rpc_loop doing a full sync.")
|
||||
sync = True
|
||||
self.fullsync = False
|
||||
bridges_recreated = False
|
||||
port_info = {}
|
||||
ancillary_port_info = {}
|
||||
start = time.time()
|
||||
@ -2021,6 +2038,11 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
port_stats = self.get_port_stats({}, {})
|
||||
self.loop_count_and_wait(start, port_stats)
|
||||
continue
|
||||
# Check if any physical bridge wasn't recreated recently
|
||||
if bridges_monitor:
|
||||
bridges_recreated = self._reconfigure_physical_bridges(
|
||||
bridges_monitor.bridges_added)
|
||||
sync |= bridges_recreated
|
||||
# Notify the plugin of tunnel IP
|
||||
if self.enable_tunneling and tunnel_sync:
|
||||
try:
|
||||
@ -2069,8 +2091,10 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
ovs_restarted):
|
||||
LOG.debug("Starting to process devices in:%s",
|
||||
port_info)
|
||||
provisioning_needed = (
|
||||
ovs_restarted or bridges_recreated)
|
||||
failed_devices = self.process_network_ports(
|
||||
port_info, ovs_restarted)
|
||||
port_info, provisioning_needed)
|
||||
if need_clean_stale_flow:
|
||||
self.cleanup_stale_flows()
|
||||
need_clean_stale_flow = False
|
||||
@ -2115,11 +2139,15 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
signal.signal(signal.SIGTERM, self._handle_sigterm)
|
||||
if hasattr(signal, 'SIGHUP'):
|
||||
signal.signal(signal.SIGHUP, self._handle_sighup)
|
||||
br_names = [br.br_name for br in self.phys_brs.values()]
|
||||
with polling.get_polling_manager(
|
||||
self.minimize_polling,
|
||||
self.ovsdb_monitor_respawn_interval) as pm:
|
||||
self.minimize_polling,
|
||||
self.ovsdb_monitor_respawn_interval) as pm,\
|
||||
ovsdb_monitor.get_bridges_monitor(
|
||||
br_names,
|
||||
self.ovsdb_monitor_respawn_interval) as bm:
|
||||
|
||||
self.rpc_loop(polling_manager=pm)
|
||||
self.rpc_loop(polling_manager=pm, bridges_monitor=bm)
|
||||
|
||||
def _handle_sigterm(self, signum, frame):
|
||||
self.catch_sigterm = True
|
||||
|
@ -141,10 +141,10 @@ class Host(fixtures.Fixture):
|
||||
agent_cfg_fixture.get_br_tun_name())).bridge
|
||||
self.connect_to_internal_network_via_tunneling()
|
||||
else:
|
||||
br_phys = self.useFixture(
|
||||
self.br_phys = self.useFixture(
|
||||
net_helpers.OVSBridgeFixture(
|
||||
agent_cfg_fixture.get_br_phys_name())).bridge
|
||||
self.connect_to_internal_network_via_vlans(br_phys)
|
||||
self.connect_to_internal_network_via_vlans(self.br_phys)
|
||||
|
||||
self.ovs_agent = self.useFixture(
|
||||
process.OVSAgentFixture(
|
||||
|
@ -208,6 +208,22 @@ class TestLegacyL3Agent(TestL3Agent):
|
||||
# Verify north-south connectivity using ping6 to external_vm.
|
||||
vm.block_until_ping(external_vm.ipv6)
|
||||
|
||||
# Now let's remove and create again phys bridge and check connectivity
|
||||
# once again
|
||||
br_phys = self.environment.hosts[0].br_phys
|
||||
br_phys.destroy()
|
||||
br_phys.create()
|
||||
self.environment.hosts[0].connect_to_internal_network_via_vlans(
|
||||
br_phys)
|
||||
|
||||
# ping floating ip from external vm
|
||||
external_vm.block_until_ping(fip['floating_ip_address'])
|
||||
|
||||
# Verify VM is able to reach the router interface.
|
||||
vm.block_until_ping(vm.gateway_ipv6)
|
||||
# Verify north-south connectivity using ping6 to external_vm.
|
||||
vm.block_until_ping(external_vm.ipv6)
|
||||
|
||||
|
||||
class TestHAL3Agent(TestL3Agent):
|
||||
|
||||
|
@ -1672,14 +1672,59 @@ class TestOvsNeutronAgent(object):
|
||||
self.agent.reclaim_local_vlan('net2')
|
||||
tun_br.delete_port.assert_called_once_with('gre-02020202')
|
||||
|
||||
def test_daemon_loop_uses_polling_manager(self):
|
||||
def test_ext_br_recreated(self):
|
||||
bridge_mappings = {'physnet0': 'br-ex0',
|
||||
'physnet1': 'br-ex1'}
|
||||
ex_br_mocks = [mock.Mock(br_name='br-ex0'),
|
||||
mock.Mock(br_name='br-ex1')]
|
||||
phys_bridges = {'physnet0': ex_br_mocks[0],
|
||||
'physnet1': ex_br_mocks[1]},
|
||||
bm_mock = mock.Mock()
|
||||
with mock.patch(
|
||||
'neutron.agent.common.polling.get_polling_manager') as mock_get_pm:
|
||||
with mock.patch.object(self.agent, 'rpc_loop') as mock_loop:
|
||||
self.agent.daemon_loop()
|
||||
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor',
|
||||
return_value=bm_mock),\
|
||||
mock.patch.object(
|
||||
self.agent,
|
||||
'check_ovs_status',
|
||||
return_value=constants.OVS_NORMAL),\
|
||||
mock.patch.object(
|
||||
self.agent,
|
||||
'_agent_has_updates',
|
||||
side_effect=TypeError('loop exit')),\
|
||||
mock.patch.dict(
|
||||
self.agent.bridge_mappings, bridge_mappings, clear=True),\
|
||||
mock.patch.dict(
|
||||
self.agent.phys_brs, phys_bridges, clear=True),\
|
||||
mock.patch.object(
|
||||
self.agent,
|
||||
'setup_physical_bridges') as setup_physical_bridges:
|
||||
bm_mock.bridges_added = ['br-ex0']
|
||||
try:
|
||||
self.agent.rpc_loop(polling_manager=mock.Mock(),
|
||||
bridges_monitor=bm_mock)
|
||||
except TypeError:
|
||||
pass
|
||||
setup_physical_bridges.assert_called_once_with(
|
||||
{'physnet0': 'br-ex0'})
|
||||
|
||||
def test_daemon_loop_uses_polling_manager(self):
|
||||
ex_br_mock = mock.Mock(br_name="br-ex0")
|
||||
with mock.patch(
|
||||
'neutron.agent.common.polling.get_polling_manager'
|
||||
) as mock_get_pm, mock.patch(
|
||||
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor'
|
||||
) as mock_get_bm, mock.patch.object(
|
||||
self.agent, 'rpc_loop'
|
||||
) as mock_loop, mock.patch.dict(
|
||||
self.agent.phys_brs, {'physnet0': ex_br_mock}, clear=True):
|
||||
|
||||
self.agent.daemon_loop()
|
||||
mock_get_pm.assert_called_with(True,
|
||||
constants.DEFAULT_OVSDBMON_RESPAWN)
|
||||
mock_loop.assert_called_once_with(polling_manager=mock.ANY)
|
||||
mock_get_bm.assert_called_once_with(
|
||||
['br-ex0'], constants.DEFAULT_OVSDBMON_RESPAWN)
|
||||
mock_loop.assert_called_once_with(
|
||||
polling_manager=mock.ANY, bridges_monitor=mock.ANY)
|
||||
|
||||
def test_setup_tunnel_port_invalid_ofport(self):
|
||||
remote_ip = '1.2.3.4'
|
||||
|
Loading…
Reference in New Issue
Block a user