@ -36,6 +36,10 @@ class TestType(volume_fakes.TestVolume):
self . app . client_manager . volume . volume_type_access )
self . types_access_mock . reset_mock ( )
self . encryption_types_mock = (
self . app . client_manager . volume . volume_encryption_types )
self . encryption_types_mock . reset_mock ( )
self . projects_mock = self . app . client_manager . identity . projects
self . projects_mock . reset_mock ( )
@ -131,6 +135,68 @@ class TestTypeCreate(TestType):
self . cmd . take_action ,
parsed_args )
def test_type_create_with_encryption ( self ) :
encryption_info = {
' provider ' : ' LuksEncryptor ' ,
' cipher ' : ' aes-xts-plain64 ' ,
' key_size ' : ' 128 ' ,
' control_location ' : ' front-end ' ,
}
encryption_type = volume_fakes . FakeType . create_one_encryption_type (
attrs = encryption_info
)
self . new_volume_type = volume_fakes . FakeType . create_one_type (
attrs = { ' encryption ' : encryption_info } )
self . types_mock . create . return_value = self . new_volume_type
self . encryption_types_mock . create . return_value = encryption_type
encryption_columns = (
' description ' ,
' encryption ' ,
' id ' ,
' is_public ' ,
' name ' ,
)
encryption_data = (
self . new_volume_type . description ,
utils . format_dict ( encryption_info ) ,
self . new_volume_type . id ,
True ,
self . new_volume_type . name ,
)
arglist = [
' --encryption-provider ' , ' LuksEncryptor ' ,
' --encryption-cipher ' , ' aes-xts-plain64 ' ,
' --encryption-key-size ' , ' 128 ' ,
' --encryption-control-location ' , ' front-end ' ,
self . new_volume_type . name ,
]
verifylist = [
( ' encryption_provider ' , ' LuksEncryptor ' ) ,
( ' encryption_cipher ' , ' aes-xts-plain64 ' ) ,
( ' encryption_key_size ' , 128 ) ,
( ' encryption_control_location ' , ' front-end ' ) ,
( ' name ' , self . new_volume_type . name ) ,
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
columns , data = self . cmd . take_action ( parsed_args )
self . types_mock . create . assert_called_with (
self . new_volume_type . name ,
description = None ,
)
body = {
' provider ' : ' LuksEncryptor ' ,
' cipher ' : ' aes-xts-plain64 ' ,
' key_size ' : 128 ,
' control_location ' : ' front-end ' ,
}
self . encryption_types_mock . create . assert_called_with (
self . new_volume_type ,
body ,
)
self . assertEqual ( encryption_columns , columns )
self . assertEqual ( encryption_data , data )
class TestTypeDelete ( TestType ) :
@ -305,6 +371,7 @@ class TestTypeList(TestType):
" --default " ,
]
verifylist = [
( " encryption_type " , False ) ,
( " long " , False ) ,
( " private " , False ) ,
( " public " , False ) ,
@ -317,6 +384,47 @@ class TestTypeList(TestType):
self . assertEqual ( self . columns , columns )
self . assertEqual ( self . data_with_default_type , list ( data ) )
def test_type_list_with_encryption ( self ) :
encryption_type = volume_fakes . FakeType . create_one_encryption_type (
attrs = { ' volume_type_id ' : self . volume_types [ 0 ] . id } )
encryption_info = {
' provider ' : ' LuksEncryptor ' ,
' cipher ' : None ,
' key_size ' : None ,
' control_location ' : ' front-end ' ,
}
encryption_columns = self . columns + [
" Encryption " ,
]
encryption_data = [ ]
encryption_data . append ( (
self . volume_types [ 0 ] . id ,
self . volume_types [ 0 ] . name ,
self . volume_types [ 0 ] . is_public ,
utils . format_dict ( encryption_info ) ,
) )
encryption_data . append ( (
self . volume_types [ 1 ] . id ,
self . volume_types [ 1 ] . name ,
self . volume_types [ 1 ] . is_public ,
' - ' ,
) )
self . encryption_types_mock . list . return_value = [ encryption_type ]
arglist = [
" --encryption-type " ,
]
verifylist = [
( " encryption_type " , True ) ,
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
columns , data = self . cmd . take_action ( parsed_args )
self . encryption_types_mock . list . assert_called_once_with ( )
self . types_mock . list . assert_called_once_with ( is_public = None )
self . assertEqual ( encryption_columns , columns )
self . assertEqual ( encryption_data , list ( data ) )
class TestTypeSet ( TestType ) :
@ -331,6 +439,8 @@ class TestTypeSet(TestType):
# Return a project
self . projects_mock . get . return_value = self . project
self . encryption_types_mock . create . return_value = None
self . encryption_types_mock . update . return_value = None
# Get the command object to test
self . cmd = volume_type . SetVolumeType ( self . app , None )
@ -454,6 +564,107 @@ class TestTypeSet(TestType):
self . project . id ,
)
def test_type_set_new_encryption ( self ) :
self . encryption_types_mock . update . side_effect = (
exceptions . NotFound ( ' NotFound ' ) )
arglist = [
' --encryption-provider ' , ' LuksEncryptor ' ,
' --encryption-cipher ' , ' aes-xts-plain64 ' ,
' --encryption-key-size ' , ' 128 ' ,
' --encryption-control-location ' , ' front-end ' ,
self . volume_type . id ,
]
verifylist = [
( ' encryption_provider ' , ' LuksEncryptor ' ) ,
( ' encryption_cipher ' , ' aes-xts-plain64 ' ) ,
( ' encryption_key_size ' , 128 ) ,
( ' encryption_control_location ' , ' front-end ' ) ,
( ' volume_type ' , self . volume_type . id ) ,
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
result = self . cmd . take_action ( parsed_args )
body = {
' provider ' : ' LuksEncryptor ' ,
' cipher ' : ' aes-xts-plain64 ' ,
' key_size ' : 128 ,
' control_location ' : ' front-end ' ,
}
self . encryption_types_mock . update . assert_called_with (
self . volume_type ,
body ,
)
self . encryption_types_mock . create . assert_called_with (
self . volume_type ,
body ,
)
self . assertIsNone ( result )
@mock.patch.object ( utils , ' find_resource ' )
def test_type_set_existing_encryption ( self , mock_find ) :
mock_find . side_effect = [ self . volume_type ,
" existing_encryption_type " ]
arglist = [
' --encryption-provider ' , ' LuksEncryptor ' ,
' --encryption-cipher ' , ' aes-xts-plain64 ' ,
' --encryption-control-location ' , ' front-end ' ,
self . volume_type . id ,
]
verifylist = [
( ' encryption_provider ' , ' LuksEncryptor ' ) ,
( ' encryption_cipher ' , ' aes-xts-plain64 ' ) ,
( ' encryption_control_location ' , ' front-end ' ) ,
( ' volume_type ' , self . volume_type . id ) ,
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
result = self . cmd . take_action ( parsed_args )
body = {
' provider ' : ' LuksEncryptor ' ,
' cipher ' : ' aes-xts-plain64 ' ,
' control_location ' : ' front-end ' ,
}
self . encryption_types_mock . update . assert_called_with (
self . volume_type ,
body ,
)
self . encryption_types_mock . create . assert_not_called ( )
self . assertIsNone ( result )
def test_type_set_new_encryption_without_provider ( self ) :
self . encryption_types_mock . update . side_effect = (
exceptions . NotFound ( ' NotFound ' ) )
arglist = [
' --encryption-cipher ' , ' aes-xts-plain64 ' ,
' --encryption-key-size ' , ' 128 ' ,
' --encryption-control-location ' , ' front-end ' ,
self . volume_type . id ,
]
verifylist = [
( ' encryption_cipher ' , ' aes-xts-plain64 ' ) ,
( ' encryption_key_size ' , 128 ) ,
( ' encryption_control_location ' , ' front-end ' ) ,
( ' volume_type ' , self . volume_type . id ) ,
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
try :
self . cmd . take_action ( parsed_args )
self . fail ( ' CommandError should be raised. ' )
except exceptions . CommandError as e :
self . assertEqual ( " Command Failed: One or more of "
" the operations failed " ,
str ( e ) )
body = {
' cipher ' : ' aes-xts-plain64 ' ,
' key_size ' : 128 ,
' control_location ' : ' front-end ' ,
}
self . encryption_types_mock . update . assert_called_with (
self . volume_type ,
body ,
)
self . encryption_types_mock . create . assert_not_called ( )
class TestTypeShow ( TestType ) :
@ -489,6 +700,7 @@ class TestTypeShow(TestType):
self . volume_type . id
]
verifylist = [
( " encryption_type " , False ) ,
( " volume_type " , self . volume_type . id )
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
@ -564,6 +776,52 @@ class TestTypeShow(TestType):
)
self . assertEqual ( private_type_data , data )
def test_type_show_with_encryption ( self ) :
encryption_type = volume_fakes . FakeType . create_one_encryption_type ( )
encryption_info = {
' provider ' : ' LuksEncryptor ' ,
' cipher ' : None ,
' key_size ' : None ,
' control_location ' : ' front-end ' ,
}
self . volume_type = volume_fakes . FakeType . create_one_type (
attrs = { ' encryption ' : encryption_info } )
self . types_mock . get . return_value = self . volume_type
self . encryption_types_mock . get . return_value = encryption_type
encryption_columns = (
' access_project_ids ' ,
' description ' ,
' encryption ' ,
' id ' ,
' is_public ' ,
' name ' ,
' properties ' ,
)
encryption_data = (
None ,
self . volume_type . description ,
utils . format_dict ( encryption_info ) ,
self . volume_type . id ,
True ,
self . volume_type . name ,
utils . format_dict ( self . volume_type . extra_specs )
)
arglist = [
' --encryption-type ' ,
self . volume_type . id
]
verifylist = [
( ' encryption_type ' , True ) ,
( " volume_type " , self . volume_type . id )
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
columns , data = self . cmd . take_action ( parsed_args )
self . types_mock . get . assert_called_with ( self . volume_type . id )
self . encryption_types_mock . get . assert_called_with ( self . volume_type . id )
self . assertEqual ( encryption_columns , columns )
self . assertEqual ( encryption_data , data )
class TestTypeUnset ( TestType ) :
@ -625,6 +883,7 @@ class TestTypeUnset(TestType):
self . volume_type . id ,
]
verifylist = [
( ' encryption_type ' , False ) ,
( ' project ' , ' ' ) ,
( ' volume_type ' , self . volume_type . id ) ,
]
@ -633,7 +892,7 @@ class TestTypeUnset(TestType):
result = self . cmd . take_action ( parsed_args )
self . assertIsNone ( result )
self . encryption_types_mock . delete . assert_not_called ( )
self . assertFalse ( self . types_access_mock . remove_project_access . called )
def test_type_unset_failed_with_missing_volume_type_argument ( self ) :
@ -649,3 +908,18 @@ class TestTypeUnset(TestType):
self . cmd ,
arglist ,
verifylist )
def test_type_unset_encryption_type ( self ) :
arglist = [
' --encryption-type ' ,
self . volume_type . id ,
]
verifylist = [
( ' encryption_type ' , True ) ,
( ' volume_type ' , self . volume_type . id ) ,
]
parsed_args = self . check_parser ( self . cmd , arglist , verifylist )
result = self . cmd . take_action ( parsed_args )
self . encryption_types_mock . delete . assert_called_with ( self . volume_type )
self . assertIsNone ( result )