@ -395,24 +395,22 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
self . check_remote_connectivity ( source = access_point_ssh ,
dest = self . _get_server_ip ( server ) )
def _test_cross_tenant_block ( self , source_tenant , dest_tenant ):
def _test_cross_tenant_block ( self , source_tenant , dest_tenant , ruleset ):
# if public router isn't defined, then dest_tenant access is via
# floating-ip
protocol = ruleset [ ' protocol ' ]
access_point_ssh = self . _connect_to_access_point ( source_tenant )
ip = self . _get_server_ip ( dest_tenant . access_point ,
floating = self . floating_ip_access )
self . check_remote_connectivity ( source = access_point_ssh , dest = ip ,
should_succeed = False )
should_succeed = False , protocol = protocol )
def _test_cross_tenant_allow ( self , source_tenant , dest_tenant ):
def _test_cross_tenant_allow ( self , source_tenant , dest_tenant , ruleset ):
""" check for each direction:
creating rule for tenant incoming traffic enables only 1 way traffic
"""
ruleset = dict (
protocol = ' icmp ' ,
direction = ' ingress '
)
protocol = ruleset [ ' protocol ' ]
sec_group_rules_client = (
dest_tenant . manager . security_group_rules_client )
self . _create_security_group_rule (
@ -423,10 +421,10 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
access_point_ssh = self . _connect_to_access_point ( source_tenant )
ip = self . _get_server_ip ( dest_tenant . access_point ,
floating = self . floating_ip_access )
self . check_remote_connectivity ( access_point_ssh , ip )
self . check_remote_connectivity ( access_point_ssh , ip , protocol = protocol )
# test that reverse traffic is still blocked
self . _test_cross_tenant_block ( dest_tenant , source_tenant )
self . _test_cross_tenant_block ( dest_tenant , source_tenant , ruleset )
# allow reverse traffic and check
sec_group_rules_client = (
@ -440,7 +438,8 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
access_point_ssh_2 = self . _connect_to_access_point ( dest_tenant )
ip = self . _get_server_ip ( source_tenant . access_point ,
floating = self . floating_ip_access )
self . check_remote_connectivity ( access_point_ssh_2 , ip )
self . check_remote_connectivity ( access_point_ssh_2 , ip ,
protocol = protocol )
def _verify_mac_addr ( self , tenant ) :
""" Verify that VM has the same ip, mac as listed in port """
@ -470,6 +469,17 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
self . _log_console_output (
servers = [ tenant . access_point ] , client = client )
def _create_protocol_ruleset ( self , protocol , port = 80 ) :
if protocol == ' icmp ' :
ruleset = dict ( protocol = ' icmp ' ,
direction = ' ingress ' )
else :
ruleset = dict ( protocol = protocol ,
port_range_min = port ,
port_range_max = port ,
direction = ' ingress ' )
return ruleset
@decorators.idempotent_id ( ' e79f879e-debb-440c-a7e4-efeda05b6848 ' )
@utils.services ( ' compute ' , ' network ' )
def test_cross_tenant_traffic ( self ) :
@ -484,8 +494,18 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
# cross tenant check
source_tenant = self . primary_tenant
dest_tenant = self . alt_tenant
self . _test_cross_tenant_block ( source_tenant , dest_tenant )
self . _test_cross_tenant_allow ( source_tenant , dest_tenant )
protocol = CONF . scenario . protocol
LOG . debug ( " Testing cross tenant traffic for %s protocol " ,
protocol )
if protocol in [ ' udp ' , ' tcp ' ] :
for tenant in [ source_tenant , dest_tenant ] :
access_point = self . _connect_to_access_point ( tenant )
access_point . nc_listen_host ( protocol = protocol )
ruleset = self . _create_protocol_ruleset ( protocol )
self . _test_cross_tenant_block ( source_tenant , dest_tenant , ruleset )
self . _test_cross_tenant_allow ( source_tenant , dest_tenant , ruleset )
except Exception :
self . _log_console_output_for_all_tenants ( )
raise