Migrate backup test cases under test_delete_subcloud_backup.py

- migrate backup test cases from old framework to new framework.
  - Add test case for test_delete_backup_group_on_local.
  - Implement keywords related to backup operations.

Change-Id: I949de4518eef1055d639269cdea71e6c3d383493
Signed-off-by: Swapna Gorre <swapna.gorre@windriver.com>
This commit is contained in:
Swapna Gorre
2025-03-06 06:01:38 -05:00
parent a577ea8202
commit f6cef1bdd2
3 changed files with 136 additions and 82 deletions

View File

@@ -4,6 +4,7 @@ from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.files.file_keywords import FileKeywords
@@ -21,6 +22,24 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
"""
self.ssh_connection = ssh_connection
def get_backup_path(self, subcloud_name: str, release: str, local_only: bool = False) -> str:
"""
Generate the backup path for a given subcloud and release.
Args:
subcloud_name (str): The name of the subcloud.
release (str): The release version associated with the backup.
local_only (bool, optional): If True, returns the local subcloud backup path;
otherwise, returns the central cloud backup path. Defaults to False.
Returns:
str: The full backup path based on the given parameters.
"""
if local_only:
return f"/opt/platform-backup/backups/{release}/"
return f"/opt/dc-vault/backups/{subcloud_name}/{release}"
def create_subcloud_backup(
self,
sysadmin_password: str,
@@ -67,11 +86,17 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
if group:
# Use wait_for_backup_creation to ensure the file is created
for subcloud_name in subcloud_list:
central_path = f"/opt/dc-vault/backups/{subcloud_name}/{release}"
self.wait_for_backup_creation(con_ssh, central_path, subcloud_name)
ssh_connection = LabConnectionKeywords().get_subcloud_ssh(subcloud_name) if local_only else con_ssh
backup_path = self.get_backup_path(subcloud_name, release, local_only)
if local_only:
backup_path = f"{backup_path}{subcloud_name}_platform_backup_*.tgz"
self.wait_for_backup_creation(ssh_connection, backup_path, subcloud_name)
else:
self.wait_for_backup_creation(con_ssh, path, subcloud)
@@ -161,8 +186,10 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
if group:
for subcloud_name in subcloud_list:
central_path = f"/opt/dc-vault/backups/{subcloud_name}/{release}"
self.wait_for_backup_deletion(con_ssh, central_path, subcloud_name)
ssh_connection = LabConnectionKeywords().get_subcloud_ssh(subcloud_name) if local_only else con_ssh
backup_path = self.get_backup_path(subcloud_name, release, local_only)
self.wait_for_backup_deletion(ssh_connection, backup_path, subcloud_name)
else:
self.wait_for_backup_deletion(con_ssh, path, subcloud)

View File

@@ -32,17 +32,11 @@ class FileKeywords(BaseKeyword):
sftp_client = self.ssh_connection.get_sftp_client()
sftp_client.get(remote_file_path, local_file_path)
except Exception as e:
get_logger().log_error(
f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}"
)
raise KeywordException(
f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}"
)
get_logger().log_error(f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}")
raise KeywordException(f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}")
return True
def upload_file(
self, local_file_path: str, remote_file_path: str, overwrite: bool = True
) -> bool:
def upload_file(self, local_file_path: str, remote_file_path: str, overwrite: bool = True) -> bool:
"""
Method to upload a file.
@@ -65,12 +59,8 @@ class FileKeywords(BaseKeyword):
sftp_client = self.ssh_connection.get_sftp_client()
sftp_client.put(local_file_path, remote_file_path)
except Exception as e:
get_logger().log_error(
f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}"
)
raise KeywordException(
f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}"
)
get_logger().log_error(f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}")
raise KeywordException(f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}")
return True
def file_exists(self, file_name: str) -> bool:
@@ -142,9 +132,7 @@ class FileKeywords(BaseKeyword):
grep_arg = f"| grep {grep_pattern}"
while time.time() < end_time:
output = self.ssh_connection.send(
f"sed -n '{start_line},{end_line}p' {file_name} {grep_arg}"
)
output = self.ssh_connection.send(f"sed -n '{start_line},{end_line}p' {file_name} {grep_arg}")
if not output: # if we get no more output we are at end of file
break
total_output.extend(output)
@@ -171,12 +159,23 @@ class FileKeywords(BaseKeyword):
output = self.ssh_connection.send_as_sudo(cmd)
# Handle encoding issues
output = "".join(
[line.replace("‘", "").replace("’", "") for line in output]
)
output = "".join([line.replace("‘", "").replace("’", "") for line in output])
return "No such file or directory" not in output
except Exception as e:
get_logger().log_error(f"Failed to check file existence at {path}: {e}")
raise KeywordException(f"Failed to check file existence at {path}: {e}")
def delete_folder_with_sudo(self, folder_path: str) -> bool:
"""
Deletes the folder.
Args:
folder_path (str): path to the folder.
Returns:
bool: True if delete successful, False otherwise.
"""
self.ssh_connection.send_as_sudo(f"rm -r -f {folder_path}")
return self.validate_file_exists_with_sudo(folder_path)

View File

@@ -5,28 +5,14 @@ from pytest import fail, mark
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_backup_keywords import (
DcManagerSubcloudBackupKeywords,
)
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_group_keywords import (
DcmanagerSubcloudGroupKeywords,
)
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import (
DcManagerSubcloudListKeywords,
)
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_update_keywords import (
DcManagerSubcloudUpdateKeywords,
)
from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_list_object_filter import (
DcManagerSubcloudListObjectFilter,
)
from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import (
GetHostsKeywords,
)
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_backup_keywords import DcManagerSubcloudBackupKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_group_keywords import DcmanagerSubcloudGroupKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_update_keywords import DcManagerSubcloudUpdateKeywords
from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_list_object_filter import DcManagerSubcloudListObjectFilter
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.host.system_host_list_keywords import (
SystemHostListKeywords,
)
from keywords.cloud_platform.version_info.cloud_platform_version_manager import CloudPlatformVersionManagerClass
from keywords.files.file_keywords import FileKeywords
@mark.p2
@@ -43,8 +29,7 @@ def test_delete_backup_central(request):
"""
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name()
host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host)
release = CloudPlatformVersionManagerClass().get_sw_version()
# Gets the lowest subcloud (the subcloud with the lowest id).
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
@@ -56,12 +41,6 @@ def test_delete_backup_central(request):
lab_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name)
subcloud_password = lab_config.get_admin_credentials().get_password()
# Get the sw_version if available (used in vbox environments).
release = host_show_output.get_sw_version()
# If sw_version is not available, fall back to software_load (used in physical labs).
if not release:
release = host_show_output.get_software_load()
dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
# Path to where the backup file will store.
@@ -69,8 +48,8 @@ def test_delete_backup_central(request):
def teardown():
get_logger().log_info("Removing test files during teardown")
central_ssh.send_as_sudo("rm -r -f /opt/dc-vault/backups/")
subcloud_ssh.send_as_sudo("rm -r -f /opt/platform-backup/backups/")
FileKeywords(central_ssh).delete_folder_with_sudo("/opt/dc-vault/backups/")
FileKeywords(subcloud_ssh).delete_folder_with_sudo("/opt/platform-backup/backups/")
request.addfinalizer(teardown)
@@ -97,8 +76,7 @@ def test_delete_backup_local(request):
"""
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name()
host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host)
release = CloudPlatformVersionManagerClass().get_sw_version()
# Gets the lowest subcloud (the subcloud with the lowest id).
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
@@ -110,12 +88,6 @@ def test_delete_backup_local(request):
lab_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name)
subcloud_password = lab_config.get_admin_credentials().get_password()
# Get the sw_version if available (used in vbox environments).
release = host_show_output.get_sw_version()
# If sw_version is not available, fall back to software_load (used in physical labs).
if not release:
release = host_show_output.get_software_load()
dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
# Path to where the backup file will store.
@@ -123,7 +95,7 @@ def test_delete_backup_local(request):
def teardown():
get_logger().log_info("Removing test files during teardown")
subcloud_ssh.send_as_sudo("rm -r -f /opt/platform-backup/backups/")
FileKeywords(subcloud_ssh).delete_folder_with_sudo("/opt/platform-backup/backups/")
request.addfinalizer(teardown)
@@ -183,7 +155,7 @@ def create_subcloud_group(subcloud_list: List[str]) -> None:
# Checking Subcloud's assigned to the group correctly
get_logger().log_info("Checking Subcloud's in the new group")
group_list = subcloud_group_keywords.get_dcmanager_subcloud_group_list_subclouds(group_name).get_dcmanager_subcloud_group_list_subclouds()
subclouds = [subcloud.name for subcloud in group_list]
subclouds = [subcloud.get_name() for subcloud in group_list]
validate_equals(subclouds, subcloud_list, "Checking Subcloud's assigned to the group correctly")
@@ -204,14 +176,69 @@ def test_delete_backup_group_on_central(request):
group_name = "Test"
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name()
host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host)
release = CloudPlatformVersionManagerClass().get_sw_version()
# Get the sw_version if available (used in vbox environments).
release = host_show_output.get_sw_version()
# If sw_version is not available, fall back to software_load (used in physical labs).
if not release:
release = host_show_output.get_software_load()
# Retrieves the subclouds. Considers only subclouds that are online, managed, and synced.
dcmanager_subcloud_list_input = DcManagerSubcloudListObjectFilter.get_healthy_subcloud_filter()
dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
dcmanager_subcloud_list_objects_filtered = dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_dcmanager_subcloud_list_objects_filtered(dcmanager_subcloud_list_input)
subcloud_list = [subcloud.get_name() for subcloud in dcmanager_subcloud_list_objects_filtered]
if len(subcloud_list) < 2:
get_logger().log_info("At least two subclouds managed are required to run the test")
fail("At least two subclouds managed are required to run the test")
# Gets the subcloud sysadmin password needed for backup creation.
subcloud_password = ConfigurationManager.get_lab_config().get_subcloud(subcloud_list[0]).get_admin_credentials().get_password()
# Create a subcloud group and add 2 subclouds
create_subcloud_group(subcloud_list)
dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
def teardown_backup():
get_logger().log_info("Removing test files during teardown")
FileKeywords(central_ssh).delete_folder_with_sudo("/opt/dc-vault/backups/")
request.addfinalizer(teardown_backup)
def teardown_group():
get_logger().log_info("Removing the created subcloud group during teardown")
for subcloud_name in subcloud_list:
DcManagerSubcloudUpdateKeywords(central_ssh).dcmanager_subcloud_update(subcloud_name, "group", "Default")
DcmanagerSubcloudGroupKeywords(central_ssh).dcmanager_subcloud_group_delete(group_name)
request.addfinalizer(teardown_group)
# Create a subcloud backup
get_logger().log_info(f"Create backup on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, group=group_name, release=release, subcloud_list=subcloud_list)
# Delete the backup created
get_logger().log_info(f"Delete backup on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, group=group_name, subcloud_list=subcloud_list)
@mark.p2
@mark.lab_has_min_2_subclouds
def test_delete_backup_group_on_local(request):
"""
Verify delete subcloud group backup on local path
Test Steps:
- Create a subcloud group and add 2 subclouds
- Create a Subcloud backup and check it on local path
- Delete the backup created and verify the backup is deleted
Teardown:
- Remove files created while the Tc was running.
- Delete the subcloud group
"""
group_name = "Test"
central_ssh = LabConnectionKeywords().get_active_controller_ssh()
release = CloudPlatformVersionManagerClass().get_sw_version()
# Retrieves the subclouds. Considers only subclouds that are online, managed, and synced.
dcmanager_subcloud_list_input = DcManagerSubcloudListObjectFilter.get_healthy_subcloud_filter()
@@ -233,7 +260,9 @@ def test_delete_backup_group_on_central(request):
def teardown_backup():
get_logger().log_info("Removing test files during teardown")
central_ssh.send_as_sudo("rm -r -f /opt/dc-vault/backups/")
for subcloud_name in subcloud_list:
subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name)
FileKeywords(subcloud_ssh).delete_folder_with_sudo("/opt/platform-backup/backups/")
request.addfinalizer(teardown_backup)
@@ -241,15 +270,14 @@ def test_delete_backup_group_on_central(request):
get_logger().log_info("Removing the created subcloud group during teardown")
for subcloud_name in subcloud_list:
DcManagerSubcloudUpdateKeywords(central_ssh).dcmanager_subcloud_update(subcloud_name, "group", "Default")
DcmanagerSubcloudGroupKeywords(central_ssh).dcmanager_subcloud_group_delete(group_name)
request.addfinalizer(teardown_group)
# Create a subcloud backup
get_logger().log_info(f"Create backup on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, group=group_name, release=release, subcloud_list=subcloud_list)
# Create a subcloud backup and check it on local path
get_logger().log_info(f"Create and check if backup was was created on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, local_only=True, group=group_name, release=release, subcloud_list=subcloud_list)
# Delete the backup created
get_logger().log_info(f"Delete backup on Central Cloud for subcloud group: {group_name}")
dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, group=group_name, subcloud_list=subcloud_list)
# Delete the backup created and verify the backup is deleted
get_logger().log_info(f"Delete and check if backup was removed on SubClouds for subcloud group: {group_name}")
dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, local_only=True, group=group_name, sysadmin_password=subcloud_password, subcloud_list=subcloud_list)