Reprogram flows when ovs-vswitchd restarts

When OVS is restarted, by default it will not reprogram flows which were
programmed. For the case of the OVS agent, this means a restart will cause
all traffic to be switched using the NORMAL action. This is undesirable for
a number of reasons, including obvious security reasons.

This change provides a way for the agent to check if a restart of ovs-vswitchd
has happened in the main agent loop. If a restart of ovs-vswitchd is detected,
the agent will run through the setup of the bridges on the host and reprogram
flows for all the ports connected.

DocImpact
This changes adds a new table (table 23) to the integration bridge, with a
single 'drop' flow. This is used to monitor OVS restarts and to reprogram
flows from the agent.

Change-Id: If9e07465c43115838de23e12a4e0087c9218cea2
Closes-Bug: #1290486
This commit is contained in:
Kyle Mestery 2014-05-16 04:21:32 +00:00
parent f2ffb17f5d
commit 8e9f00a19d
5 changed files with 143 additions and 31 deletions

View File

@ -203,6 +203,13 @@ class OVSBridge(BaseOVS):
else:
self.run_ofctl("del-flows", [flow_expr_str])
def dump_flows_for_table(self, table):
flow_str = "table=%s" % table
flows = self.run_ofctl("dump-flows", [flow_str])
retval = '\n'.join(item for item in flows.splitlines()
if 'NXST' not in item)
return retval
def defer_apply_on(self):
LOG.debug(_('defer_apply_on'))
self.defer_apply_flows = True

View File

@ -206,7 +206,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.updated_ports = set()
self.setup_rpc()
self.setup_integration_br()
self.setup_physical_bridges(bridge_mappings)
self.bridge_mappings = bridge_mappings
self.setup_physical_bridges(self.bridge_mappings)
self.local_vlan_map = {}
self.tun_br_ofports = {p_const.TYPE_GRE: {},
p_const.TYPE_VXLAN: {}}
@ -223,6 +224,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.tunnel_count = 0
self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
self._check_ovs_version()
self.tun_br = None
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
# Collect additional bridges to monitor
@ -541,16 +543,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
'''
if not self.available_local_vlans:
LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
return
lvid = self.available_local_vlans.pop()
# On a restart or crash of OVS, the network associated with this VLAN
# will already be assigned, so check for that here before assigning a
# new one.
lvm = self.local_vlan_map.get(net_uuid)
if lvm:
lvid = lvm.vlan
else:
if not self.available_local_vlans:
LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
return
lvid = self.available_local_vlans.pop()
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
network_type,
physical_network,
segmentation_id)
LOG.info(_("Assigning %(vlan_id)s as local vlan for "
"net-id=%(net_uuid)s"),
{'vlan_id': lvid, 'net_uuid': net_uuid})
self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
physical_network,
segmentation_id)
if network_type in constants.TUNNEL_NETWORK_TYPES:
if self.enable_tunneling:
@ -683,7 +694,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.available_local_vlans.add(lvm.vlan)
def port_bound(self, port, net_uuid,
network_type, physical_network, segmentation_id):
network_type, physical_network, segmentation_id,
ovs_restarted):
'''Bind port to net_uuid/lsw_id and install flow for inbound traffic
to vm.
@ -692,8 +704,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param network_type: the network type ('gre', 'vlan', 'flat', 'local')
:param physical_network: the physical network for 'vlan' or 'flat'
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
:param ovs_restarted: indicates if this is called for an OVS restart.
'''
if net_uuid not in self.local_vlan_map:
if net_uuid not in self.local_vlan_map or ovs_restarted:
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
@ -754,6 +767,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.int_br.remove_all_flows()
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
# Add a canary flow to int_br to track OVS restarts
self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0,
actions="drop")
def setup_ancillary_bridges(self, integ_br, tun_br):
'''Setup ancillary bridges - for example br-ex.'''
@ -781,7 +797,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ancillary_bridges.append(br)
return ancillary_bridges
def setup_tunnel_br(self, tun_br):
def setup_tunnel_br(self, tun_br=None):
'''Setup the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
@ -789,7 +805,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
:param tun_br: the name of the tunnel bridge.
'''
self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
if not self.tun_br:
self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
self.tun_br.reset_bridge()
self.patch_tun_ofport = self.int_br.add_patch_port(
cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
@ -1002,7 +1020,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
'removed': removed}
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
physical_network, segmentation_id, admin_state_up):
physical_network, segmentation_id, admin_state_up,
ovs_restarted):
# When this function is called for a port, the port should have
# an OVS ofport configured, as only these ports were considered
# for being treated. If that does not happen, it is a potential
@ -1013,7 +1032,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
if vif_port:
if admin_state_up:
self.port_bound(vif_port, network_id, network_type,
physical_network, segmentation_id)
physical_network, segmentation_id,
ovs_restarted)
else:
self.port_dead(vif_port)
else:
@ -1071,7 +1091,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.tun_br.delete_port(port_name)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def treat_devices_added_or_updated(self, devices):
def treat_devices_added_or_updated(self, devices, ovs_restarted):
resync = False
for device in devices:
LOG.debug(_("Processing port %s"), device)
@ -1103,7 +1123,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
details['network_type'],
details['physical_network'],
details['segmentation_id'],
details['admin_state_up'])
details['admin_state_up'],
ovs_restarted)
# update plugin about port status
if details.get('admin_state_up'):
LOG.debug(_("Setting status for %s to UP"), device)
@ -1180,7 +1201,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.debug(_("Device %s not defined on plugin"), device)
return resync
def process_network_ports(self, port_info):
def process_network_ports(self, port_info, ovs_restarted):
resync_a = False
resync_b = False
# TODO(salv-orlando): consider a solution for ensuring notifications
@ -1203,7 +1224,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
if devices_added_updated:
start = time.time()
resync_a = self.treat_devices_added_or_updated(
devices_added_updated)
devices_added_updated, ovs_restarted)
LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
"treat_devices_added_or_updated completed "
"in %(elapsed).3f"),
@ -1291,6 +1312,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info.get('removed') or
port_info.get('updated'))
def check_ovs_restart(self):
# Check for the canary flow
canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE)
return not canary_flow
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
@ -1300,6 +1326,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
ovs_restarted = False
while True:
start = time.time()
port_stats = {'regular': {'added': 0,
@ -1323,7 +1350,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
except Exception:
LOG.exception(_("Error while synchronizing tunnels"))
tunnel_sync = True
if self._agent_has_updates(polling_manager):
ovs_restarted = self.check_ovs_restart()
if ovs_restarted:
self.setup_integration_br()
self.setup_physical_bridges(self.bridge_mappings)
if self.enable_tunneling:
self.setup_tunnel_br()
if self._agent_has_updates(polling_manager) or ovs_restarted:
try:
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f"),
@ -1335,7 +1368,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
port_info = self.scan_ports(ports, updated_ports_copy)
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, updated_ports_copy)
ports = port_info['current']
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"port information retrieved. "
@ -1345,11 +1379,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
self.sg_agent.firewall_refresh_needed()):
self.sg_agent.firewall_refresh_needed() or
ovs_restarted):
LOG.debug(_("Starting to process devices in:%s"),
port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
sync = self.process_network_ports(port_info,
ovs_restarted)
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
"ports processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,

View File

@ -47,6 +47,7 @@ LEARN_FROM_TUN = 10
UCAST_TO_TUN = 20
ARP_RESPONDER = 21
FLOOD_TO_TUN = 22
CANARY_TABLE = 23
# Map tunnel types to tables number
TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV,

View File

@ -22,10 +22,12 @@ import netaddr
from oslo.config import cfg
import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
from neutron.openstack.common import log
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
from neutron.plugins.openvswitch.common import constants
@ -147,7 +149,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
'db_get_val', return_value=str(old_local_vlan)),
mock.patch.object(self.agent.int_br, 'delete_flows')
) as (set_ovs_db_func, get_ovs_db_func, delete_flows_func):
self.agent.port_bound(port, net_uuid, 'local', None, None)
self.agent.port_bound(port, net_uuid, 'local', None, None, False)
get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag")
if new_local_vlan != old_local_vlan:
set_ovs_db_func.assert_called_once_with(
@ -283,7 +285,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
side_effect=Exception()),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.Mock())):
self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
self.assertTrue(self.agent.treat_devices_added_or_updated([{}],
False))
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
@ -302,7 +305,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
False))
return func.called
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
@ -349,7 +353,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
False))
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
@ -380,11 +385,12 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(self.agent, "treat_devices_removed",
return_value=False)
) as (setup_port_filters, device_added_updated, device_removed):
self.assertFalse(self.agent.process_network_ports(port_info))
self.assertFalse(self.agent.process_network_ports(port_info,
False))
setup_port_filters.assert_called_once_with(
port_info['added'], port_info.get('updated', set()))
device_added_updated.assert_called_once_with(
port_info['added'] | port_info.get('updated', set()))
port_info['added'] | port_info.get('updated', set()), False)
device_removed.assert_called_once_with(port_info['removed'])
def test_process_network_ports(self):
@ -800,6 +806,61 @@ class TestOvsNeutronAgent(base.BaseTestCase):
expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')]
self.agent.setup_tunnel_port.assert_has_calls(expected_calls)
def test_ovs_restart(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),
'removed': set([])}
reply3 = {'current': set(['tap2']),
'added': set([]),
'removed': set(['tap0'])}
with contextlib.nested(
mock.patch.object(async_process.AsyncProcess, "_spawn"),
mock.patch.object(log.ContextAdapter, 'exception'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'scan_ports'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'process_network_ports'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'check_ovs_restart'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'setup_integration_br'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'setup_physical_bridges')
) as (spawn_fn, log_exception, scan_ports, process_network_ports,
check_ovs_restart, setup_int_br, setup_phys_br):
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_restart.side_effect = [False, True]
# This will exit after the second loop
try:
self.agent.daemon_loop()
except Exception:
pass
scan_ports.assert_has_calls([
mock.call(set(), set()),
mock.call(set(), set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
'added': set(['tap2'])}, False),
mock.call({'current': set(['tap2']),
'removed': set(['tap0']),
'added': set([])}, True)
])
# Verify the second time through the loop we triggered an
# OVS restart and re-setup the bridges
setup_int_br.assert_has_calls([mock.call()])
setup_phys_br.assert_has_calls([mock.call({})])
class AncillaryBridgesTest(base.BaseTestCase):

View File

@ -111,6 +111,8 @@ class TunnelTest(base.BaseTestCase):
mock.call.delete_port('patch-tun'),
mock.call.remove_all_flows(),
mock.call.add_flow(priority=1, actions='normal'),
mock.call.add_flow(priority=0, table=constants.CANARY_TABLE,
actions='drop')
]
self.mock_map_tun_bridge = self.ovs_bridges[self.MAP_TUN_BRIDGE]
@ -476,7 +478,7 @@ class TunnelTest(base.BaseTestCase):
'sudo', 2, ['gre'],
self.VETH_MTU)
a.local_vlan_map[NET_UUID] = LVM
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID, False)
self._verify_mock_calls()
def test_port_unbound(self):
@ -552,6 +554,11 @@ class TunnelTest(base.BaseTestCase):
'added': set([]),
'removed': set(['tap0'])}
self.mock_int_bridge_expected += [
mock.call.dump_flows_for_table(constants.CANARY_TABLE),
mock.call.dump_flows_for_table(constants.CANARY_TABLE)
]
with contextlib.nested(
mock.patch.object(log.ContextAdapter, 'exception'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
@ -590,10 +597,10 @@ class TunnelTest(base.BaseTestCase):
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
'added': set(['tap2'])}),
'added': set(['tap2'])}, False),
mock.call({'current': set(['tap2']),
'removed': set(['tap0']),
'added': set([])})
'added': set([])}, False)
])
self._verify_mock_calls()