Introducing a timing logger to save the system test results
in a nicely formatted HTML table and a .csv file.

Updating some of the log steps to use log_test_case_step rather than
log_info.

Change-Id: I7f43b19f2a1cbb8355e6023ec643953f0290c06d
Signed-off-by: Vitor Vidal <vitor.vidaldenegreiros@windriver.com>
Vitor Vidal
2025-11-13 16:54:40 -03:00
parent 742ef98be4
commit 043283a166
2 changed files with 194 additions and 2 deletions

keywords/system_test/timing_logger.py (new file)

@@ -0,0 +1,186 @@
import csv
import os
from datetime import datetime
from typing import Dict, List, Optional

from framework.validation.validation import validate_equals

# Default directory for saving benchmark timing logs
DEFAULT_BENCHMARK_LOG_DIR = "benchmark_results"

# HTML template for benchmark results
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>{title}</title>
    <style>
        table {{ border-collapse: collapse; width: 100%; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: center; }}
        th {{ background-color: #f2f2f2; }}
        .metric {{ font-weight: bold; text-align: left; }}
    </style>
</head>
<body>
    <h2>{title}</h2>
    <table>
        <thead>
            <tr>
                <th class="metric">Metric</th>
                {iteration_headers}
                <th>Average</th>
            </tr>
        </thead>
        <tbody>
            {table_rows}
        </tbody>
    </table>
</body>
</html>
"""
class TimingLogger:
    """
    Utility class to log system test benchmark timings to HTML and CSV files.

    This class provides functionality to record timing measurements for benchmark tests
    and generate both CSV and HTML reports with statistical summaries.

    Attributes:
        benchmark_type (str): Type/name of the benchmark being measured
        output_dir (str): Directory where output files will be saved
        csv_file (str): Full path to the CSV output file
        html_file (str): Full path to the HTML output file
        column_headers (List[str]): Column headers used in the CSV and HTML reports

    Example:
        >>> logger = TimingLogger("container_deployment")
        >>> logger.log_timings(45.2, 12.8, 8.5)
    """

    def __init__(self, benchmark_type: str, output_dir: str = DEFAULT_BENCHMARK_LOG_DIR, column_headers: Optional[List[str]] = None):
        """
        Initialize the TimingLogger.

        Args:
            benchmark_type (str): Name/type of benchmark (used in filenames)
            output_dir (str, optional): Directory to save files. Defaults to DEFAULT_BENCHMARK_LOG_DIR.
            column_headers (List[str], optional): Custom column headers. Defaults to ['Deploy Time (s)', 'Scale Up Time (s)', 'Scale Down Time (s)'].

        Creates:
            - Output directory if it doesn't exist
            - CSV file path: {output_dir}/{benchmark_type}_benchmark_timings.csv
            - HTML file path: {output_dir}/{benchmark_type}_benchmark_timings.html
        """
        self.benchmark_type = benchmark_type
        self.output_dir = output_dir
        self.csv_file = os.path.join(output_dir, f"{benchmark_type}_benchmark_timings.csv")
        self.html_file = os.path.join(output_dir, f"{benchmark_type}_benchmark_timings.html")
        self.column_headers = column_headers or ['Deploy Time (s)', 'Scale Up Time (s)', 'Scale Down Time (s)']

        # Ensure output directory exists
        os.makedirs(output_dir, exist_ok=True)
    def log_timings(self, *timing_values: float):
        """
        Log timing measurements to both CSV and HTML files.

        Args:
            *timing_values (float): Variable number of timing values in seconds

        This method appends the timing data to the CSV file and regenerates
        the HTML report with updated statistics including averages.
        """
        validate_equals(len(timing_values), len(self.column_headers),
                        f"Expected {len(self.column_headers)} timing values, got {len(timing_values)}")
        self._log_to_csv(*timing_values)
        self._generate_html_table()
    def _log_to_csv(self, *timing_values: float):
        """
        Append timing data to CSV file with timestamp.

        Args:
            *timing_values (float): Variable number of timing values in seconds

        Creates the CSV header if the file doesn't exist. Each row contains:
        timestamp, timing_value1, timing_value2, ...
        """
        file_exists = os.path.exists(self.csv_file)
        with open(self.csv_file, 'a', newline='') as csvfile:
            writer = csv.writer(csvfile)
            if not file_exists:
                writer.writerow(['Timestamp'] + self.column_headers)
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            formatted_values = [f"{value:.2f}" for value in timing_values]
            writer.writerow([timestamp] + formatted_values)
    def _read_csv_data(self) -> List[Dict]:
        """
        Read existing CSV data into a list of dictionaries.

        Returns:
            List[Dict]: List of timing records, each as a dictionary keyed by
                'Timestamp' plus the configured column headers.
                Returns an empty list if the CSV file doesn't exist.
        """
        data = []
        if os.path.exists(self.csv_file):
            with open(self.csv_file, 'r') as csvfile:
                reader = csv.DictReader(csvfile)
                data = list(reader)
        return data
    def _calculate_averages(self, data: List[Dict]) -> Dict[str, float]:
        """
        Calculate average timing values from CSV data.

        Args:
            data (List[Dict]): List of timing records from CSV

        Returns:
            Dict[str, float]: Dictionary with column headers as keys containing average times.
        """
        if not data:
            return {header: 0.0 for header in self.column_headers}
        avg_timings = {}
        for header in self.column_headers:
            values = [float(row[header]) for row in data]
            avg_timings[header] = sum(values) / len(values)
        return avg_timings
    def _generate_html_table(self):
        """
        Generate HTML report from CSV data using template.

        Creates an HTML file with a table showing all timing iterations
        and calculated averages.
        """
        data = self._read_csv_data()
        averages = self._calculate_averages(data)

        # Generate iteration headers
        iteration_headers = "".join(f"<th>Iteration {i+1}</th>" for i in range(len(data)))

        # Generate table rows
        table_rows = []
        for header in self.column_headers:
            row_cells = [f'<td class="metric">{header}</td>']
            row_cells.extend(f"<td>{row[header]}</td>" for row in data)
            row_cells.append(f"<td><strong>{averages[header]:.2f}</strong></td>")
            table_rows.append(f"<tr>{''.join(row_cells)}</tr>")

        # Format HTML using template
        html_content = HTML_TEMPLATE.format(
            title=f"{self.benchmark_type.title()} Benchmark Results",
            iteration_headers=iteration_headers,
            table_rows="".join(table_rows)
        )

        with open(self.html_file, 'w') as htmlfile:
            htmlfile.write(html_content)
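
For reference, a minimal usage sketch of the new class (the benchmark name and the second set of timing values are illustrative; file locations follow the defaults set in __init__):

from keywords.system_test.timing_logger import TimingLogger

# Illustrative sketch: two benchmark iterations with example timings in seconds.
logger = TimingLogger("container_deployment")
logger.log_timings(45.2, 12.8, 8.5)   # iteration 1
logger.log_timings(44.7, 13.1, 8.9)   # iteration 2 (made-up values)
# Each call appends a row to benchmark_results/container_deployment_benchmark_timings.csv
# and regenerates benchmark_results/container_deployment_benchmark_timings.html with an
# "Iteration N" column per call plus an Average column.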


@@ -22,6 +22,8 @@ from keywords.k8s.deployments.kubectl_get_deployments_keywords import KubectlGet
import os
from keywords.k8s.pods.kubectl_get_pods_keywords import KubectlGetPodsKeywords
from framework.validation.validation import validate_equals
from keywords.system_test.timing_logger import TimingLogger
from keywords.docker.images.docker_images_keywords import DockerImagesKeywords
IMAGES = [
    "gcr.io/kubernetes-e2e-test-images/resource-consumer:1.4",
@@ -74,7 +76,7 @@ def test_deploy_benchmark_pods_large(request):
def deploy_benchmark_pods(request, benchmark):
    """
    Deploys pods for the selected benchmark type.
-    Scale up and down the deployments and mea'sures the time taken for each operation.
+    Scales the deployments up and down and measures the time taken for each operation.

    Args:
        request: pytest request object
@@ -87,6 +89,7 @@ def deploy_benchmark_pods(request, benchmark):
    remote_services_dir = "/tmp/system_test/services"
    local_deployments_dir = get_stx_resource_path(f"{DEPLOYMENTS_PATH}/{benchmark}")
    remote_deployments_dir = f"/tmp/system_test/deployments/{benchmark}"
    timing_logger = TimingLogger(f"{benchmark}_container_deployment")

    setup_upload_files(local_services_dir, remote_services_dir, local_deployments_dir, remote_deployments_dir)
@@ -127,10 +130,13 @@ def deploy_benchmark_pods(request, benchmark):
    scale_up_time = scale_deployments(ssh_connection, SCALE_FACTOR, namespace)
    get_logger().log_info(f"Time to scale up pods: {scale_up_time:.2f} seconds")

-    get_logger().log_test_case_step("Scaling down all deployments tand calculating time...")
+    get_logger().log_test_case_step("Scaling down all deployments and calculating time...")
    scale_down_time = scale_deployments(ssh_connection, 0, namespace)
    get_logger().log_info(f"Time to scale down pods: {scale_down_time:.2f} seconds")

    # Log all timings to CSV and HTML files
    timing_logger.log_timings(deploy_time, scale_up_time, scale_down_time)
    def teardown():
        deployments_output = KubectlGetDeploymentsKeywords(ssh_connection).get_deployments(namespace=namespace)
        deployments_objs = deployments_output.get_deployments()
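
For context, here is roughly what the generated CSV looks like after two runs with the default column headers (timestamps and values are illustrative; the layout follows _log_to_csv):

Timestamp,Deploy Time (s),Scale Up Time (s),Scale Down Time (s)
2025-11-13 16:54:40,45.20,12.80,8.50
2025-11-13 17:10:02,44.70,13.10,8.90

The HTML report renders the same data transposed: one row per metric, one "Iteration N" column per CSV row, and a final Average column computed by _calculate_averages.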