Merge "Add a functional test to validate dvr snat namespace"

This commit is contained in:
Jenkins 2015-09-08 13:24:53 +00:00 committed by Gerrit Code Review
commit 1104702735

View File

@ -1403,3 +1403,60 @@ class TestDvrRouter(L3AgentTestFramework):
self.assertTrue(self._namespace_exists(router1.ns_name)) self.assertTrue(self._namespace_exists(router1.ns_name))
self.assertTrue(self._namespace_exists(fip_ns)) self.assertTrue(self._namespace_exists(fip_ns))
self._assert_snat_namespace_does_not_exist(router1) self._assert_snat_namespace_does_not_exist(router1)
def _assert_snat_namespace_exists(self, router):
namespace = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
router.router_id)
self.assertTrue(self._namespace_exists(namespace))
def _get_dvr_snat_namespace_device_status(
self, router, internal_dev_name=None):
"""Function returns the internal and external device status."""
snat_ns = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
router.router_id)
external_port = router.get_ex_gw_port()
external_device_name = router.get_external_device_name(
external_port['id'])
qg_device_created_successfully = ip_lib.device_exists(
external_device_name, namespace=snat_ns)
sg_device_created_successfully = ip_lib.device_exists(
internal_dev_name, namespace=snat_ns)
return qg_device_created_successfully, sg_device_created_successfully
def test_dvr_router_snat_namespace_with_interface_remove(self):
"""Test to validate the snat namespace with interface remove.
This test validates the snat namespace for all the external
and internal devices. It also validates if the internal
device corresponding to the router interface is removed
when the router interface is deleted.
"""
self.agent.conf.agent_mode = 'dvr_snat'
router_info = self.generate_dvr_router_info()
snat_internal_port = router_info[l3_constants.SNAT_ROUTER_INTF_KEY]
router1 = self.manage_router(self.agent, router_info)
csnat_internal_port = (
router1.router[l3_constants.SNAT_ROUTER_INTF_KEY])
# Now save the internal device name to verify later
internal_device_name = router1._get_snat_int_device_name(
csnat_internal_port[0]['id'])
self._assert_snat_namespace_exists(router1)
qg_device, sg_device = self._get_dvr_snat_namespace_device_status(
router1, internal_dev_name=internal_device_name)
self.assertTrue(qg_device)
self.assertTrue(sg_device)
self.assertEqual(router1.snat_ports, snat_internal_port)
# Now let us not pass INTERFACE_KEY, to emulate
# the interface has been removed.
router1.router[l3_constants.INTERFACE_KEY] = []
# Now let us not pass the SNAT_ROUTER_INTF_KEY, to emulate
# that the server did not send it, since the interface has been
# removed.
router1.router[l3_constants.SNAT_ROUTER_INTF_KEY] = []
self.agent._process_updated_router(router1.router)
router_updated = self.agent.router_info[router_info['id']]
self._assert_snat_namespace_exists(router_updated)
qg_device, sg_device = self._get_dvr_snat_namespace_device_status(
router_updated, internal_dev_name=internal_device_name)
self.assertFalse(sg_device)
self.assertTrue(qg_device)