Merge "Add simple ARP spoofing protection"

This commit is contained in:
Jenkins 2015-04-09 01:21:53 +00:00 committed by Gerrit Code Review
commit 483de6313f
13 changed files with 385 additions and 14 deletions

View File

@ -99,6 +99,15 @@
#
# arp_responder = False
# Enable suppression of ARP responses that don't match an IP address that
# belongs to the port from which they originate.
# Note: This prevents the VMs attached to this agent from spoofing,
# it doesn't protect them from other devices which have the capability to spoof
# (e.g. bare metal or VMs attached to agents without this flag set to True).
# Requires a version of OVS that can match ARP headers.
#
# prevent_arp_spoofing = False
# (BoolOpt) Set or un-set the don't fragment (DF) bit on outgoing IP packet
# carrying GRE/VXLAN tunnel. The default value is True.
#

View File

@ -331,6 +331,19 @@ class OVSBridge(BaseOVS):
return edge_ports
def get_vif_port_to_ofport_map(self):
port_names = self.get_port_name_list()
cmd = self.ovsdb.db_list(
'Interface', port_names,
columns=['name', 'external_ids', 'ofport'], if_exists=True)
results = cmd.execute(check_error=True)
port_map = {}
for r in results:
# fall back to basic interface name
key = self.portid_from_external_ids(r['external_ids']) or r['name']
port_map[key] = r['ofport']
return port_map
def get_vif_port_set(self):
edge_ports = set()
port_names = self.get_port_name_list()
@ -346,15 +359,19 @@ class OVSBridge(BaseOVS):
LOG.warn(_LW("Found failed openvswitch port: %s"),
result['name'])
elif 'attached-mac' in result['external_ids']:
external_ids = result['external_ids']
if 'iface-id' in external_ids:
edge_ports.add(external_ids['iface-id'])
elif 'xs-vif-uuid' in external_ids:
iface_id = self.get_xapi_iface_id(
external_ids['xs-vif-uuid'])
edge_ports.add(iface_id)
port_id = self.portid_from_external_ids(result['external_ids'])
if port_id:
edge_ports.add(port_id)
return edge_ports
def portid_from_external_ids(self, external_ids):
if 'iface-id' in external_ids:
return external_ids['iface-id']
if 'xs-vif-uuid' in external_ids:
iface_id = self.get_xapi_iface_id(
external_ids['xs-vif-uuid'])
return iface_id
def get_port_tag_dict(self):
"""Get a dict of port names and associated vlan tags.

View File

@ -107,6 +107,16 @@ def arp_responder_supported():
actions=actions)
def arp_header_match_supported():
return ofctl_arg_supported(cmd='add-flow',
table=24,
priority=1,
proto='arp',
arp_op='0x2',
arp_spa='1.1.1.1',
actions="NORMAL")
def vf_management_supported():
try:
vf_section = ip_link_support.IpLinkSupport.get_vf_mgmt_section()

View File

@ -120,6 +120,15 @@ def check_arp_responder():
return result
def check_arp_header_match():
result = checks.arp_header_match_supported()
if not result:
LOG.error(_LE('Check for Open vSwitch support of ARP header matching '
'failed. ARP spoofing suppression will not work. A '
'newer version of OVS is required.'))
return result
def check_vf_management():
result = checks.vf_management_supported()
if not result:
@ -149,6 +158,8 @@ OPTS = [
help=_('Check for nova notification support')),
BoolOptCallback('arp_responder', check_arp_responder,
help=_('Check for ARP responder support')),
BoolOptCallback('arp_header_match', check_arp_header_match,
help=_('Check for ARP header match support')),
BoolOptCallback('vf_management', check_vf_management,
help=_('Check for VF management support')),
BoolOptCallback('read_netns', check_read_netns,
@ -180,6 +191,8 @@ def enable_tests_from_config():
cfg.CONF.set_override('nova_notify', True)
if cfg.CONF.AGENT.arp_responder:
cfg.CONF.set_override('arp_responder', True)
if config.AGENT.prevent_arp_spoofing:
cfg.CONF.set_override('arp_header_match', True)
if cfg.CONF.ml2_sriov.agent_required:
cfg.CONF.set_override('vf_management', True)
if not cfg.CONF.AGENT.use_helper_for_ns_read:

View File

@ -27,6 +27,7 @@ from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron.i18n import _LW
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
@ -113,6 +114,8 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
'physical_network': segment[api.PHYSICAL_NETWORK],
'fixed_ips': port['fixed_ips'],
'device_owner': port['device_owner'],
'allowed_address_pairs': port['allowed_address_pairs'],
'port_security_enabled': port.get(psec.PORTSECURITY, True),
'profile': port[portbindings.PROFILE]}
LOG.debug("Returning: %s", entry)
return entry

View File

@ -127,6 +127,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ovsdb_monitor_respawn_interval=(
constants.DEFAULT_OVSDBMON_RESPAWN),
arp_responder=False,
prevent_arp_spoofing=True,
use_veth_interconnection=False,
quitting_rpc_timeout=None):
'''Constructor.
@ -148,6 +149,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
the ovsdb monitor.
:param arp_responder: Optional, enable local ARP responder if it is
supported.
:param prevent_arp_spoofing: Optional, enable suppression of any ARP
responses from ports that don't match an IP address that belongs
to the ports. Spoofing rules will not be added to ports that
have port security disabled.
:param use_veth_interconnection: use veths instead of patch ports to
interconnect the integration bridge to physical bridges.
:param quitting_rpc_timeout: timeout in seconds for rpc calls after
@ -165,6 +170,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# ML2 l2 population mechanism driver.
self.enable_distributed_routing = enable_distributed_routing
self.arp_responder_enabled = arp_responder and self.l2_pop
self.prevent_arp_spoofing = prevent_arp_spoofing
self.agent_state = {
'binary': 'neutron-openvswitch-agent',
'host': cfg.CONF.host,
@ -195,6 +201,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.setup_integration_br()
# Stores port update notifications for processing in main rpc loop
self.updated_ports = set()
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
self.setup_rpc()
self.bridge_mappings = bridge_mappings
self.setup_physical_bridges(self.bridge_mappings)
@ -698,6 +706,48 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
if port.ofport != -1:
self.int_br.delete_flows(in_port=port.ofport)
@staticmethod
def setup_arp_spoofing_protection(bridge, vif, port_details):
# clear any previous flows related to this port in our ARP table
bridge.delete_flows(table=constants.LOCAL_SWITCHING,
in_port=vif.ofport, proto='arp')
bridge.delete_flows(table=constants.ARP_SPOOF_TABLE,
in_port=vif.ofport)
if not port_details.get('port_security_enabled', True):
LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
"it has port security disabled"), vif.port_name)
return
# all of the rules here are based on 'in_port' match criteria
# so their cleanup will be handled by 'update_stale_ofport_rules'
# collect all of the addresses and cidrs that belong to the port
addresses = [f['ip_address'] for f in port_details['fixed_ips']]
if port_details.get('allowed_address_pairs'):
addresses += [p['ip_address']
for p in port_details['allowed_address_pairs']]
# allow ARP replies as long as they match addresses that actually
# belong to the port.
for ip in addresses:
bridge.add_flow(
table=constants.ARP_SPOOF_TABLE, priority=2,
proto='arp', arp_op=constants.ARP_REPLY, arp_spa=ip,
in_port=vif.ofport, actions="NORMAL")
# drop any ARP replies in this table that aren't explicitly allowed
bridge.add_flow(
table=constants.ARP_SPOOF_TABLE, priority=1, proto='arp',
arp_op=constants.ARP_REPLY, actions="DROP")
# Now that the rules are ready, direct ARP traffic from the port into
# the anti-spoof table.
# This strategy fails gracefully because OVS versions that can't match
# on ARP headers will just process traffic normally.
bridge.add_flow(table=constants.LOCAL_SWITCHING,
priority=10, proto='arp', in_port=vif.ofport,
arp_op=constants.ARP_REPLY,
actions=("resubmit(,%s)" % constants.ARP_SPOOF_TABLE))
def port_unbound(self, vif_id, net_uuid=None):
'''Unbind port.
@ -989,6 +1039,46 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
br.set_db_attribute('Interface', phys_if_name,
'options:peer', int_if_name)
def update_stale_ofport_rules(self):
# right now the ARP spoofing rules are the only thing that utilizes
# ofport-based rules, so make arp_spoofing protection a conditional
# until something else uses ofport
if not self.prevent_arp_spoofing:
return
previous = self.vifname_to_ofport_map
current = self.int_br.get_vif_port_to_ofport_map()
# if any ofport numbers have changed, re-process the devices as
# added ports so any rules based on ofport numbers are updated.
moved_ports = self._get_ofport_moves(current, previous)
if moved_ports:
self.treat_devices_added_or_updated(moved_ports,
ovs_restarted=False)
# delete any stale rules based on removed ofports
ofports_deleted = set(previous.values()) - set(current.values())
for ofport in ofports_deleted:
self.int_br.delete_flows(in_port=ofport)
# store map for next iteration
self.vifname_to_ofport_map = current
@staticmethod
def _get_ofport_moves(current, previous):
"""Returns a list of moved ports.
Takes two port->ofport maps and returns a list ports that moved to a
different ofport. Deleted ports are not included.
"""
port_moves = []
for name, ofport in previous.items():
if name not in current:
continue
current_ofport = current[name]
if ofport != current_ofport:
port_moves.append(name)
return port_moves
def scan_ports(self, registered_ports, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
@ -1163,6 +1253,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
details['fixed_ips'],
details['device_owner'],
ovs_restarted)
if self.prevent_arp_spoofing:
self.setup_arp_spoofing_protection(self.int_br,
port, details)
# update plugin about port status
# FIXME(salv-orlando): Failures while updating device status
# must be handled appropriately. Otherwise this might prevent
@ -1475,6 +1568,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, updated_ports_copy)
self.update_stale_ofport_rules()
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f",
@ -1588,6 +1682,7 @@ def create_agent_config_map(config):
enable_distributed_routing=config.AGENT.enable_distributed_routing,
l2_population=config.AGENT.l2_population,
arp_responder=config.AGENT.arp_responder,
prevent_arp_spoofing=config.AGENT.prevent_arp_spoofing,
use_veth_interconnection=config.OVS.use_veth_interconnection,
quitting_rpc_timeout=config.AGENT.quitting_rpc_timeout
)

View File

@ -74,6 +74,17 @@ agent_opts = [
"Allows the switch (when supporting an overlay) "
"to respond to an ARP request locally without "
"performing a costly ARP broadcast into the overlay.")),
cfg.BoolOpt('prevent_arp_spoofing', default=False,
help=_("Enable suppression of ARP responses that don't match "
"an IP address that belongs to the port from which "
"they originate. Note: This prevents the VMs attached "
"to this agent from spoofing, it doesn't protect them "
"from other devices which have the capability to spoof "
"(e.g. bare metal or VMs attached to agents without "
"this flag set to True). Spoofing rules will not be "
"added to any ports that have port security disabled. "
"This requires a version of OVS that supports matching "
"ARP headers.")),
cfg.BoolOpt('dont_fragment', default=True,
help=_("Set or un-set the don't fragment (DF) bit on "
"outgoing IP packet carrying GRE/VXLAN tunnel.")),

View File

@ -59,6 +59,12 @@ DVR_NOT_LEARN_VLAN = 3
# Table 0 is used for forwarding.
CANARY_TABLE = 23
# Table for ARP poison/spoofing prevention rules
ARP_SPOOF_TABLE = 24
# type for ARP reply in ARP header
ARP_REPLY = '0x2'
# Map tunnel types to tables number
TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV,
p_const.TYPE_VXLAN: VXLAN_TUN_TO_LV}

View File

@ -0,0 +1,94 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.cmd.sanity import checks
from neutron.plugins.openvswitch.agent import ovs_neutron_agent as ovsagt
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional.agent import test_ovs_lib
class ARPSpoofTestCase(test_ovs_lib.OVSBridgeTestBase,
base.BaseIPVethTestCase):
def setUp(self):
if not checks.arp_header_match_supported():
self.skipTest("ARP header matching not supported")
# NOTE(kevinbenton): it would be way cooler to use scapy for
# these but scapy requires the python process to be running as
# root to bind to the ports.
super(ARPSpoofTestCase, self).setUp()
self.src_addr = '192.168.0.1'
self.dst_addr = '192.168.0.2'
self.src_ns = self._create_namespace()
self.dst_ns = self._create_namespace()
self.src_p = self.useFixture(
net_helpers.OVSPortFixture(self.br, self.src_ns.namespace)).port
self.dst_p = self.useFixture(
net_helpers.OVSPortFixture(self.br, self.dst_ns.namespace)).port
# wait to add IPs until after anti-spoof rules to ensure ARP doesn't
# happen before
def test_arp_spoof_doesnt_block_normal_traffic(self):
self._setup_arp_spoof_for_port(self.src_p.name, [self.src_addr])
self._setup_arp_spoof_for_port(self.dst_p.name, [self.dst_addr])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
pinger = helpers.Pinger(self.src_ns)
pinger.assert_ping(self.dst_addr)
def test_arp_spoof_blocks_response(self):
# this will prevent the destination from responding to the ARP
# request for it's own address
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3'])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
pinger = helpers.Pinger(self.src_ns)
pinger.assert_no_ping(self.dst_addr)
def test_arp_spoof_allowed_address_pairs(self):
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3',
self.dst_addr])
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
pinger = helpers.Pinger(self.src_ns)
pinger.assert_ping(self.dst_addr)
def test_arp_spoof_disable_port_security(self):
# block first and then disable port security to make sure old rules
# are cleared
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3'])
self._setup_arp_spoof_for_port(self.dst_p.name, ['192.168.0.3'],
psec=False)
self.src_p.addr.add('%s/24' % self.src_addr)
self.dst_p.addr.add('%s/24' % self.dst_addr)
pinger = helpers.Pinger(self.src_ns)
pinger.assert_ping(self.dst_addr)
def _setup_arp_spoof_for_port(self, port, addrs, psec=True):
of_port_map = self.br.get_vif_port_to_ofport_map()
class VifPort(object):
ofport = of_port_map[port]
port_name = port
ip_addr = addrs.pop()
details = {'port_security_enabled': psec,
'fixed_ips': [{'ip_address': ip_addr}],
'allowed_address_pairs': [
dict(ip_address=ip) for ip in addrs]}
ovsagt.OVSNeutronAgent.setup_arp_spoofing_protection(
self.br, VifPort(), details)

View File

@ -21,11 +21,11 @@ from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
class OVSBridgeTestCase(base.BaseOVSLinuxTestCase):
class OVSBridgeTestBase(base.BaseOVSLinuxTestCase):
# TODO(twilson) So far, only ovsdb-related tests are written. It would be
# good to also add the openflow-related functions
def setUp(self):
super(OVSBridgeTestCase, self).setUp()
super(OVSBridgeTestBase, self).setUp()
self.ovs = ovs_lib.BaseOVS()
self.br = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
@ -46,6 +46,9 @@ class OVSBridgeTestCase(base.BaseOVSLinuxTestCase):
port_name, ofport = self.create_ovs_port(attrs)
return ovs_lib.VifPort(port_name, ofport, iface_id, mac, self.br)
class OVSBridgeTestCase(OVSBridgeTestBase):
def test_port_lifecycle(self):
(port_name, ofport) = self.create_ovs_port(('type', 'internal'))
# ofport should always be an integer string with value -1 or > 0.

View File

@ -56,6 +56,9 @@ class SanityTestCaseRoot(functional_base.BaseSudoTestCase):
def test_arp_responder_runs(self):
checks.arp_responder_supported()
def test_arp_header_match_runs(self):
checks.arp_header_match_supported()
def test_vf_management_runs(self):
checks.vf_management_supported()

View File

@ -43,6 +43,11 @@ FAKE_IP1 = '10.0.0.1'
FAKE_IP2 = '10.0.0.2'
class FakeVif(object):
ofport = 99
port_name = 'name'
class CreateAgentConfigMap(base.BaseTestCase):
def test_create_agent_config_map_succeeds(self):
@ -1048,9 +1053,12 @@ class TestOvsNeutronAgent(base.BaseTestCase):
'setup_integration_br'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'setup_physical_bridges'),
mock.patch.object(time, 'sleep')
mock.patch.object(time, 'sleep'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'update_stale_ofport_rules')
) as (spawn_fn, log_exception, scan_ports, process_network_ports,
check_ovs_status, setup_int_br, setup_phys_br, time_sleep):
check_ovs_status, setup_int_br, setup_phys_br, time_sleep,
update_stale):
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
@ -1078,7 +1086,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
'removed': set(['tap0']),
'added': set([])}, True)
])
self.assertTrue(update_stale.called)
# Verify the second time through the loop we triggered an
# OVS restart and re-setup the bridges
setup_int_br.assert_has_calls([mock.call()])
@ -1098,6 +1106,102 @@ class TestOvsNeutronAgent(base.BaseTestCase):
self.agent._handle_sigterm(None, None)
self.assertFalse(mock_set_rpc.called)
def test_arp_spoofing_disabled(self):
self.agent.prevent_arp_spoofing = False
# all of this is required just to get to the part of
# treat_devices_added_or_updated that checks the prevent_arp_spoofing
# flag
self.agent.int_br = mock.Mock()
self.agent.treat_vif_port = mock.Mock()
self.agent.get_vif_port_by_id = mock.Mock(return_value=FakeVif())
self.agent.plugin_rpc = mock.Mock()
plist = [{a: a for a in ('port_id', 'network_id', 'network_type',
'physical_network', 'segmentation_id',
'admin_state_up', 'fixed_ips', 'device',
'device_owner')}]
self.agent.plugin_rpc.get_devices_details_list.return_value = plist
self.agent.setup_arp_spoofing_protection = mock.Mock()
self.agent.treat_devices_added_or_updated([], False)
self.assertFalse(self.agent.setup_arp_spoofing_protection.called)
def test_arp_spoofing_port_security_disabled(self):
int_br = mock.Mock()
self.agent.setup_arp_spoofing_protection(
int_br, FakeVif(), {'port_security_enabled': False})
self.assertFalse(int_br.add_flows.called)
def test_arp_spoofing_basic_rule_setup(self):
vif = FakeVif()
fake_details = {'fixed_ips': []}
self.agent.prevent_arp_spoofing = True
int_br = mock.Mock()
self.agent.setup_arp_spoofing_protection(int_br, vif, fake_details)
int_br.delete_flows.assert_has_calls(
[mock.call(table=mock.ANY, in_port=vif.ofport)])
# make sure redirect into spoof table is installed
int_br.add_flow.assert_any_call(
table=constants.LOCAL_SWITCHING, in_port=vif.ofport,
arp_op=constants.ARP_REPLY, proto='arp', actions=mock.ANY,
priority=10)
# make sure drop rule for replies is installed
int_br.add_flow.assert_any_call(
table=constants.ARP_SPOOF_TABLE,
proto='arp', arp_op=constants.ARP_REPLY, actions='DROP',
priority=mock.ANY)
def test_arp_spoofing_fixed_and_allowed_addresses(self):
vif = FakeVif()
fake_details = {
'fixed_ips': [{'ip_address': '192.168.44.100'},
{'ip_address': '192.168.44.101'}],
'allowed_address_pairs': [{'ip_address': '192.168.44.102/32'},
{'ip_address': '192.168.44.103/32'}]
}
self.agent.prevent_arp_spoofing = True
int_br = mock.Mock()
self.agent.setup_arp_spoofing_protection(int_br, vif, fake_details)
# make sure all addresses are allowed
for addr in ('192.168.44.100', '192.168.44.101', '192.168.44.102/32',
'192.168.44.103/32'):
int_br.add_flow.assert_any_call(
table=constants.ARP_SPOOF_TABLE, in_port=vif.ofport,
proto='arp', arp_op=constants.ARP_REPLY, actions='NORMAL',
arp_spa=addr, priority=mock.ANY)
def test__get_ofport_moves(self):
previous = {'port1': 1, 'port2': 2}
current = {'port1': 5, 'port2': 2}
# we expect it to tell us port1 moved
expected = ['port1']
self.assertEqual(expected,
self.agent._get_ofport_moves(current, previous))
def test_update_stale_ofport_rules_clears_old(self):
self.agent.prevent_arp_spoofing = True
self.agent.vifname_to_ofport_map = {'port1': 1, 'port2': 2}
self.agent.int_br = mock.Mock()
# simulate port1 was removed
newmap = {'port2': 2}
self.agent.int_br.get_vif_port_to_ofport_map.return_value = newmap
self.agent.update_stale_ofport_rules()
# rules matching port 1 should have been deleted
self.assertEqual(self.agent.int_br.delete_flows.mock_calls,
[mock.call(in_port=1)])
# make sure the state was updated with the new map
self.assertEqual(self.agent.vifname_to_ofport_map, newmap)
def test_update_stale_ofport_rules_treats_moved(self):
self.agent.prevent_arp_spoofing = True
self.agent.vifname_to_ofport_map = {'port1': 1, 'port2': 2}
self.agent.treat_devices_added_or_updated = mock.Mock()
self.agent.int_br = mock.Mock()
# simulate port1 was moved
newmap = {'port2': 2, 'port1': 90}
self.agent.int_br.get_vif_port_to_ofport_map.return_value = newmap
self.agent.update_stale_ofport_rules()
self.agent.treat_devices_added_or_updated.assert_called_with(
['port1'], ovs_restarted=False)
class AncillaryBridgesTest(base.BaseTestCase):

View File

@ -521,9 +521,11 @@ class TunnelTest(base.BaseTestCase):
'process_network_ports'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'tunnel_sync'),
mock.patch.object(time, 'sleep')
mock.patch.object(time, 'sleep'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
'update_stale_ofport_rules')
) as (log_exception, scan_ports, process_network_ports,
ts, time_sleep):
ts, time_sleep, update_stale):
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
@ -555,6 +557,7 @@ class TunnelTest(base.BaseTestCase):
'removed': set(['tap0']),
'added': set([])}, False)
])
self.assertTrue(update_stale.called)
self._verify_mock_calls()