diff --git a/keywords/cloud_platform/dcmanager/dcmanager_subcloud_backup_keywords.py b/keywords/cloud_platform/dcmanager/dcmanager_subcloud_backup_keywords.py
index 9b16c495..0cad7b5e 100644
--- a/keywords/cloud_platform/dcmanager/dcmanager_subcloud_backup_keywords.py
+++ b/keywords/cloud_platform/dcmanager/dcmanager_subcloud_backup_keywords.py
@@ -4,6 +4,7 @@
 from framework.ssh.ssh_connection import SSHConnection
 from framework.validation.validation import validate_equals_with_retry
 from keywords.base_keyword import BaseKeyword
 from keywords.cloud_platform.command_wrappers import source_openrc
+from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
 from keywords.files.file_keywords import FileKeywords
@@ -21,6 +22,24 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
         """
         self.ssh_connection = ssh_connection
 
+    def get_backup_path(self, subcloud_name: str, release: str, local_only: bool = False) -> str:
+        """
+        Generate the backup path for a given subcloud and release.
+
+        Args:
+            subcloud_name (str): The name of the subcloud.
+            release (str): The release version associated with the backup.
+            local_only (bool, optional): If True, returns the local subcloud backup path;
+                otherwise, returns the central cloud backup path. Defaults to False.
+
+        Returns:
+            str: The full backup path based on the given parameters.
+        """
+        if local_only:
+            return f"/opt/platform-backup/backups/{release}/"
+
+        return f"/opt/dc-vault/backups/{subcloud_name}/{release}"
+
     def create_subcloud_backup(
         self,
         sysadmin_password: str,
@@ -67,11 +86,17 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
         self.ssh_connection.send(source_openrc(cmd))
         self.validate_success_return_code(self.ssh_connection)
+
         if group:
-            # Use wait_for_backup_creation to ensure the file is created
             for subcloud_name in subcloud_list:
-                central_path = f"/opt/dc-vault/backups/{subcloud_name}/{release}"
-                self.wait_for_backup_creation(con_ssh, central_path, subcloud_name)
+                ssh_connection = LabConnectionKeywords().get_subcloud_ssh(subcloud_name) if local_only else con_ssh
+                backup_path = self.get_backup_path(subcloud_name, release, local_only)
+
+                if local_only:
+                    backup_path = f"{backup_path}{subcloud_name}_platform_backup_*.tgz"
+
+                self.wait_for_backup_creation(ssh_connection, backup_path, subcloud_name)
+
         else:
             self.wait_for_backup_creation(con_ssh, path, subcloud)
@@ -161,8 +186,10 @@ class DcManagerSubcloudBackupKeywords(BaseKeyword):
         if group:
             for subcloud_name in subcloud_list:
-                central_path = f"/opt/dc-vault/backups/{subcloud_name}/{release}"
-                self.wait_for_backup_deletion(con_ssh, central_path, subcloud_name)
+                ssh_connection = LabConnectionKeywords().get_subcloud_ssh(subcloud_name) if local_only else con_ssh
+                backup_path = self.get_backup_path(subcloud_name, release, local_only)
+
+                self.wait_for_backup_deletion(ssh_connection, backup_path, subcloud_name)
         else:
             self.wait_for_backup_deletion(con_ssh, path, subcloud)
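
# Reference sketch: the path convention that get_backup_path centralizes, mirrored as a
# standalone helper so the two branches can be exercised without an SSH connection. The
# constant names, the helper name, and the "24.09" release value are illustrative only and
# are not part of the framework.
CENTRAL_BACKUP_ROOT = "/opt/dc-vault/backups"
LOCAL_BACKUP_ROOT = "/opt/platform-backup/backups"


def backup_path_for(subcloud_name: str, release: str, local_only: bool = False) -> str:
    """Return where a subcloud backup is expected to live."""
    if local_only:
        # Local backups are stored on the subcloud itself, keyed only by release.
        return f"{LOCAL_BACKUP_ROOT}/{release}/"
    # Central backups are stored on the system controller, keyed by subcloud and release.
    return f"{CENTRAL_BACKUP_ROOT}/{subcloud_name}/{release}"


assert backup_path_for("subcloud1", "24.09") == "/opt/dc-vault/backups/subcloud1/24.09"
assert backup_path_for("subcloud1", "24.09", local_only=True) == "/opt/platform-backup/backups/24.09/"
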
{e}" - ) - raise KeywordException( - f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}" - ) + get_logger().log_error(f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}") + raise KeywordException(f"Exception while downloading remote file [{remote_file_path}] to [{local_file_path}]. {e}") return True - def upload_file( - self, local_file_path: str, remote_file_path: str, overwrite: bool = True - ) -> bool: + def upload_file(self, local_file_path: str, remote_file_path: str, overwrite: bool = True) -> bool: """ Method to upload a file. @@ -65,12 +59,8 @@ class FileKeywords(BaseKeyword): sftp_client = self.ssh_connection.get_sftp_client() sftp_client.put(local_file_path, remote_file_path) except Exception as e: - get_logger().log_error( - f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}" - ) - raise KeywordException( - f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}" - ) + get_logger().log_error(f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}") + raise KeywordException(f"Exception while uploading local file [{local_file_path}] to [{remote_file_path}]. {e}") return True def file_exists(self, file_name: str) -> bool: @@ -142,9 +132,7 @@ class FileKeywords(BaseKeyword): grep_arg = f"| grep {grep_pattern}" while time.time() < end_time: - output = self.ssh_connection.send( - f"sed -n '{start_line},{end_line}p' {file_name} {grep_arg}" - ) + output = self.ssh_connection.send(f"sed -n '{start_line},{end_line}p' {file_name} {grep_arg}") if not output: # if we get no more output we are at end of file break total_output.extend(output) @@ -171,12 +159,23 @@ class FileKeywords(BaseKeyword): output = self.ssh_connection.send_as_sudo(cmd) # Handle encoding issues - output = "".join( - [line.replace("‘", "").replace("’", "") for line in output] - ) + output = "".join([line.replace("‘", "").replace("’", "") for line in output]) return "No such file or directory" not in output except Exception as e: get_logger().log_error(f"Failed to check file existence at {path}: {e}") raise KeywordException(f"Failed to check file existence at {path}: {e}") + + def delete_folder_with_sudo(self, folder_path: str) -> bool: + """ + Deletes the folder. + + Args: + folder_path (str): path to the folder. + + Returns: + bool: True if delete successful, False otherwise. 
+ """ + self.ssh_connection.send_as_sudo(f"rm -r -f {folder_path}") + return self.validate_file_exists_with_sudo(folder_path) diff --git a/testcases/cloud_platform/regression/dc/backup_restore/test_delete_subcloud_backup.py b/testcases/cloud_platform/regression/dc/backup_restore/test_delete_subcloud_backup.py index d59b80b3..90ab7d54 100644 --- a/testcases/cloud_platform/regression/dc/backup_restore/test_delete_subcloud_backup.py +++ b/testcases/cloud_platform/regression/dc/backup_restore/test_delete_subcloud_backup.py @@ -5,28 +5,14 @@ from pytest import fail, mark from config.configuration_manager import ConfigurationManager from framework.logging.automation_logger import get_logger from framework.validation.validation import validate_equals -from keywords.cloud_platform.dcmanager.dcmanager_subcloud_backup_keywords import ( - DcManagerSubcloudBackupKeywords, -) -from keywords.cloud_platform.dcmanager.dcmanager_subcloud_group_keywords import ( - DcmanagerSubcloudGroupKeywords, -) -from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import ( - DcManagerSubcloudListKeywords, -) -from keywords.cloud_platform.dcmanager.dcmanager_subcloud_update_keywords import ( - DcManagerSubcloudUpdateKeywords, -) -from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_list_object_filter import ( - DcManagerSubcloudListObjectFilter, -) -from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import ( - GetHostsKeywords, -) +from keywords.cloud_platform.dcmanager.dcmanager_subcloud_backup_keywords import DcManagerSubcloudBackupKeywords +from keywords.cloud_platform.dcmanager.dcmanager_subcloud_group_keywords import DcmanagerSubcloudGroupKeywords +from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords +from keywords.cloud_platform.dcmanager.dcmanager_subcloud_update_keywords import DcManagerSubcloudUpdateKeywords +from keywords.cloud_platform.dcmanager.objects.dcmanager_subcloud_list_object_filter import DcManagerSubcloudListObjectFilter from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords -from keywords.cloud_platform.system.host.system_host_list_keywords import ( - SystemHostListKeywords, -) +from keywords.cloud_platform.version_info.cloud_platform_version_manager import CloudPlatformVersionManagerClass +from keywords.files.file_keywords import FileKeywords @mark.p2 @@ -43,8 +29,7 @@ def test_delete_backup_central(request): """ central_ssh = LabConnectionKeywords().get_active_controller_ssh() - host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name() - host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host) + release = CloudPlatformVersionManagerClass().get_sw_version() # Gets the lowest subcloud (the subcloud with the lowest id). dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh) @@ -56,12 +41,6 @@ def test_delete_backup_central(request): lab_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name) subcloud_password = lab_config.get_admin_credentials().get_password() - # Get the sw_version if available (used in vbox environments). - release = host_show_output.get_sw_version() - # If sw_version is not available, fall back to software_load (used in physical labs). - if not release: - release = host_show_output.get_software_load() - dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh) # Path to where the backup file will store. 
@@ -69,8 +48,8 @@
     def teardown():
         get_logger().log_info("Removing test files during teardown")
-        central_ssh.send_as_sudo("rm -r -f /opt/dc-vault/backups/")
-        subcloud_ssh.send_as_sudo("rm -r -f /opt/platform-backup/backups/")
+        FileKeywords(central_ssh).delete_folder_with_sudo("/opt/dc-vault/backups/")
+        FileKeywords(subcloud_ssh).delete_folder_with_sudo("/opt/platform-backup/backups/")
 
     request.addfinalizer(teardown)
@@ -97,8 +76,7 @@ def test_delete_backup_local(request):
     """
     central_ssh = LabConnectionKeywords().get_active_controller_ssh()
-    host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name()
-    host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host)
+    release = CloudPlatformVersionManagerClass().get_sw_version()
 
     # Gets the lowest subcloud (the subcloud with the lowest id).
     dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
@@ -110,12 +88,6 @@
     lab_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name)
     subcloud_password = lab_config.get_admin_credentials().get_password()
 
-    # Get the sw_version if available (used in vbox environments).
-    release = host_show_output.get_sw_version()
-    # If sw_version is not available, fall back to software_load (used in physical labs).
-    if not release:
-        release = host_show_output.get_software_load()
-
     dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
 
     # Path to where the backup file will store.
@@ -123,7 +95,7 @@
     def teardown():
         get_logger().log_info("Removing test files during teardown")
-        subcloud_ssh.send_as_sudo("rm -r -f /opt/platform-backup/backups/")
+        FileKeywords(subcloud_ssh).delete_folder_with_sudo("/opt/platform-backup/backups/")
 
     request.addfinalizer(teardown)
@@ -183,7 +155,7 @@ def create_subcloud_group(subcloud_list: List[str]) -> None:
     # Checking Subcloud's assigned to the group correctly
     get_logger().log_info("Checking Subcloud's in the new group")
     group_list = subcloud_group_keywords.get_dcmanager_subcloud_group_list_subclouds(group_name).get_dcmanager_subcloud_group_list_subclouds()
-    subclouds = [subcloud.name for subcloud in group_list]
+    subclouds = [subcloud.get_name() for subcloud in group_list]
 
     validate_equals(subclouds, subcloud_list, "Checking Subcloud's assigned to the group correctly")
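
# The `.name` -> `.get_name()` switch above suggests the group-list entries expose accessor
# methods rather than public attributes, matching the subcloud.get_name() comprehensions used
# elsewhere in this change. A toy model of that pattern; FakeSubcloud is illustrative and not
# the framework's list object.
class FakeSubcloud:
    def __init__(self, name: str):
        self._name = name

    def get_name(self) -> str:
        return self._name


group_list = [FakeSubcloud("subcloud1"), FakeSubcloud("subcloud2")]
subclouds = [subcloud.get_name() for subcloud in group_list]  # the corrected comprehension
assert subclouds == ["subcloud1", "subcloud2"]
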
@@ -204,14 +176,69 @@ def test_delete_backup_group_on_central(request):
     group_name = "Test"
     central_ssh = LabConnectionKeywords().get_active_controller_ssh()
-    host = SystemHostListKeywords(central_ssh).get_active_controller().get_host_name()
-    host_show_output = GetHostsKeywords().get_hosts().get_system_host_show_object(host)
+    release = CloudPlatformVersionManagerClass().get_sw_version()
 
-    # Get the sw_version if available (used in vbox environments).
-    release = host_show_output.get_sw_version()
-    # If sw_version is not available, fall back to software_load (used in physical labs).
-    if not release:
-        release = host_show_output.get_software_load()
+    # Retrieves the subclouds. Considers only subclouds that are online, managed, and synced.
+    dcmanager_subcloud_list_input = DcManagerSubcloudListObjectFilter.get_healthy_subcloud_filter()
+    dcmanager_subcloud_list_keywords = DcManagerSubcloudListKeywords(central_ssh)
+    dcmanager_subcloud_list_objects_filtered = dcmanager_subcloud_list_keywords.get_dcmanager_subcloud_list().get_dcmanager_subcloud_list_objects_filtered(dcmanager_subcloud_list_input)
+
+    subcloud_list = [subcloud.get_name() for subcloud in dcmanager_subcloud_list_objects_filtered]
+    if len(subcloud_list) < 2:
+        get_logger().log_info("At least two managed subclouds are required to run the test")
+        fail("At least two managed subclouds are required to run the test")
+
+    # Gets the subcloud sysadmin password needed for backup creation.
+    subcloud_password = ConfigurationManager.get_lab_config().get_subcloud(subcloud_list[0]).get_admin_credentials().get_password()
+
+    # Create a subcloud group and add 2 subclouds
+    create_subcloud_group(subcloud_list)
+
+    dc_manager_backup = DcManagerSubcloudBackupKeywords(central_ssh)
+
+    def teardown_backup():
+        get_logger().log_info("Removing test files during teardown")
+        FileKeywords(central_ssh).delete_folder_with_sudo("/opt/dc-vault/backups/")
+
+    request.addfinalizer(teardown_backup)
+
+    def teardown_group():
+        get_logger().log_info("Removing the created subcloud group during teardown")
+        for subcloud_name in subcloud_list:
+            DcManagerSubcloudUpdateKeywords(central_ssh).dcmanager_subcloud_update(subcloud_name, "group", "Default")
+
+        DcmanagerSubcloudGroupKeywords(central_ssh).dcmanager_subcloud_group_delete(group_name)
+
+    request.addfinalizer(teardown_group)
+
+    # Create a subcloud backup
+    get_logger().log_info(f"Create backup on Central Cloud for subcloud group: {group_name}")
+    dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, group=group_name, release=release, subcloud_list=subcloud_list)
+
+    # Delete the backup created
+    get_logger().log_info(f"Delete backup on Central Cloud for subcloud group: {group_name}")
+    dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, group=group_name, subcloud_list=subcloud_list)
+
+
+@mark.p2
+@mark.lab_has_min_2_subclouds
+def test_delete_backup_group_on_local(request):
+    """
+    Verify delete subcloud group backup on local path
+
+    Test Steps:
+        - Create a subcloud group and add 2 subclouds
+        - Create a Subcloud backup and check it on local path
+        - Delete the backup created and verify the backup is deleted
+
+    Teardown:
+        - Remove files created while the Tc was running.
+        - Delete the subcloud group
+
+    """
+    group_name = "Test"
+    central_ssh = LabConnectionKeywords().get_active_controller_ssh()
+    release = CloudPlatformVersionManagerClass().get_sw_version()
 
     # Retrieves the subclouds. Considers only subclouds that are online, managed, and synced.
     dcmanager_subcloud_list_input = DcManagerSubcloudListObjectFilter.get_healthy_subcloud_filter()
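
# Note on finalizer ordering in the two group tests: request.addfinalizer runs callbacks in
# last-in-first-out order, so teardown_group (registered second) executes before
# teardown_backup, and the backup directories are removed last. A bare-bones stand-in for the
# request fixture to show the ordering; the print messages are illustrative.
finalizers = []


def addfinalizer(func):
    finalizers.append(func)


addfinalizer(lambda: print("teardown_backup: remove backup files"))   # registered first, runs last
addfinalizer(lambda: print("teardown_group: delete subcloud group"))  # registered second, runs first

for teardown in reversed(finalizers):  # pytest invokes finalizers in reverse registration order
    teardown()
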
@@ -233,7 +260,9 @@ def test_delete_backup_group_on_central(request):
     def teardown_backup():
         get_logger().log_info("Removing test files during teardown")
-        central_ssh.send_as_sudo("rm -r -f /opt/dc-vault/backups/")
+        for subcloud_name in subcloud_list:
+            subcloud_ssh = LabConnectionKeywords().get_subcloud_ssh(subcloud_name)
+            FileKeywords(subcloud_ssh).delete_folder_with_sudo("/opt/platform-backup/backups/")
 
     request.addfinalizer(teardown_backup)
@@ -241,15 +270,14 @@ def test_delete_backup_group_on_central(request):
         get_logger().log_info("Removing the created subcloud group during teardown")
         for subcloud_name in subcloud_list:
             DcManagerSubcloudUpdateKeywords(central_ssh).dcmanager_subcloud_update(subcloud_name, "group", "Default")
-
         DcmanagerSubcloudGroupKeywords(central_ssh).dcmanager_subcloud_group_delete(group_name)
 
     request.addfinalizer(teardown_group)
 
-    # Create a subcloud backup
-    get_logger().log_info(f"Create backup on Central Cloud for subcloud group: {group_name}")
-    dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, group=group_name, release=release, subcloud_list=subcloud_list)
+    # Create a subcloud backup and check it on local path
+    get_logger().log_info(f"Create and check if backup was created locally on the subclouds for subcloud group: {group_name}")
+    dc_manager_backup.create_subcloud_backup(subcloud_password, central_ssh, local_only=True, group=group_name, release=release, subcloud_list=subcloud_list)
 
-    # Delete the backup created
-    get_logger().log_info(f"Delete backup on Central Cloud for subcloud group: {group_name}")
-    dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, group=group_name, subcloud_list=subcloud_list)
+    # Delete the backup created and verify the backup is deleted
+    get_logger().log_info(f"Delete and check if backup was removed on the subclouds for subcloud group: {group_name}")
+    dc_manager_backup.delete_subcloud_backup(central_ssh, release=release, local_only=True, group=group_name, sysadmin_password=subcloud_password, subcloud_list=subcloud_list)
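
# Taken together, the two flows wait on different artifacts: the default (central) path waits
# on a per-subcloud directory under /opt/dc-vault on the system controller, while local_only
# waits on a glob over each subcloud's /opt/platform-backup release directory. The glob below
# is copied from create_subcloud_backup; the archive name is illustrative, with the suffix
# standing in for whatever the backup tooling appends before ".tgz".
import fnmatch


def local_backup_glob(subcloud_name: str, release: str) -> str:
    return f"/opt/platform-backup/backups/{release}/{subcloud_name}_platform_backup_*.tgz"


archive = "/opt/platform-backup/backups/24.09/subcloud1_platform_backup_20240101_000000.tgz"
assert fnmatch.fnmatch(archive, local_backup_glob("subcloud1", "24.09"))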