diff --git a/neutron_fwaas/tests/tempest_plugin/tests/scenario/test_fwaas.py b/neutron_fwaas/tests/tempest_plugin/tests/scenario/test_fwaas.py index eea30dd5b..3c529870f 100644 --- a/neutron_fwaas/tests/tempest_plugin/tests/scenario/test_fwaas.py +++ b/neutron_fwaas/tests/tempest_plugin/tests/scenario/test_fwaas.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import testscenarios + from tempest import config from tempest import test @@ -20,16 +22,29 @@ from neutron_fwaas.tests.tempest_plugin.tests.scenario import base CONF = config.CONF +load_tests = testscenarios.load_tests_apply_scenarios + class TestFWaaS(base.FWaaSScenarioTest): + scenarios = [ + ('without router insersion', { + 'router_insertion': False, + }), + ('with router insersion', { + 'router_insertion': True, + }), + ] - @classmethod - def resource_setup(cls): - super(TestFWaaS, cls).resource_setup() - for ext in ['fwaas', 'security-group', 'router']: + def setUp(self): + super(TestFWaaS, self).setUp() + required_exts = ['fwaas', 'security-group', 'router'] + if self.router_insertion: + required_exts.append('fwaasrouterinsertion') + for ext in required_exts: if not test.is_extension_enabled(ext, 'network'): msg = "%s Extension not enabled." % ext - raise cls.skipException(msg) + raise self.skipException(msg) + self._router_ids = None def _create_server(self, network, security_group=None): keys = self.create_keypair() @@ -43,10 +58,15 @@ class TestFWaaS(base.FWaaSScenarioTest): **kwargs) return server, keys + def _create_firewall(self, **kwargs): + if self._router_ids is not None: + kwargs['router_ids'] = self._router_ids + return self.create_firewall(**kwargs) + def _empty_policy(self, server1_ip): # NOTE(yamamoto): an empty policy would deny all fw_policy = self.create_firewall_policy(firewall_rules=[]) - fw = self.create_firewall(firewall_policy_id=fw_policy['id']) + fw = self._create_firewall(firewall_policy_id=fw_policy['id']) self._wait_firewall_ready(fw['id']) return { 'fw': fw, @@ -57,7 +77,7 @@ class TestFWaaS(base.FWaaSScenarioTest): # NOTE(yamamoto): a policy whose rules are all disabled would deny all fw_rule = self.create_firewall_rule(action="allow", enabled=False) fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']]) - fw = self.create_firewall(firewall_policy_id=fw_policy['id']) + fw = self._create_firewall(firewall_policy_id=fw_policy['id']) self._wait_firewall_ready(fw['id']) return { 'fw': fw, @@ -74,7 +94,7 @@ class TestFWaaS(base.FWaaSScenarioTest): action="allow") fw_policy = self.create_firewall_policy( firewall_rules=[fw_rule['id'], fw_rule_allow['id']]) - fw = self.create_firewall(firewall_policy_id=fw_policy['id']) + fw = self._create_firewall(firewall_policy_id=fw_policy['id']) self._wait_firewall_ready(fw['id']) return { 'fw': fw, @@ -90,7 +110,7 @@ class TestFWaaS(base.FWaaSScenarioTest): action="allow") fw_policy = self.create_firewall_policy( firewall_rules=[fw_rule['id'], fw_rule_allow['id']]) - fw = self.create_firewall(firewall_policy_id=fw_policy['id']) + fw = self._create_firewall(firewall_policy_id=fw_policy['id']) self._wait_firewall_ready(fw['id']) return { 'fw': fw, @@ -105,7 +125,7 @@ class TestFWaaS(base.FWaaSScenarioTest): action="allow") fw_policy = self.create_firewall_policy( firewall_rules=[fw_rule['id'], fw_rule_allow['id']]) - fw = self.create_firewall(firewall_policy_id=fw_policy['id']) + fw = self._create_firewall(firewall_policy_id=fw_policy['id']) self._wait_firewall_ready(fw['id']) return { 'fw': fw, @@ -117,7 +137,7 @@ class TestFWaaS(base.FWaaSScenarioTest): # NOTE(yamamoto): A firewall with admin_state_up=False would block all fw_rule = self.create_firewall_rule(action="allow") fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']]) - fw = self.create_firewall(firewall_policy_id=fw_policy['id'], + fw = self._create_firewall(firewall_policy_id=fw_policy['id'], admin_state_up=False) self._wait_firewall_ready(fw['id']) return { @@ -187,6 +207,17 @@ class TestFWaaS(base.FWaaSScenarioTest): **kwargs) self.check_connectivity(check_ssh=False, **kwargs) + def _create_topology(self): + public_network_id = CONF.network.public_network_id + network, subnet, router = self.create_networks() + security_group = self._create_security_group() + server, keys = self._create_server(network, + security_group=security_group) + private_key = keys['private_key'] + server_floating_ip = self.create_floating_ip(server, public_network_id) + server_ip = server_floating_ip.floating_ip_address + return server_ip, private_key, router + def _test_firewall_basic(self, block, allow=None, confirm_allowed=None, confirm_blocked=None): if allow is None: @@ -196,25 +227,34 @@ class TestFWaaS(base.FWaaSScenarioTest): if confirm_blocked is None: confirm_blocked = self._confirm_blocked ssh_login = CONF.validation.image_ssh_user - public_network_id = CONF.network.public_network_id - - network1, subnet1, router1 = self.create_networks() - security_group = self._create_security_group() - server1, keys1 = self._create_server(network1, - security_group=security_group) - private_key = keys1['private_key'] - server1_floating_ip = self.create_floating_ip(server1, - public_network_id) - server1_ip = server1_floating_ip.floating_ip_address + server1_ip, private_key1, router1 = self._create_topology() + server2_ip, private_key2, router2 = self._create_topology() + if self.router_insertion: + # Specify the router when creating a firewall and ensures that + # the other router (router2) is not affected by the firewall + self._router_ids = [router1['id']] + confirm_allowed2 = self.check_connectivity + confirm_blocked2 = self.check_connectivity + else: + # Without router insertion, all routers should be affected + # equally + confirm_allowed2 = confirm_allowed + confirm_blocked2 = confirm_blocked confirm_allowed(ip_address=server1_ip, username=ssh_login, - private_key=private_key) + private_key=private_key1) + confirm_allowed2(ip_address=server2_ip, username=ssh_login, + private_key=private_key2) ctx = block(server1_ip) confirm_blocked(ip_address=server1_ip, username=ssh_login, - private_key=private_key) + private_key=private_key1) + confirm_blocked2(ip_address=server2_ip, username=ssh_login, + private_key=private_key2) allow(ctx) confirm_allowed(ip_address=server1_ip, username=ssh_login, - private_key=private_key) + private_key=private_key1) + confirm_allowed2(ip_address=server2_ip, username=ssh_login, + private_key=private_key2) @test.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8') def test_firewall_block_ip(self):