@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
import uuid
@ -19,118 +20,220 @@ from openstackclient.tests.functional.volume.v2 import common
class VolumeTests ( common . BaseVolumeTests ) :
""" Functional tests for volume. """
NAME = uuid . uuid4 ( ) . hex
SNAPSHOT_NAME = uuid . uuid4 ( ) . hex
VOLUME_FROM_SNAPSHOT_NAME = uuid . uuid4 ( ) . hex
OTHER_NAME = uuid . uuid4 ( ) . hex
HEADERS = [ ' " Display Name " ' ]
FIELDS = [ ' name ' ]
@classmethod
def setUpClass ( cls ) :
super ( VolumeTests , cls ) . setUpClass ( )
opts = cls . get_opts ( cls . FIELDS )
# Create test volume
raw_output = cls . openstack ( ' volume create --size 1 ' + cls . NAME + opts )
expected = cls . NAME + ' \n '
cls . assertOutput ( expected , raw_output )
@classmethod
def tearDownClass ( cls ) :
# Rename test volume
raw_output = cls . openstack (
' volume set --name ' + cls . OTHER_NAME + ' ' + cls . NAME )
cls . assertOutput ( ' ' , raw_output )
# Set volume state
cls . openstack ( ' volume set --state error ' + cls . OTHER_NAME )
opts = cls . get_opts ( [ " status " ] )
raw_output_status = cls . openstack (
' volume show ' + cls . OTHER_NAME + opts )
# Delete test volume
raw_output = cls . openstack ( ' volume delete ' + cls . OTHER_NAME )
cls . assertOutput ( ' ' , raw_output )
cls . assertOutput ( ' error \n ' , raw_output_status )
def test_volume_delete ( self ) :
""" Test create, delete multiple """
name1 = uuid . uuid4 ( ) . hex
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --size 1 ' +
name1
) )
self . assertEqual (
1 ,
cmd_output [ " size " ] ,
)
name2 = uuid . uuid4 ( ) . hex
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --size 2 ' +
name2
) )
self . assertEqual (
2 ,
cmd_output [ " size " ] ,
)
self . wait_for ( " volume " , name1 , " available " )
self . wait_for ( " volume " , name2 , " available " )
del_output = self . openstack ( ' volume delete ' + name1 + ' ' + name2 )
self . assertOutput ( ' ' , del_output )
def test_volume_list ( self ) :
opts = self . get_opts ( self . HEADERS )
raw_output = self . openstack ( ' volume list ' + opts )
self . assertIn ( self . NAME , raw_output )
def test_volume_show ( self ) :
opts = self . get_opts ( self . FIELDS )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( self . NAME + " \n " , raw_output )
def test_volume_properties ( self ) :
""" Test create, list filter """
name1 = uuid . uuid4 ( ) . hex
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --size 1 ' +
name1
) )
self . addCleanup ( self . openstack , ' volume delete ' + name1 )
self . assertEqual (
1 ,
cmd_output [ " size " ] ,
)
self . wait_for ( " volume " , name1 , " available " )
name2 = uuid . uuid4 ( ) . hex
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --size 2 ' +
name2
) )
self . addCleanup ( self . openstack , ' volume delete ' + name2 )
self . assertEqual (
2 ,
cmd_output [ " size " ] ,
)
self . wait_for ( " volume " , name2 , " available " )
raw_output = self . openstack (
' volume set --property a=b --property c=d ' + self . NAME )
self . assertEqual ( " " , raw_output )
opts = self . get_opts ( [ " properties " ] )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( " a= ' b ' , c= ' d ' \n " , raw_output )
' volume set ' +
' --state error ' +
name2
)
self . assert Output( ' ' , raw_output )
raw_output = self . openstack ( ' volume unset --property a ' + self . NAME )
self . assertEqual ( " " , raw_output )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( " c= ' d ' \n " , raw_output )
# Test list --long
cmd_output = json . loads ( self . openstack (
' volume list -f json ' +
' --long '
) )
names = [ x [ " Display Name " ] for x in cmd_output ]
self . assertIn ( name1 , names )
self . assertIn ( name2 , names )
# Test list --status
cmd_output = json . loads ( self . openstack (
' volume list -f json ' +
' --status error '
) )
names = [ x [ " Display Name " ] for x in cmd_output ]
self . assertNotIn ( name1 , names )
self . assertIn ( name2 , names )
# TODO(qiangjiahui): Add project option to filter tests when we can
# specify volume with project
def test_volume_set ( self ) :
discription = uuid . uuid4 ( ) . hex
self . openstack ( ' volume set --description ' + discription + ' ' +
self . NAME )
opts = self . get_opts ( [ " description " , " name " ] )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( discription + " \n " + self . NAME + " \n " , raw_output )
def test_volume_set_size ( self ) :
self . openstack ( ' volume set --size 2 ' + self . NAME )
opts = self . get_opts ( [ " name " , " size " ] )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( self . NAME + " \n 2 \n " , raw_output )
def test_volume_set_bootable ( self ) :
self . openstack ( ' volume set --bootable ' + self . NAME )
opts = self . get_opts ( [ " bootable " ] )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( " true \n " , raw_output )
self . openstack ( ' volume set --non-bootable ' + self . NAME )
opts = self . get_opts ( [ " bootable " ] )
raw_output = self . openstack ( ' volume show ' + self . NAME + opts )
self . assertEqual ( " false \n " , raw_output )
""" Tests create volume, set, unset, show, delete """
name = uuid . uuid4 ( ) . hex
new_name = name + " _ "
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --size 1 ' +
' --description aaaa ' +
' --property Alpha=a ' +
name
) )
self . addCleanup ( self . openstack , ' volume delete ' + new_name )
self . assertEqual (
name ,
cmd_output [ " name " ] ,
)
self . assertEqual (
1 ,
cmd_output [ " size " ] ,
)
self . assertEqual (
' aaaa ' ,
cmd_output [ " description " ] ,
)
self . assertEqual (
" Alpha= ' a ' " ,
cmd_output [ " properties " ] ,
)
self . assertEqual (
' false ' ,
cmd_output [ " bootable " ] ,
)
self . wait_for ( " volume " , name , " available " )
# Test volume set
raw_output = self . openstack (
' volume set ' +
' --name ' + new_name +
' --size 2 ' +
' --description bbbb ' +
' --property Alpha=c ' +
' --property Beta=b ' +
' --bootable ' +
name ,
)
self . assertOutput ( ' ' , raw_output )
def test_volume_snapshot ( self ) :
opts = self . get_opts ( self . FIELDS )
# Create snapshot from test volume
raw_output = self . openstack ( ' volume snapshot create ' +
self . SNAPSHOT_NAME +
' --volume ' + self . NAME + opts )
expected = self . SNAPSHOT_NAME + ' \n '
self . assertOutput ( expected , raw_output )
self . wait_for ( " volume snapshot " , self . SNAPSHOT_NAME , " available " )
# Create volume from snapshot
raw_output = self . openstack ( ' volume create --size 2 --snapshot ' +
self . SNAPSHOT_NAME + ' ' +
self . VOLUME_FROM_SNAPSHOT_NAME + opts )
expected = self . VOLUME_FROM_SNAPSHOT_NAME + ' \n '
self . assertOutput ( expected , raw_output )
self . wait_for ( " volume " , self . VOLUME_FROM_SNAPSHOT_NAME , " available " )
# Delete volume that create from snapshot
raw_output = self . openstack ( ' volume delete ' +
self . VOLUME_FROM_SNAPSHOT_NAME )
cmd_output = json . loads ( self . openstack (
' volume show -f json ' +
new_name
) )
self . assertEqual (
new_name ,
cmd_output [ " name " ] ,
)
self . assertEqual (
2 ,
cmd_output [ " size " ] ,
)
self . assertEqual (
' bbbb ' ,
cmd_output [ " description " ] ,
)
self . assertEqual (
" Alpha= ' c ' , Beta= ' b ' " ,
cmd_output [ " properties " ] ,
)
self . assertEqual (
' true ' ,
cmd_output [ " bootable " ] ,
)
# Test volume unset
raw_output = self . openstack (
' volume unset ' +
' --property Alpha ' +
new_name ,
)
self . assertOutput ( ' ' , raw_output )
# Delete test snapshot
cmd_output = json . loads ( self . openstack (
' volume show -f json ' +
new_name
) )
self . assertEqual (
" Beta= ' b ' " ,
cmd_output [ " properties " ] ,
)
def test_volume_snapshot ( self ) :
""" Tests volume create from snapshot """
volume_name = uuid . uuid4 ( ) . hex
snapshot_name = uuid . uuid4 ( ) . hex
# Make a snapshot
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --size 1 ' +
volume_name
) )
self . wait_for ( " volume " , volume_name , " available " )
self . assertEqual (
volume_name ,
cmd_output [ " name " ] ,
)
cmd_output = json . loads ( self . openstack (
' volume snapshot create -f json ' +
snapshot_name +
' --volume ' + volume_name
) )
self . wait_for ( " volume snapshot " , snapshot_name , " available " )
name = uuid . uuid4 ( ) . hex
cmd_output = json . loads ( self . openstack (
' volume create -f json ' +
' --snapshot ' + snapshot_name +
' ' + name
) )
self . addCleanup ( self . openstack , ' volume delete ' + name )
self . addCleanup ( self . openstack , ' volume delete ' + volume_name )
self . assertEqual (
name ,
cmd_output [ " name " ] ,
)
self . wait_for ( " volume " , name , " available " )
# Delete snapshot
raw_output = self . openstack (
' volume snapshot delete ' + self . SNAPSHOT_NAME )
' volume snapshot delete ' + snapshot_name )
self . assertOutput ( ' ' , raw_output )
self . wait_for ( " volume " , self . NAME , " available " )
def wait_for ( self , check_type , check_name , desired_status , wait = 120 ,
interval = 5 , failures = [ ' ERROR ' ] ) :