@ -17,6 +17,8 @@ import contextlib
import copy
from unittest import mock
from neutron_lib . api . definitions import security_groups_remote_address_group \
as sgag_def
from neutron_lib . api import validators
from neutron_lib import constants as const
from neutron_lib import context
@ -29,12 +31,15 @@ import oslo_db.exception as exc
import testtools
import webob . exc
from neutron . db import address_group_db
from neutron . db import db_base_plugin_v2
from neutron . db import securitygroups_db
from neutron . extensions import address_group as ext_ag
from neutron . extensions import securitygroup as ext_sg
from neutron . extensions import standardattrdescription
from neutron . tests import base
from neutron . tests . unit . db import test_db_base_plugin_v2
from neutron . tests . unit . extensions import test_address_group
DB_PLUGIN_KLASS = ( ' neutron.tests.unit.extensions.test_securitygroup. '
' SecurityGroupTestPlugin ' )
@ -60,7 +65,11 @@ class SecurityGroupTestExtensionManager(object):
ext_sg . RESOURCE_ATTRIBUTE_MAP [ ext_sg . SECURITYGROUPS ] )
sg_attr_desc = ext_res [ ext_sg . SECURITYGROUPS ]
existing_sg_attr_map . update ( sg_attr_desc )
return ext_sg . Securitygroup . get_resources ( )
# update with the remote address group api definition
ext_sg . Securitygroup ( ) . update_attributes_map (
sgag_def . RESOURCE_ATTRIBUTE_MAP )
return ( ext_sg . Securitygroup . get_resources ( ) +
ext_ag . Address_group ( ) . get_resources ( ) )
def get_actions ( self ) :
return [ ]
@ -97,6 +106,7 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self , security_group_id , direction , proto ,
port_range_min = None , port_range_max = None ,
remote_ip_prefix = None , remote_group_id = None ,
remote_address_group_id = None ,
tenant_id = test_db_base_plugin_v2 . TEST_TENANT_ID ,
ethertype = const . IPv4 ) :
@ -117,6 +127,10 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
if remote_group_id :
data [ ' security_group_rule ' ] [ ' remote_group_id ' ] = remote_group_id
if remote_address_group_id :
data [ ' security_group_rule ' ] [ ' remote_address_group_id ' ] = \
remote_address_group_id
return data
def _create_security_group_rule ( self , fmt , rules , * * kwargs ) :
@ -160,6 +174,7 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
direction = ' ingress ' , protocol = const . PROTO_NAME_TCP ,
port_range_min = ' 22 ' , port_range_max = ' 22 ' ,
remote_ip_prefix = None , remote_group_id = None ,
remote_address_group_id = None ,
fmt = None , ethertype = const . IPv4 ) :
if not fmt :
fmt = self . fmt
@ -169,6 +184,7 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
port_range_max ,
remote_ip_prefix ,
remote_group_id ,
remote_address_group_id ,
ethertype = ethertype )
security_group_rule = self . _make_security_group_rule ( self . fmt , rule )
yield security_group_rule
@ -194,7 +210,8 @@ class SecurityGroupsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
class SecurityGroupTestPlugin ( db_base_plugin_v2 . NeutronDbPluginV2 ,
securitygroups_db . SecurityGroupDbMixin ) :
securitygroups_db . SecurityGroupDbMixin ,
address_group_db . AddressGroupDbMixin ) :
""" Test plugin that implements necessary calls on create/delete port for
associating ports with security groups .
"""
@ -248,7 +265,8 @@ class SecurityGroupTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
return neutron_lports
class SecurityGroupDBTestCase ( SecurityGroupsTestCase ) :
class SecurityGroupDBTestCase ( SecurityGroupsTestCase ,
test_address_group . AddressGroupTestCase ) :
def setUp ( self , plugin = None , ext_mgr = None ) :
self . _backup = copy . deepcopy ( ext_sg . RESOURCE_ATTRIBUTE_MAP )
self . addCleanup ( self . _restore )
@ -659,10 +677,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
port_range_min = 22
port_range_max = 22
ethertype = ' ipV4 '
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_ip_prefix ,
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_ip_prefix = remote_ip_prefix ,
ethertype = ethertype ) as rule :
# the lower case value will be return
@ -689,10 +709,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_ip_prefix ) :
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_ip_prefix = remote_ip_prefix ) :
group = self . deserialize (
self . fmt , res . get_response ( self . ext_api ) )
@ -951,10 +973,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_ip_prefix ) as rule :
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_ip_prefix = remote_ip_prefix
) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
@ -975,14 +1000,76 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_group_id = remote_group_id
) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
def test_create_security_group_rule_remote_address_group_id ( self ) :
name = ' webservers '
description = ' my webservers '
ag = self . _test_create_address_group ( name = ' foo ' )
ag_id = ag [ ' address_group ' ] [ ' id ' ]
with self . security_group ( name , description ) as sg :
security_group_id = sg [ ' security_group ' ] [ ' id ' ]
direction = " ingress "
remote_address_group_id = ag_id
protocol = const . PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [ ( ' remote_address_group_id ' , remote_address_group_id ) ,
( ' security_group_id ' , security_group_id ) ,
( ' direction ' , direction ) ,
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_address_group_id = (
remote_address_group_id )
) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
def test_delete_address_group_in_use ( self ) :
ag = self . _test_create_address_group ( name = ' foo ' )
ag_id = ag [ ' address_group ' ] [ ' id ' ]
with self . security_group ( ) as sg :
security_group_id = sg [ ' security_group ' ] [ ' id ' ]
with self . security_group_rule ( security_group_id ,
remote_address_group_id = ag_id ) :
self . _delete ( ' address-groups ' , ag [ ' address_group ' ] [ ' id ' ] ,
expected_code = webob . exc . HTTPConflict . code )
def test_create_security_group_rule_multiple_remotes ( self ) :
name = ' webservers '
description = ' my webservers '
ag = self . _test_create_address_group ( name = ' foo ' )
ag_id = ag [ ' address_group ' ] [ ' id ' ]
with self . security_group ( name , description ) as sg :
sg_id = sg [ ' security_group ' ] [ ' id ' ]
for remote in [
{ ' remote_ip_prefix ' : ' 10.0.0.0/8 ' , ' remote_group_id ' : sg_id } ,
{ ' remote_group_id ' : sg_id , ' remote_address_group_id ' : ag_id } ,
{ ' remote_ip_prefix ' : ' 10.0.0.0/8 ' ,
' remote_address_group_id ' : ag_id } ,
{ ' remote_ip_prefix ' : ' 10.0.0.0/8 ' , ' remote_group_id ' : sg_id ,
' remote_address_group_id ' : ag_id }
] :
rule = self . _build_security_group_rule ( sg_id , " ingress " ,
const . PROTO_NAME_TCP ,
* * remote )
res = self . _create_security_group_rule ( self . fmt , rule )
self . assertEqual ( webob . exc . HTTPBadRequest . code , res . status_int )
def test_create_security_group_rule_port_range_min_max_limits ( self ) :
name = ' webservers '
description = ' my webservers '
@ -998,9 +1085,12 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , None ) ,
( ' port_range_max ' , None ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ) as rule :
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max
) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
@ -1023,10 +1113,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_ip_prefix ) as rule :
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_ip_prefix = remote_ip_prefix
) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
@ -1048,10 +1141,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_ip_prefix ) as rule :
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_ip_prefix = remote_ip_prefix
) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
@ -1075,12 +1171,13 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' protocol ' , protocol ) ,
( ' port_range_min ' , port_range_min ) ,
( ' port_range_max ' , port_range_max ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_ip_prefix ,
None , None ,
ethertype ) as rule :
with self . security_group_rule ( security_group_id ,
direction = direction ,
protocol = protocol ,
port_range_min = port_range_min ,
port_range_max = port_range_max ,
remote_ip_prefix = remote_ip_prefix ,
ethertype = ethertype ) as rule :
for k , v , in keys :
self . assertEqual ( v , rule [ ' security_group_rule ' ] [ k ] )
@ -1097,11 +1194,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' direction ' , direction ) ,
( ' ethertype ' , ethertype ) ,
( ' protocol ' , protocol ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol, None , None ,
remote_ip_prefix ,
None , None ,
ethertype ) as rule :
with self . security_group_rule ( security_group_id ,
direction= direction ,
protocol= protocol ,
remote_ip_prefix = remote_ip_prefix ,
ethertype = ethertype ) as rule :
for k , v , in keys :
# IPv6 ICMP protocol will always be 'ipv6-icmp'
if k == ' protocol ' :
@ -1129,11 +1226,11 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
( ' direction ' , direction ) ,
( ' ethertype ' , ethertype ) ,
( ' protocol ' , protocol ) ]
with self . security_group_rule ( security_group_id , direction ,
protocol, None , None ,
remote_ip_prefix ,
None , None ,
ethertype ) as rule :
with self . security_group_rule ( security_group_id ,
direction= direction ,
protocol= protocol ,
remote_ip_prefix = remote_ip_prefix ,
ethertype = ethertype ) as rule :
for k , v , in keys :
# IPv6 ICMP protocol will always be '58'
if k == ' protocol ' :
@ -1255,6 +1352,26 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self . deserialize ( self . fmt , res )
self . assertEqual ( webob . exc . HTTPNotFound . code , res . status_int )
def test_create_security_group_rule_bad_remote_address_group_id ( self ) :
name = ' webservers '
description = ' my webservers '
with self . security_group ( name , description ) as sg :
security_group_id = sg [ ' security_group ' ] [ ' id ' ]
remote_address_group_id = " 4cd70774-cc67-4a87-9b39-7d1db38eb087 "
direction = " ingress "
protocol = const . PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
rule = self . _build_security_group_rule ( security_group_id , direction ,
protocol , port_range_min ,
port_range_max ,
remote_address_group_id = (
remote_address_group_id
) )
res = self . _create_security_group_rule ( self . fmt , rule )
self . deserialize ( self . fmt , res )
self . assertEqual ( webob . exc . HTTPNotFound . code , res . status_int )
def test_create_security_group_rule_duplicate_rules ( self ) :
name = ' webservers '
description = ' my webservers '
@ -1961,7 +2078,9 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
' port_range_min ' : None ,
' port_range_max ' : None ,
' remote_ip_prefix ' : None ,
' remote_group_id ' : None } )
' remote_group_id ' : None ,
' remote_address_group_id ' :
None } )
result = self . plugin . create_security_group_rule (
neutron_context , rule )
self . assertEqual ( specified_id , result [ ' id ' ] )