manila-tempest-plugin/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
lkuchlan b5d6f2997f Add nfs_versions option for NFS protocol version selection
This commit introduces a new configuration option, `nfs_versions`,
to allow specifying the NFS protocol version used when mounting NFS
shares in Tempest tests. The option supports selecting specific
versions, such as NFSv3 or NFSv4, enabling compatibility with
environments where certain NFS versions are required.

- Option Name: `nfs_versions`
- Default: ["4", ] (NFSv4 is used by default)

This enhancement increases flexibility in testing scenarios with
NFS version dependencies.

Change-Id: I048d4e954329d2d4f28604a8bf02c615703ac43c
2024-11-25 22:18:02 +02:00

502 lines
21 KiB
Python

# Copyright 2015 Deutsche Telekom AG
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests.tests.scenario import manager_share as manager
from manila_tempest_tests import utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
@ddt.ddt
class ShareBasicOpsBase(manager.ShareScenarioTest):
"""This smoke test case follows this basic set of operations:
* Create share network
* Create share
* Launch an instance
* Allow access
* Perform ssh to instance
* Mount share
* Terminate the instance
"""
@decorators.idempotent_id('825be71c-cf14-4884-a0ad-cf47d511df9a')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_mount_share_one_vm(self):
instance = self.boot_instance(wait_until="BUILD")
share = self.create_share()
locations = self.get_user_export_locations(share)
instance = self.wait_for_active_instance(instance["id"])
remote_client = self.init_remote_client(instance)
self.allow_access(share=share, instance=instance,
remote_client=remote_client, locations=locations)
for location in locations:
self.mount_share(location, remote_client)
self.unmount_share(remote_client)
@decorators.idempotent_id('7cc61131-90e1-42fb-9f07-d3786efb338f')
@tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
def test_write_with_ro_access(self):
'''Test if an instance with ro access can write on the share.'''
test_data = "Some test data to write"
instance = self.boot_instance(wait_until="BUILD")
share = self.create_share()
location = self.get_user_export_locations(share)[0]
instance = self.wait_for_active_instance(instance["id"])
remote_client_inst = self.init_remote_client(instance)
# First, check if write works RW access.
acc_rule_id = self.allow_access(
share=share, instance=instance, remote_client=remote_client_inst,
locations=location)['id']
self.mount_share(location, remote_client_inst)
self.write_data_to_mounted_share(test_data, remote_client_inst)
self.deny_access(share['id'], acc_rule_id)
self.allow_access(share=share, instance=instance,
remote_client=remote_client_inst, locations=location,
access_level='ro')
self.addCleanup(self.unmount_share, remote_client_inst)
# Test if write with RO access fails.
self.assertRaises(exceptions.SSHExecCommandFailed,
self.write_data_to_mounted_share,
test_data, remote_client_inst)
@decorators.idempotent_id('5e184576-c7d1-4c16-9b7c-bc9bcd65ba58')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_write_two_vms(self):
"""Boots two vms and writes/reads data on it."""
test_data = "Some test data to write"
# Boot two VMs and create share
instance1 = self.boot_instance(wait_until="BUILD")
instance2 = self.boot_instance(wait_until="BUILD")
share = self.create_share()
location = self.get_user_export_locations(share)[0]
instance1 = self.wait_for_active_instance(instance1["id"])
instance2 = self.wait_for_active_instance(instance2["id"])
# Write data to first VM
remote_client_inst1 = self.init_remote_client(instance1)
access = self.allow_access(share=share, instance=instance1,
remote_client=remote_client_inst1,
locations=location)
self.mount_share(location, remote_client_inst1)
self.addCleanup(self.unmount_share,
remote_client_inst1)
self.write_data_to_mounted_share(test_data, remote_client_inst1)
# Read from second VM
remote_client_inst2 = self.init_remote_client(instance2)
if not CONF.share.override_ip_for_nfs_access or self.ipv6_enabled:
self.allow_access(share=share, instance=instance2,
remote_client=remote_client_inst2,
locations=location,
access_rule=access)
self.mount_share(location, remote_client_inst2)
self.addCleanup(self.unmount_share,
remote_client_inst2)
data = self.read_data_from_mounted_share(remote_client_inst2)
self.assertEqual(test_data, data)
@decorators.idempotent_id('15d42949-545e-4ad8-b06e-bb2556c54375')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@utils.skip_if_microversion_not_supported("2.29")
@testtools.skipUnless(CONF.share.run_host_assisted_migration_tests or
CONF.share.run_driver_assisted_migration_tests,
"Share migration tests are disabled.")
@ddt.data(True, False)
def test_migration_files(self, force_host_assisted):
if (force_host_assisted and
not CONF.share.run_host_assisted_migration_tests):
raise self.skipException("Host-assisted migration tests are "
"disabled.")
elif (not force_host_assisted and
not CONF.share.run_driver_assisted_migration_tests):
raise self.skipException("Driver-assisted migration tests are "
"disabled.")
if self.protocol != "nfs":
raise self.skipException("Only NFS protocol supported "
"at this moment.")
if self.ipv6_enabled:
raise self.skipException("Share Migration using IPv6 is not "
"supported at this moment.")
pools = self.shares_admin_v2_client.list_pools(detail=True)['pools']
if len(pools) < 2:
raise self.skipException("At least two different pool entries are "
"needed to run share migration tests.")
instance = self.boot_instance(wait_until="BUILD")
share = self.create_share()
export_location = self.get_user_export_locations(share)[0]
instance = self.wait_for_active_instance(instance["id"])
share = self.shares_admin_v2_client.get_share(share['id'])['share']
default_type = self.shares_v2_client.list_share_types(
default=True)['share_type']
dest_pool = utils.choose_matching_backend(share, pools, default_type)
self.assertIsNotNone(dest_pool)
self.assertIsNotNone(dest_pool.get('name'))
dest_pool = dest_pool['name']
remote_client = self.init_remote_client(instance)
self.allow_access(share=share,
instance=instance,
remote_client=remote_client,
locations=export_location)
self.mount_share(export_location, remote_client)
remote_client.exec_command("sudo mkdir -p /mnt/f1")
remote_client.exec_command("sudo mkdir -p /mnt/f2")
remote_client.exec_command("sudo mkdir -p /mnt/f3")
remote_client.exec_command("sudo mkdir -p /mnt/f4")
remote_client.exec_command("sudo mkdir -p /mnt/f1/ff1")
remote_client.exec_command("sleep 1")
remote_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f1/1m1.bin bs=1M count=1")
remote_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f2/1m2.bin bs=1M count=1")
remote_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f3/1m3.bin bs=1M count=1")
remote_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f4/1m4.bin bs=1M count=1")
remote_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f1/ff1/1m5.bin bs=1M count=1")
remote_client.exec_command("sudo chmod -R 555 /mnt/f3")
remote_client.exec_command("sudo chmod -R 777 /mnt/f4")
task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
if force_host_assisted
else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
share = self.migrate_share(
share['id'], dest_pool, task_state, force_host_assisted)
if force_host_assisted:
self.assertRaises(
exceptions.SSHExecCommandFailed,
remote_client.exec_command,
"dd if=/dev/zero of=/mnt/f1/1m6.bin bs=1M count=1")
self.unmount_share(remote_client)
share = self.migration_complete(share['id'], dest_pool)
new_exports = self.get_user_export_locations(share)
self.assertEqual(dest_pool, share['host'])
self.assertEqual(constants.TASK_STATE_MIGRATION_SUCCESS,
share['task_state'])
self.mount_share(new_exports[0], remote_client)
output = remote_client.exec_command("ls -lRA --ignore=lost+found /mnt")
self.unmount_share(remote_client)
self.assertIn('1m1.bin', output)
self.assertIn('1m2.bin', output)
self.assertIn('1m3.bin', output)
self.assertIn('1m4.bin', output)
self.assertIn('1m5.bin', output)
@decorators.idempotent_id('87b803bf-679a-492b-a538-af4c9ff013c8')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@testtools.skipUnless(
CONF.share.capability_create_share_from_snapshot_support,
"Create share from snapshot tests are disabled.")
def test_write_data_to_share_created_from_snapshot(self):
# 1 - Create UVM, ok, created
instance = self.boot_instance(wait_until="BUILD")
# 2 - Create share S1, ok, created
extra_specs = {'snapshot_support': True}
parent_share = self.create_share(extra_specs=extra_specs)
parent_share_export_location = self.get_user_export_locations(
parent_share)[0]
# Create a client User Virtual Machine
instance = self.wait_for_active_instance(instance["id"])
self.addCleanup(self.servers_client.delete_server, instance['id'])
# 3 - SSH to UVM, ok, connected
remote_client = self.init_remote_client(instance)
# 4 - Provide RW access to S1, ok, provided
self.allow_access(share=parent_share,
instance=instance,
remote_client=remote_client,
locations=parent_share_export_location)
# 5 - Try mount S1 to UVM, ok, mounted
parent_share_dir = "/mnt/parent"
remote_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
self.mount_share(parent_share_export_location,
remote_client,
parent_share_dir)
self.addCleanup(self.unmount_share, remote_client, parent_share_dir)
# 6 - Create "file1", ok, created
remote_client.exec_command("sudo touch %s/file1" % parent_share_dir)
# 7 - Create snapshot SS1 from S1, ok, created
snapshot = self._create_snapshot(parent_share['id'])
# 8 - Create "file2" in share S1 - ok, created. We expect that
# snapshot will not contain any data created after snapshot creation.
remote_client.exec_command("sudo touch %s/file2" % parent_share_dir)
# 9 - Create share S2 from SS1, ok, created
child_share = self.create_share(snapshot_id=snapshot["id"])
# 10 - Try mount S2 - fail, access denied. We test that child share
# did not get access rules from parent share.
child_share_export_location = self.get_user_export_locations(
child_share)[0]
child_share_dir = "/mnt/child"
remote_client.exec_command("sudo mkdir -p %s" % child_share_dir)
self.assertRaises(
exceptions.SSHExecCommandFailed,
self.mount_share,
child_share_export_location, remote_client, child_share_dir,
)
# 11 - Provide RW access to S2, ok, provided
self.allow_access(share=child_share,
instance=instance,
remote_client=remote_client,
locations=child_share_export_location)
# 12 - Try mount S2, ok, mounted
self.mount_share(child_share_export_location,
remote_client,
child_share_dir)
self.addCleanup(self.unmount_share, remote_client, child_share_dir)
# 13 - List files on S2, only "file1" exists
output = remote_client.exec_command(
"sudo ls -lRA %s" % child_share_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
# 14 - Create file3 on S2, ok, file created
remote_client.exec_command("sudo touch %s/file3" % child_share_dir)
# 15 - List files on S1, two files exist - "file1" and "file2"
output = remote_client.exec_command(
"sudo ls -lRA %s" % parent_share_dir)
self.assertIn('file1', output)
self.assertIn('file2', output)
self.assertNotIn('file3', output)
# 16 - List files on S2, two files exist - "file1" and "file3"
output = remote_client.exec_command(
"sudo ls -lRA %s" % child_share_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
self.assertIn('file3', output)
@decorators.idempotent_id('c98e6876-3a4f-40e8-8b4f-023c94c242c3')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@utils.skip_if_microversion_not_supported("2.32")
@testtools.skipUnless(CONF.share.run_mount_snapshot_tests,
'Mountable snapshots tests are disabled.')
@testtools.skipUnless(CONF.share.run_snapshot_tests,
"Snapshot tests are disabled.")
def test_read_mountable_snapshot(self):
# 1 - Create UVM, ok, created
instance = self.boot_instance(wait_until="BUILD")
# 2 - Create share S1, ok, created
extra_specs = {'snapshot_support': True}
parent_share = self.create_share(extra_specs=extra_specs)
user_export_location = self.get_user_export_locations(parent_share)[0]
# Create client User Virtual Machine
instance = self.wait_for_active_instance(instance["id"])
self.addCleanup(self.servers_client.delete_server, instance['id'])
# 3 - SSH to UVM, ok, connected
remote_client = self.init_remote_client(instance)
# 4 - Provide RW access to S1, ok, provided
self.allow_access(share=parent_share,
instance=instance,
remote_client=remote_client,
locations=user_export_location)
# 5 - Try mount S1 to UVM, ok, mounted
parent_share_dir = "/mnt/parent"
snapshot_dir = "/mnt/snapshot_dir"
remote_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
remote_client.exec_command("sudo mkdir -p %s" % snapshot_dir)
self.mount_share(user_export_location, remote_client, parent_share_dir)
self.addCleanup(self.unmount_share, remote_client, parent_share_dir)
# 6 - Create "file1", ok, created
remote_client.exec_command("sudo touch %s/file1" % parent_share_dir)
# 7 - Create snapshot SS1 from S1, ok, created
snapshot = self._create_snapshot(parent_share['id'])
snapshot_export_location = self.get_user_export_locations(
snapshot=snapshot)[0]
# 8 - Create "file2" in share S1 - ok, created. We expect that
# snapshot will not contain any data created after snapshot creation.
remote_client.exec_command("sudo touch %s/file2" % parent_share_dir)
# 9 - Allow access to SS1
self.allow_access(share=parent_share,
instance=instance,
snapshot=snapshot,
remote_client=remote_client,
locations=snapshot_export_location)
# 10 - Mount SS1
self.mount_share(snapshot_export_location, remote_client, snapshot_dir)
self.addCleanup(self.unmount_share, remote_client, snapshot_dir)
# 11 - List files on SS1, only "file1" exists
# NOTE(lseki): using ls without recursion to avoid permission denied
# error while listing lost+found directory on LVM volumes
output = remote_client.exec_command("sudo ls -lA %s" % snapshot_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
# 12 - Try to create a file on SS1, should fail
self.assertRaises(
exceptions.SSHExecCommandFailed,
remote_client.exec_command,
"sudo touch %s/file3" % snapshot_dir)
@ddt.ddt
class TestShareBasicOpsNFS(manager.BaseShareScenarioNFSTest,
ShareBasicOpsBase):
@decorators.idempotent_id('4bad2073-a19b-4851-8cbe-75b20ade5cdb')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data(*utils.deduplicate(CONF.share.nfs_versions))
def test_mount_share_one_vm(self, nfs_version):
self.nfs_version = nfs_version
super(TestShareBasicOpsNFS, self).test_mount_share_one_vm()
class TestShareBasicOpsCIFS(manager.BaseShareScenarioCIFSTest,
ShareBasicOpsBase):
@decorators.idempotent_id('4344a47a-d316-496b-97a4-12a59297950a')
@tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
def test_write_with_ro_access(self):
msg = ("Skipped for CIFS protocol because RO access is not "
"supported for shares by IP.")
raise self.skipException(msg)
@decorators.idempotent_id('a691332b-dd7a-4041-9bbd-3893e168aefa')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_mountable_snapshot(self):
msg = "Skipped for CIFS protocol because of bug/1649573"
raise self.skipException(msg)
@decorators.idempotent_id('8c936c3e-4793-49d2-8409-4038f03e7012')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_write_data_to_share_created_from_snapshot(self):
msg = "Skipped for CIFS protocol because of bug/1649573"
raise self.skipException(msg)
class TestBaseShareBasicOpsScenarioCEPHFS(manager.BaseShareScenarioCEPHFSTest,
ShareBasicOpsBase):
@decorators.idempotent_id('9fb12879-45b3-4042-acac-82be338dbde1')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_mount_share_one_vm_with_ceph_fuse_client(self):
self.mount_client = 'fuse'
super(TestBaseShareBasicOpsScenarioCEPHFS,
self).test_mount_share_one_vm()
@decorators.idempotent_id('a2a70b94-f5fc-438a-9dfa-53aa60ee3949')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_write_with_ro_access_with_ceph_fuse_client(self):
self.mount_client = 'fuse'
super(TestBaseShareBasicOpsScenarioCEPHFS,
self).test_write_with_ro_access()
@decorators.idempotent_id('c247f51f-0ffc-4a4f-894c-781647619faf')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_write_two_vms_with_ceph_fuse_client(self):
self.mount_client = 'fuse'
super(TestBaseShareBasicOpsScenarioCEPHFS,
self).test_read_write_two_vms()
@decorators.idempotent_id('5bd64c46-05f4-4891-a08f-e146d1a76437')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_write_data_to_share_created_from_snapshot_with_ceph_fuse_client(
self):
self.mount_client = 'fuse'
super(TestBaseShareBasicOpsScenarioCEPHFS,
self).test_write_data_to_share_created_from_snapshot()
class TestShareBasicOpsNFSIPv6(TestShareBasicOpsNFS):
ip_version = 6
# NOTE(u_glide): this function is required to exclude ShareBasicOpsBase from
# executed test cases.
# See: https://docs.python.org/3/library/unittest.html#load-tests-protocol
# for details.
def load_tests(loader, tests, _):
result = []
for test_case in tests:
if type(test_case._tests[0]) is ShareBasicOpsBase:
continue
result.append(test_case)
return loader.suiteClass(result)