tempest: Add test cases for router insertion

Related-Bug: #1507522
Change-Id: I7595f61af05d4667dc561be60dda3f3c83957c2c
This commit is contained in:
YAMAMOTO Takashi 2015-12-15 17:59:11 +09:00
parent fa2e13ab7d
commit bdc76969fa

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from tempest import config
from tempest import test
@ -20,16 +22,29 @@ from neutron_fwaas.tests.tempest_plugin.tests.scenario import base
CONF = config.CONF
load_tests = testscenarios.load_tests_apply_scenarios
class TestFWaaS(base.FWaaSScenarioTest):
scenarios = [
('without router insersion', {
'router_insertion': False,
}),
('with router insersion', {
'router_insertion': True,
}),
]
@classmethod
def resource_setup(cls):
super(TestFWaaS, cls).resource_setup()
for ext in ['fwaas', 'security-group', 'router']:
def setUp(self):
super(TestFWaaS, self).setUp()
required_exts = ['fwaas', 'security-group', 'router']
if self.router_insertion:
required_exts.append('fwaasrouterinsertion')
for ext in required_exts:
if not test.is_extension_enabled(ext, 'network'):
msg = "%s Extension not enabled." % ext
raise cls.skipException(msg)
raise self.skipException(msg)
self._router_ids = None
def _create_server(self, network, security_group=None):
keys = self.create_keypair()
@ -43,10 +58,15 @@ class TestFWaaS(base.FWaaSScenarioTest):
**kwargs)
return server, keys
def _create_firewall(self, **kwargs):
if self._router_ids is not None:
kwargs['router_ids'] = self._router_ids
return self.create_firewall(**kwargs)
def _empty_policy(self, server1_ip):
# NOTE(yamamoto): an empty policy would deny all
fw_policy = self.create_firewall_policy(firewall_rules=[])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
@ -57,7 +77,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
# NOTE(yamamoto): a policy whose rules are all disabled would deny all
fw_rule = self.create_firewall_rule(action="allow", enabled=False)
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
@ -74,7 +94,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
action="allow")
fw_policy = self.create_firewall_policy(
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
@ -90,7 +110,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
action="allow")
fw_policy = self.create_firewall_policy(
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
@ -105,7 +125,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
action="allow")
fw_policy = self.create_firewall_policy(
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
@ -117,7 +137,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
# NOTE(yamamoto): A firewall with admin_state_up=False would block all
fw_rule = self.create_firewall_rule(action="allow")
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'],
fw = self._create_firewall(firewall_policy_id=fw_policy['id'],
admin_state_up=False)
self._wait_firewall_ready(fw['id'])
return {
@ -187,6 +207,17 @@ class TestFWaaS(base.FWaaSScenarioTest):
**kwargs)
self.check_connectivity(check_ssh=False, **kwargs)
def _create_topology(self):
public_network_id = CONF.network.public_network_id
network, subnet, router = self.create_networks()
security_group = self._create_security_group()
server, keys = self._create_server(network,
security_group=security_group)
private_key = keys['private_key']
server_floating_ip = self.create_floating_ip(server, public_network_id)
server_ip = server_floating_ip.floating_ip_address
return server_ip, private_key, router
def _test_firewall_basic(self, block, allow=None,
confirm_allowed=None, confirm_blocked=None):
if allow is None:
@ -196,25 +227,34 @@ class TestFWaaS(base.FWaaSScenarioTest):
if confirm_blocked is None:
confirm_blocked = self._confirm_blocked
ssh_login = CONF.validation.image_ssh_user
public_network_id = CONF.network.public_network_id
network1, subnet1, router1 = self.create_networks()
security_group = self._create_security_group()
server1, keys1 = self._create_server(network1,
security_group=security_group)
private_key = keys1['private_key']
server1_floating_ip = self.create_floating_ip(server1,
public_network_id)
server1_ip = server1_floating_ip.floating_ip_address
server1_ip, private_key1, router1 = self._create_topology()
server2_ip, private_key2, router2 = self._create_topology()
if self.router_insertion:
# Specify the router when creating a firewall and ensures that
# the other router (router2) is not affected by the firewall
self._router_ids = [router1['id']]
confirm_allowed2 = self.check_connectivity
confirm_blocked2 = self.check_connectivity
else:
# Without router insertion, all routers should be affected
# equally
confirm_allowed2 = confirm_allowed
confirm_blocked2 = confirm_blocked
confirm_allowed(ip_address=server1_ip, username=ssh_login,
private_key=private_key)
private_key=private_key1)
confirm_allowed2(ip_address=server2_ip, username=ssh_login,
private_key=private_key2)
ctx = block(server1_ip)
confirm_blocked(ip_address=server1_ip, username=ssh_login,
private_key=private_key)
private_key=private_key1)
confirm_blocked2(ip_address=server2_ip, username=ssh_login,
private_key=private_key2)
allow(ctx)
confirm_allowed(ip_address=server1_ip, username=ssh_login,
private_key=private_key)
private_key=private_key1)
confirm_allowed2(ip_address=server2_ip, username=ssh_login,
private_key=private_key2)
@test.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8')
def test_firewall_block_ip(self):