Merge "Disk usauge function"

This commit is contained in:
Zuul
2025-12-04 21:29:04 +00:00
committed by Gerrit Code Review
3 changed files with 188 additions and 0 deletions

View File

@@ -0,0 +1,43 @@
"""Linux df command keywords."""
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.linux.df.df_output import DfOutput
class DfKeywords(BaseKeyword):
"""Linux df command operations."""
def __init__(self, ssh_connection: SSHConnection):
"""Initialize df keywords.
Args:
ssh_connection (SSHConnection): SSH connection to target host.
"""
super().__init__()
self.ssh_connection = ssh_connection
def get_disk_usage(self, path: str = "/") -> DfOutput:
"""Get disk usage information for specified path.
Args:
path (str): Filesystem path to check. Defaults to "/".
Returns:
DfOutput: Disk usage information collection.
"""
# Execute df command via SSH and get raw output
raw_result = self.ssh_connection.send(f"df {path}")
# Parse output and return collection of df objects
return DfOutput(raw_result)
def allocate_disk_space(self, size_gb: int, file_path: str):
"""Allocate disk space using fallocate command.
Args:
size_gb (int): Size in gigabytes to allocate.
file_path (str): Path where to create the allocation file.
"""
get_logger().log_info(f"Allocating {size_gb}G disk space to {file_path}")
self.ssh_connection.send(f"fallocate -l {size_gb}G {file_path}")

View File

@@ -0,0 +1,63 @@
"""Df command output object."""
class DfObject:
"""Container for df command output data."""
def __init__(self, filesystem: str, total_kb: int, used_kb: int, available_kb: int, usage_percent: int, mount_point: str):
"""Initialize df object with parsed data.
Args:
filesystem (str): Filesystem name/device.
total_kb (int): Total space in kilobytes.
used_kb (int): Used space in kilobytes.
available_kb (int): Available space in kilobytes.
usage_percent (int): Usage percentage (0-100).
mount_point (str): Mount point path.
"""
self.filesystem = filesystem
self.total_kb = total_kb
self.used_kb = used_kb
self.available_kb = available_kb
self.usage_percent = usage_percent
self.mount_point = mount_point
def get_usage_percentage(self) -> int:
"""Get usage percentage as integer.
Returns:
int: Usage percentage (0-100).
"""
return self.usage_percent
def get_total_kb(self) -> int:
"""Get total space in kilobytes.
Returns:
int: Total space in KB.
"""
return self.total_kb
def get_used_kb(self) -> int:
"""Get used space in kilobytes.
Returns:
int: Used space in KB.
"""
return self.used_kb
def get_filesystem(self) -> str:
"""Get filesystem name.
Returns:
str: Filesystem name/device.
"""
return self.filesystem
def get_mount_point(self) -> str:
"""Get mount point path.
Returns:
str: Mount point path.
"""
return self.mount_point

View File

@@ -0,0 +1,82 @@
"""Df command output collection."""
from typing import List
from keywords.linux.df.df_object import DfObject
class DfOutput:
"""Collection of df command results."""
def __init__(self, raw_output: List[str]):
"""Initialize df output collection.
Args:
raw_output (List[str]): Raw output lines from df command.
"""
# Initialize empty collection to store parsed df entries
self.df_entries: List[DfObject] = []
self._parse_df_output(raw_output)
def _parse_df_output(self, raw_output: List[str]) -> None:
"""Parse df command output into DfObject instances.
Args:
raw_output (List[str]): Raw output lines from df command.
"""
# Skip header line (first line contains column names)
for line in raw_output[1:]:
if line.strip():
parts = line.strip().split()
# Ensure we have all required fields (filesystem, size, used, avail, use%, mounted)
if len(parts) >= 6:
filesystem = parts[0] # Device/filesystem name
total_kb = int(parts[1]) # Total space in KB
used_kb = int(parts[2]) # Used space in KB
available_kb = int(parts[3]) # Available space in KB
usage_percent = int(parts[4].rstrip("%")) # Usage percentage (remove % symbol)
mount_point = parts[5] # Mount point path
# Create df object and add to collection
df_obj = DfObject(filesystem, total_kb, used_kb, available_kb, usage_percent, mount_point)
self.df_entries.append(df_obj)
def get_df_entries(self) -> List[DfObject]:
"""Get all df entries.
Returns:
List[DfObject]: List of all df entries.
"""
return self.df_entries
def get_df_by_mount_point(self, mount_point: str) -> DfObject:
"""Get df entry by mount point.
Args:
mount_point (str): Mount point path.
Returns:
DfObject: Df entry for the specified mount point.
Raises:
ValueError: If mount point not found.
"""
# Search through all entries for matching mount point
for entry in self.df_entries:
if entry.get_mount_point() == mount_point:
return entry
raise ValueError(f"Mount point '{mount_point}' not found")
def get_first_entry(self) -> DfObject:
"""Get first df entry.
Returns:
DfObject: First df entry.
Raises:
ValueError: If no entries available.
"""
# Return first entry (useful for single filesystem queries)
if not self.df_entries:
raise ValueError("No df entries available")
return self.df_entries[0]