diff --git a/keywords/files/file_keywords.py b/keywords/files/file_keywords.py index b4876680..639db2e1 100644 --- a/keywords/files/file_keywords.py +++ b/keywords/files/file_keywords.py @@ -1,4 +1,5 @@ import math +import shlex import time from framework.exceptions.keyword_exception import KeywordException @@ -111,15 +112,18 @@ class FileKeywords(BaseKeyword): self.ssh_connection.send_as_sudo(f"rm {file_name}") return self.file_exists(file_name) - def delete_directory(self, directory_path: str) -> None: + def delete_directory(self, directory_path: str) -> bool: """Remove directory and all its contents. Args: directory_path (str): Directory path to remove. + Returns: + bool: True if delete successful, False otherwise. """ - cleanup_cmd = f"rm -rf {directory_path}" + cleanup_cmd = f"rm -rf {shlex.quote(directory_path)}" self.ssh_connection.send(cleanup_cmd) + return not self.file_exists(directory_path) def get_files_in_dir(self, file_dir: str) -> list[str]: """ diff --git a/keywords/linux/find/find_keywords.py b/keywords/linux/find/find_keywords.py index ceeac110..71ea615c 100644 --- a/keywords/linux/find/find_keywords.py +++ b/keywords/linux/find/find_keywords.py @@ -3,10 +3,10 @@ import shlex from typing import Optional -from framework.ssh.ssh_connection import SSHConnection -from framework.validation.validation import validate_not_none, validate_is_digit, validate_greater_than_or_equal, validate_less_than_or_equal -from keywords.base_keyword import BaseKeyword from framework.logging.automation_logger import get_logger +from framework.ssh.ssh_connection import SSHConnection +from framework.validation.validation import validate_is_digit, validate_not_none +from keywords.base_keyword import BaseKeyword class FindKeywords(BaseKeyword): @@ -21,9 +21,7 @@ class FindKeywords(BaseKeyword): super().__init__() self.ssh_connection = ssh_connection - def count_files_in_directory(self, directory_path: str, file_pattern: str = '*', - max_depth: int = 1, min_depth: int = 0, - file_type: str = 'f', exclude_pattern: Optional[str] = None) -> int: + def count_files_in_directory(self, directory_path: str, file_pattern: str = "*", max_depth: int = 1, min_depth: int = 0, file_type: str = "f", exclude_pattern: Optional[str] = None) -> int: """Count files/directories matching pattern in directory. Args: @@ -37,30 +35,30 @@ class FindKeywords(BaseKeyword): Returns: int: Number of matching files/directories. """ - safe_directory_path = shlex.quote(directory_path) + safe_file_pattern = shlex.quote(file_pattern) find_cmd = f"find {safe_directory_path}" - + if min_depth > 0: find_cmd += f" -mindepth {min_depth}" - find_cmd += f" -maxdepth {max_depth} -type {file_type} -name '{file_pattern}'" - + find_cmd += f" -maxdepth {max_depth} -type {file_type} -name {safe_file_pattern}" + if exclude_pattern: - find_cmd += f" | grep -v '{exclude_pattern}'" + safe_exclude_pattern = shlex.quote(exclude_pattern) + find_cmd += f" | grep -v {safe_exclude_pattern}" find_cmd += " | wc -l" - + get_logger().log_info(f"DEBUG: Executing find command: {find_cmd}") - + result = self.ssh_connection.send(find_cmd) self.validate_success_return_code(self.ssh_connection) - + validate_not_none(result, f"file count command response for pattern {file_pattern}") count_str = result[0].strip() validate_is_digit(count_str, f"file count string for pattern {file_pattern}") return int(count_str) - def find_most_recent_file(self, directory_path: str, file_pattern: str = '*', - max_depth: int = 1, min_depth: int = 0) -> Optional[str]: + def find_most_recent_file(self, directory_path: str, file_pattern: str = "*", max_depth: int = 1, min_depth: int = 0) -> Optional[str]: """Find most recent file matching pattern in directory. Args: @@ -74,14 +72,15 @@ class FindKeywords(BaseKeyword): """ safe_directory_path = shlex.quote(directory_path) find_cmd = f"find {safe_directory_path}" - + if min_depth > 0: find_cmd += f" -mindepth {min_depth}" - find_cmd += f" -maxdepth {max_depth} -type f -name '{file_pattern}' -printf '%T@ %p\n' | sort -n | tail -1 | cut -d' ' -f2-" - + safe_file_pattern = shlex.quote(file_pattern) + find_cmd += f" -maxdepth {max_depth} -type f -name {safe_file_pattern} -printf '%T@ %p\n' | sort -n | tail -1 | cut -d' ' -f2-" + result = self.ssh_connection.send(find_cmd) self.validate_success_return_code(self.ssh_connection) - + if result and result[0].strip(): return result[0].strip() - return None \ No newline at end of file + return None diff --git a/keywords/linux/tar/tar_keywords.py b/keywords/linux/tar/tar_keywords.py index 5e78a1f1..a8a2cbb3 100644 --- a/keywords/linux/tar/tar_keywords.py +++ b/keywords/linux/tar/tar_keywords.py @@ -1,3 +1,4 @@ +import os import shlex from typing import Optional @@ -7,8 +8,6 @@ from keywords.base_keyword import BaseKeyword from keywords.files.file_keywords import FileKeywords - - class TarKeywords(BaseKeyword): """ Class for linux tar command keywords @@ -60,26 +59,32 @@ class TarKeywords(BaseKeyword): str: Path to extracted directory. """ safe_tar_path = shlex.quote(tar_path) - - # Verify tar file exists + + # Verify tar file exists and has valid extension if not self.file_ops.file_exists(tar_path): raise FileNotFoundError(f"Tar file not found: {tar_path}") + if not (tar_path.endswith(".tar.gz") or tar_path.endswith(".tgz")): + raise ValueError("File must be a .tar.gz or .tgz archive.") + # Determine extraction directory and command if extract_to is None: - extract_dir = tar_path.replace('.tar.gz', '') - extract_cmd = f"cd $(dirname {safe_tar_path}) && tar -xzf $(basename {safe_tar_path})" + if tar_path.endswith(".tar.gz"): + extract_dir = tar_path.replace(".tar.gz", "") + else: # .tgz + extract_dir = tar_path.replace(".tgz", "") + dir_path = shlex.quote(os.path.dirname(tar_path)) + file_name = shlex.quote(os.path.basename(tar_path)) + extract_cmd = f"cd {dir_path} && tar -xzf {file_name}" else: extract_dir = extract_to safe_extract_dir = shlex.quote(extract_dir) self.file_ops.create_directory(safe_extract_dir) extract_cmd = f"tar -xzf {safe_tar_path} -C {safe_extract_dir}" - + get_logger().log_info(f"Extracting {tar_path} to {extract_dir}") self.ssh_connection.send(extract_cmd) self.validate_success_return_code(self.ssh_connection) - + get_logger().log_info(f"Archive extracted successfully to {extract_dir}") return extract_dir - -