Browse Source

DVR: Add forwarding routes based on address_scopes

When we create agent gateway port on all the nodes irrespective
of the floatingips we can basically use that agent gateway port to
forward traffic in and out of the nodes if the address_scopes match,
since we don't need SNAT functionality if address scopes match.

If a gateway is configured and if it has internal ports that belong
to the same address_scopes then no need to add the redirect rules.
At the same we should also add a static route in the fip namespace
for every interface that is connected to the router that belongs to
the same address scope.

Change-Id: Iaf6d3b38b1fb45772cf0b88706586c057ddb0230
Closes-Bug: #1577488
tags/11.0.0.0b2
Swaminathan Vasudevan 3 years ago
parent
commit
fb2093c365
5 changed files with 351 additions and 28 deletions
  1. +2
    -0
      neutron/agent/l3/dvr_fip_ns.py
  2. +113
    -18
      neutron/agent/l3/dvr_local_router.py
  3. +143
    -2
      neutron/tests/functional/agent/l3/test_dvr_router.py
  4. +49
    -7
      neutron/tests/unit/agent/l3/test_agent.py
  5. +44
    -1
      neutron/tests/unit/agent/l3/test_dvr_local_router.py

+ 2
- 0
neutron/agent/l3/dvr_fip_ns.py View File

@@ -43,6 +43,8 @@ FIP_RT_TBL = 16
# Rule priority range for FIPs
FIP_PR_START = 32768
FIP_PR_END = FIP_PR_START + 40000
# Fixed rule priority for Fast Path Exit rules
FAST_PATH_EXIT_PR = 80000


class FipNamespace(namespaces.Namespace):

+ 113
- 18
neutron/agent/l3/dvr_local_router.py View File

@@ -358,15 +358,24 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
# entries for the dvr services ports into the router
# namespace. This does not have dependency on the
# external_gateway port or the agent_mode.
ex_gw_port = self.get_ex_gw_port()
for subnet in port['subnets']:
self._set_subnet_arp_info(subnet['id'])
if ex_gw_port:
# Check for address_scopes here if gateway exists.
if self._check_if_address_scopes_match(port, ex_gw_port):
self._add_interface_routing_rule_to_router_ns(port)
self._add_interface_route_to_fip_ns(port)
self._snat_redirect_add_from_port(port)

def _snat_redirect_add_from_port(self, port):
ex_gw_port = self.get_ex_gw_port()
if not ex_gw_port:
return

if self._check_if_address_scopes_match(port, ex_gw_port):
# If address scopes match there is no need to cleanup the
# snat redirect rules, hence return here.
return
sn_port = self.get_snat_port_for_internal_port(port)
if not sn_port:
return
@@ -375,9 +384,21 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
self._snat_redirect_add(sn_port, port, interface_name)

def _dvr_internal_network_removed(self, port):
# Clean up the cached arp entries related to the port subnet
for subnet in port['subnets']:
self._delete_arp_cache_for_internal_port(subnet)

if not self.ex_gw_port:
return

# Delete DVR address_scope static route for the removed interface
# Check for address_scopes here.
if self._check_if_address_scopes_match(port, self.ex_gw_port):
self._delete_interface_route_in_fip_ns(port)
self._delete_interface_routing_rule_in_router_ns(port)
# If address scopes match there is no need to cleanup the
# snat redirect rules, hence return here.
return
sn_port = self.get_snat_port_for_internal_port(port, self.snat_ports)
if not sn_port:
return
@@ -385,9 +406,6 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
# DVR handling code for SNAT
interface_name = self.get_internal_device_name(port['id'])
self._snat_redirect_remove(sn_port, port, interface_name)
# Clean up the cached arp entries related to the port subnet
for subnet in port['subnets']:
self._delete_arp_cache_for_internal_port(subnet)

def internal_network_removed(self, port):
self._dvr_internal_network_removed(port)
@@ -404,18 +422,33 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
if ip_lib.device_exists(fip_int, namespace=self.fip_ns.get_name()):
return self.fip_ns.get_rtr_ext_device_name(self.router_id)

def enable_snat_redirect_rules(self, ex_gw_port):
for p in self.internal_ports:
if not self._check_if_address_scopes_match(p, ex_gw_port):
gateway = self.get_snat_port_for_internal_port(p)
if not gateway:
continue
internal_dev = self.get_internal_device_name(p['id'])
self._snat_redirect_add(gateway, p, internal_dev)

def disable_snat_redirect_rules(self, ex_gw_port):
for p in self.internal_ports:
if not self._check_if_address_scopes_match(p, ex_gw_port):
gateway = self.get_snat_port_for_internal_port(
p, self.snat_ports)
if not gateway:
continue
internal_dev = self.get_internal_device_name(p['id'])
self._snat_redirect_remove(gateway, p, internal_dev)

def external_gateway_added(self, ex_gw_port, interface_name):
# TODO(Carl) Refactor external_gateway_added/updated/removed to use
# super class implementation where possible. Looks like preserve_ips,
# and ns_name are the key differences.
cmd = ['net.ipv4.conf.all.send_redirects=0']
ip_lib.sysctl(cmd, namespace=self.ns_name)
for p in self.internal_ports:
gateway = self.get_snat_port_for_internal_port(p)
id_name = self.get_internal_device_name(p['id'])
if gateway:
self._snat_redirect_add(gateway, p, id_name)

self.enable_snat_redirect_rules(ex_gw_port)
for port in self.get_snat_interfaces():
for ip in port['fixed_ips']:
self._update_arp_entry(ip['ip_address'],
@@ -441,15 +474,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
# gateway is cleared and should not be called when the gateway
# is moved or rescheduled.
if not self.router.get('gw_port'):
for p in self.internal_ports:
# NOTE: When removing the gateway port, pass in the snat_port
# cache along with the current ports.
gateway = self.get_snat_port_for_internal_port(
p, self.snat_ports)
if not gateway:
continue
internal_interface = self.get_internal_device_name(p['id'])
self._snat_redirect_remove(gateway, p, internal_interface)
self.disable_snat_redirect_rules(ex_gw_port)

def _handle_router_snat_rules(self, ex_gw_port, interface_name):
"""Configures NAT rules for Floating IPs for DVR."""
@@ -503,10 +528,80 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):

def connect_rtr_2_fip(self):
if self.fip_ns.agent_gateway_port and not self.rtr_fip_connect:
ex_gw_port = self.get_ex_gw_port()
self.fip_ns.create_rtr_2_fip_link(self)
self.set_address_scope_interface_routes(ex_gw_port)
self.rtr_fip_connect = True
self.routes_updated([], self.router['routes'])

def _check_if_address_scopes_match(self, int_port, ex_gw_port):
"""Checks and returns the matching state for v4 or v6 scopes."""
int_port_addr_scopes = int_port.get('address_scopes', {})
ext_port_addr_scopes = ex_gw_port.get('address_scopes', {})
key = (
lib_constants.IP_VERSION_6 if self._port_has_ipv6_subnet(int_port)
else lib_constants.IP_VERSION_4)
# NOTE: DVR does not support IPv6 for the floating namespace yet, so
# until we fix it, we probably should use the snat redirect path for
# the ports that have IPv6 address configured.
if ((key != lib_constants.IP_VERSION_6) and
int_port_addr_scopes.get(str(key)) in
ext_port_addr_scopes.values()):
return True
return False

def _delete_interface_route_in_fip_ns(self, router_port):
rtr_2_fip_ip, fip_2_rtr_name = self.get_rtr_fip_ip_and_interface_name()
fip_ns_name = self.fip_ns.get_name()
device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)
if not device.exists():
return
for subnet in router_port['subnets']:
rtr_port_cidr = subnet['cidr']
device.route.delete_route(rtr_port_cidr, str(rtr_2_fip_ip))

def _add_interface_route_to_fip_ns(self, router_port):
rtr_2_fip_ip, fip_2_rtr_name = self.get_rtr_fip_ip_and_interface_name()
fip_ns_name = self.fip_ns.get_name()
device = ip_lib.IPDevice(fip_2_rtr_name, namespace=fip_ns_name)
if not device.exists():
return
for subnet in router_port['subnets']:
rtr_port_cidr = subnet['cidr']
device.route.add_route(rtr_port_cidr, str(rtr_2_fip_ip))

def _add_interface_routing_rule_to_router_ns(self, router_port):
ip_rule = ip_lib.IPRule(namespace=self.ns_name)
for subnet in router_port['subnets']:
rtr_port_cidr = subnet['cidr']
ip_rule.rule.add(ip=rtr_port_cidr,
table=dvr_fip_ns.FIP_RT_TBL,
priority=dvr_fip_ns.FAST_PATH_EXIT_PR)

def _delete_interface_routing_rule_in_router_ns(self, router_port):
ip_rule = ip_lib.IPRule(namespace=self.ns_name)
for subnet in router_port['subnets']:
rtr_port_cidr = subnet['cidr']
ip_rule.rule.delete(ip=rtr_port_cidr,
table=dvr_fip_ns.FIP_RT_TBL,
priority=dvr_fip_ns.FAST_PATH_EXIT_PR)

def get_rtr_fip_ip_and_interface_name(self):
"""Function that returns the router to fip interface name and ip."""
if self.rtr_fip_subnet is None:
self.rtr_fip_subnet = self.fip_ns.local_subnets.allocate(
self.router_id)
rtr_2_fip, __ = self.rtr_fip_subnet.get_pair()
fip_2_rtr_name = self.fip_ns.get_int_device_name(self.router_id)
return rtr_2_fip.ip, fip_2_rtr_name

def set_address_scope_interface_routes(self, ex_gw_port):
"""Sets routing rules for router interfaces if addr scopes match."""
for port in self.internal_ports:
if self._check_if_address_scopes_match(port, ex_gw_port):
self._add_interface_routing_rule_to_router_ns(port)
self._add_interface_route_to_fip_ns(port)

def create_dvr_external_gateway_on_agent(self, ex_gw_port):
fip_agent_port = self.get_floating_agent_gw_interface(
ex_gw_port['network_id'])

+ 143
- 2
neutron/tests/functional/agent/l3/test_dvr_router.py View File

@@ -1259,6 +1259,145 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self._setup_dvr_router_static_routes(
router_namespace=False, check_fpr_int_rule_delete=True)

def _assert_fip_namespace_interface_static_routes(
self, address_scopes, fpr_device,
router_info, rtr_2_fip, fpr_device_name):
fixed_ips_1 = router_info[lib_constants.INTERFACE_KEY][0]['fixed_ips']
fixed_ips_2 = router_info[lib_constants.INTERFACE_KEY][1]['fixed_ips']
actual_routes = fpr_device.route.list_routes(
ip_version=lib_constants.IP_VERSION_4, table='main',
via=str(rtr_2_fip.ip))
if not address_scopes:
self.assertEqual([], actual_routes)

if address_scopes:
cidr1 = (
str(fixed_ips_1[0]['ip_address']) +
'/' + str(fixed_ips_1[0]['prefixlen']))
cidr2 = (
str(fixed_ips_2[0]['ip_address']) +
'/' + str(fixed_ips_2[0]['prefixlen']))
net_addr_1 = netaddr.IPNetwork(cidr1).network
net_addr_2 = netaddr.IPNetwork(cidr2).network
route_cidr_1 = (
str(net_addr_1) + '/' +
str(fixed_ips_1[0]['prefixlen']))
route_cidr_2 = (
str(net_addr_2) + '/' +
str(fixed_ips_2[0]['prefixlen']))
expected_routes = [{'dev': fpr_device_name,
'cidr': unicode(route_cidr_1),
'via': str(rtr_2_fip.ip),
'table': 'main'},
{'dev': fpr_device_name,
'cidr': unicode(route_cidr_2),
'via': str(rtr_2_fip.ip),
'table': 'main'}]
# Comparing the static routes for both internal interfaces on the
# main table.
self.assertEqual(expected_routes, actual_routes)
else:
self.assertEqual([], actual_routes)

def _assert_interface_rules_on_gateway_remove(
self, router, agent, address_scopes, agent_gw_port,
rfp_device, fpr_device):

router.router[n_const.SNAT_ROUTER_INTF_KEY] = []
router.router['gw_port'] = ""
router.router['gw_port_host'] = ""
self.agent._process_updated_router(router.router)
router_updated = self.agent.router_info[router.router['id']]
self.assertFalse(rfp_device.exists())
self.assertFalse(fpr_device.exists())
self.assertTrue(self._namespace_exists(router_updated.ns_name))
self._assert_fip_namespace_deleted(
agent_gw_port, assert_ovs_interface=False)
if not address_scopes:
ns_ipr = ip_lib.IPRule(namespace=router_updated.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
self.assertEqual(3, len(ip4_rules_list))
self.assertEqual(2, len(ip6_rules_list))

def _setup_dvr_router_for_fast_path_exit(self, address_scopes=True):
"""Test to validate the fip and router namespace routes.

This test validates the fip and router namespace routes
that are based on the address scopes.
If the address scopes of internal network and external network
matches, the traffic will be forwarded to the fip namespace and
the reverse traffic to the private network is forwarded to the
router namespace.
"""
self.agent.conf.agent_mode = 'dvr'
router_info = self.generate_dvr_router_info(
enable_snat=True, enable_gw=True, enable_floating_ip=True)
fip_agent_gw_port = router_info[n_const.FLOATINGIP_AGENT_INTF_KEY]
self.mock_plugin_api.get_agent_gateway_port.return_value = (
fip_agent_gw_port[0])
router_info[lib_constants.FLOATINGIP_KEY] = []
if address_scopes:
address_scope1 = {
str(lib_constants.IP_VERSION_4): 'scope1'}
address_scope2 = {
str(lib_constants.IP_VERSION_4): 'scope1'}
else:
address_scope1 = {
str(lib_constants.IP_VERSION_4): 'scope2'}
address_scope2 = {
str(lib_constants.IP_VERSION_4): 'scope2'}
router_info['gw_port']['address_scopes'] = {
str(lib_constants.IP_VERSION_4): 'scope1'}
router_info[lib_constants.INTERFACE_KEY][0]['address_scopes'] = (
address_scope1)
router_info[lib_constants.INTERFACE_KEY][1]['address_scopes'] = (
address_scope2)
router1 = self.manage_router(self.agent, router_info)
fip_ns_name = router1.fip_ns.get_name()
self.assertTrue(self._namespace_exists(router1.ns_name))
self.assertTrue(self._namespace_exists(fip_ns_name))

# Check the router namespace for default route.
rfp_device_name = router1.fip_ns.get_rtr_ext_device_name(
router1.router_id)
rfp_device = ip_lib.IPDevice(rfp_device_name,
namespace=router1.ns_name)
fpr_device_name = router1.fip_ns.get_int_device_name(router1.router_id)
fpr_device = ip_lib.IPDevice(fpr_device_name,
namespace=fip_ns_name)
rtr_2_fip, fip_2_rtr = router1.rtr_fip_subnet.get_pair()
self._assert_default_gateway(
fip_2_rtr, rfp_device, rfp_device_name)

# Check if any snat redirect rules in the router namespace exist.
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
# Just make sure the basic set of rules are there in the router
# namespace
self.assertEqual(5, len(ip4_rules_list))
self.assertEqual(2, len(ip6_rules_list))
# Now check the fip namespace static routes for reaching the private
# network.
self._assert_fip_namespace_interface_static_routes(
address_scopes, fpr_device,
router_info, rtr_2_fip, fpr_device_name)

# Now remove the gateway and validate if the respective interface
# routes in router namespace is deleted respectively.
self. _assert_interface_rules_on_gateway_remove(
router1, self.agent, address_scopes, fip_agent_gw_port[0],
rfp_device, fpr_device)

def test_dvr_fip_and_router_namespace_rules_with_address_scopes_match(
self):
self._setup_dvr_router_for_fast_path_exit(address_scopes=True)

def test_dvr_fip_and_router_namespace_rules_with_address_scopes_mismatch(
self):
self._setup_dvr_router_for_fast_path_exit(address_scopes=False)

def test_dvr_router_gateway_update_to_none(self):
self.agent.conf.agent_mode = 'dvr_snat'
router_info = self.generate_dvr_router_info(enable_snat=True)
@@ -1292,13 +1431,15 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertIsNone(ex_gw_device.route.get_gateway())
self.assertIsNone(fg_device.route.get_gateway())

def _assert_fip_namespace_deleted(self, ext_gateway_port):
def _assert_fip_namespace_deleted(
self, ext_gateway_port, assert_ovs_interface=True):
ext_net_id = ext_gateway_port['network_id']
fip_ns = self.agent.get_fip_ns(ext_net_id)
fip_ns.unsubscribe = mock.Mock()
self.agent.fipnamespace_delete_on_ext_net(
self.agent.context, ext_net_id)
self._assert_interfaces_deleted_from_ovs()
if assert_ovs_interface:
self._assert_interfaces_deleted_from_ovs()
fip_ns_name = fip_ns.get_name()
self.assertFalse(self._namespace_exists(fip_ns_name))
self.assertTrue(fip_ns.destroyed)

+ 49
- 7
neutron/tests/unit/agent/l3/test_agent.py View File

@@ -432,7 +432,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def _fixed_ip_cidr(fixed_ip):
return '%s/%s' % (fixed_ip['ip_address'], fixed_ip['prefixlen'])

def _test_internal_network_action_dist(self, action):
def _test_internal_network_action_dist(self, action, scope_match=False):
router = l3_test_common.prepare_router_data(num_internal_ports=2)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self._set_ri_kwargs(agent, router['id'], router)
@@ -468,7 +468,10 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri.snat_ports = sn_port
ri.ex_gw_port = ex_gw_port
ri.snat_namespace = mock.Mock()

if scope_match:
ri._check_if_address_scopes_match = mock.Mock(return_value=True)
else:
ri._check_if_address_scopes_match = mock.Mock(return_value=False)
if action == 'add':
self.device_exists.return_value = False

@@ -478,8 +481,10 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri._set_subnet_arp_info = mock.Mock()
ri._internal_network_added = mock.Mock()
ri._set_subnet_arp_info = mock.Mock()
ri._port_has_ipv6_subnet = mock.Mock(return_value=False)
ri._add_interface_routing_rule_to_router_ns = mock.Mock()
ri._add_interface_route_to_fip_ns = mock.Mock()
ri.internal_network_added(port)
self.assertEqual(1, ri._snat_redirect_add.call_count)
self.assertEqual(2, ri._internal_network_added.call_count)
ri._set_subnet_arp_info.assert_called_once_with(subnet_id)
ri._internal_network_added.assert_called_with(
@@ -491,19 +496,47 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri._get_snat_int_device_name(sn_port['id']),
lib_constants.SNAT_INT_DEV_PREFIX,
mtu=None)
self.assertTrue(ri._check_if_address_scopes_match.called)
if scope_match:
self.assertTrue(
ri._add_interface_routing_rule_to_router_ns.called)
self.assertTrue(
ri._add_interface_route_to_fip_ns.called)
self.assertEqual(0, ri._snat_redirect_add.call_count)
else:
self.assertFalse(
ri._add_interface_routing_rule_to_router_ns.called)
self.assertFalse(
ri._add_interface_route_to_fip_ns.called)
self.assertEqual(1, ri._snat_redirect_add.call_count)
elif action == 'remove':
self.device_exists.return_value = False
ri.get_snat_port_for_internal_port = mock.Mock(
return_value=sn_port)
ri._delete_arp_cache_for_internal_port = mock.Mock()
ri._snat_redirect_modify = mock.Mock()
ri._port_has_ipv6_subnet = mock.Mock(return_value=False)
ri._delete_interface_routing_rule_in_router_ns = mock.Mock()
ri._delete_interface_route_in_fip_ns = mock.Mock()
ri.internal_network_removed(port)
self.assertEqual(
1, ri._delete_arp_cache_for_internal_port.call_count)
ri._snat_redirect_modify.assert_called_with(
sn_port, port,
ri.get_internal_device_name(port['id']),
is_add=False)
self.assertTrue(ri._check_if_address_scopes_match.called)
if scope_match:
self.assertFalse(ri._snat_redirect_modify.called)
self.assertTrue(
ri._delete_interface_routing_rule_in_router_ns.called)
self.assertTrue(
ri._delete_interface_route_in_fip_ns.called)
else:
ri._snat_redirect_modify.assert_called_with(
sn_port, port,
ri.get_internal_device_name(port['id']),
is_add=False)
self.assertFalse(
ri._delete_interface_routing_rule_in_router_ns.called)
self.assertFalse(
ri._delete_interface_route_in_fip_ns.called)

def test_agent_add_internal_network(self):
self._test_internal_network_action('add')
@@ -511,9 +544,15 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_agent_add_internal_network_dist(self):
self._test_internal_network_action_dist('add')

def test_agent_add_internal_network_dist_with_addr_scope_match(self):
self._test_internal_network_action_dist('add', scope_match=True)

def test_agent_remove_internal_network(self):
self._test_internal_network_action('remove')

def test_agent_remove_internal_network_dist_with_addr_scope_mismatch(self):
self._test_internal_network_action_dist('remove', scope_match=True)

def test_agent_remove_internal_network_dist(self):
self._test_internal_network_action_dist('remove')

@@ -651,6 +690,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri.get_snat_port_for_internal_port = mock.Mock(
return_value=sn_port)
ri._snat_redirect_remove = mock.Mock()
if router.get('distributed'):
ri.fip_ns.delete_rtr_2_fip_link = mock.Mock()
ri.router['gw_port'] = ""
ri.external_gateway_removed(ex_gw_port, interface_name)
if not router.get('distributed'):
@@ -665,6 +706,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri.get_internal_device_name(sn_port['id']))
ri.get_snat_port_for_internal_port.assert_called_with(
mock.ANY, ri.snat_ports)
self.assertTrue(ri.fip_ns.delete_rtr_2_fip_link.called)
else:
raise Exception("Invalid action %s" % action)


+ 44
- 1
neutron/tests/unit/agent/l3/test_dvr_local_router.py View File

@@ -175,11 +175,54 @@ class TestDvrRouterOperations(base.BaseTestCase):
ri.get_floating_ips = mock.Mock(return_value=True)
ri.fip_ns = mock.Mock()
ri.fip_ns.subscribe.return_value = False
ri.rtr_fip_connect = True
ex_gw_port = {'network_id': 'fake_net_id'}
ri.create_dvr_external_gateway_on_agent(ex_gw_port)
ri.fip_ns.create_or_update_gateway_port.assert_called_once_with(
fip_agent_port)

def test_create_dvr_fip_interfaces_with_matching_address_scope(self):
self._setup_create_dvr_fip_interfaces_for_setting_routing_rules(
address_scopes_match=True)

def test_create_dvr_fip_interfaces_with_address_scope_mismatch(self):
self._setup_create_dvr_fip_interfaces_for_setting_routing_rules()

def _setup_create_dvr_fip_interfaces_for_setting_routing_rules(
self, address_scopes_match=False):
ri = self._create_router()
ri.get_floating_agent_gw_interface = mock.Mock()
ri.fip_ns = mock.Mock()
ri._add_interface_routing_rule_to_router_ns = mock.Mock()
ri._add_interface_route_to_fip_ns = mock.Mock()
ri.fip_ns._create_rtr_2_fip_link = mock.Mock()
ri.internal_ports = ['moke_port_1', 'moke_port_2']
if address_scopes_match:
ri._check_if_address_scopes_match = mock.Mock(
return_value=True)
else:
ri._check_if_address_scopes_match = mock.Mock(
return_value=False)
ri.rtr_fip_connect = False
ex_gw_port = {'network_id': 'fake_net_id'}
ri.create_dvr_external_gateway_on_agent(ex_gw_port)
ri.connect_rtr_2_fip()
self.assertTrue(ri._check_if_address_scopes_match.called)
if address_scopes_match:
self.assertTrue(
ri.fip_ns.create_rtr_2_fip_link.called)
self.assertTrue(
ri._add_interface_routing_rule_to_router_ns.called)
self.assertTrue(
ri._add_interface_route_to_fip_ns.called)
else:
self.assertFalse(
ri._add_interface_routing_rule_to_router_ns.called)
self.assertFalse(
ri._add_interface_route_to_fip_ns.called)
self.assertTrue(
ri.fip_ns.create_rtr_2_fip_link.called)

def test_get_floating_ips_dvr(self):
router = mock.MagicMock()
router.get.return_value = [{'host': HOSTNAME},
@@ -298,7 +341,7 @@ class TestDvrRouterOperations(base.BaseTestCase):
ri.fip_ns.local_subnets.allocate.assert_not_called()

# Validate that fip_ns.local_subnets is called when
# rtr_fip_subnet is None
# ri.rtr_fip_subnet is None
ri.rtr_fip_subnet = None
ri.floating_ip_added_dist(fip, ip_cidr)
mIPRule().rule.add.assert_called_with(ip='192.168.0.1',

Loading…
Cancel
Save