TESTFramework: DC Sanity - test_dc_system_health_pre_session

Test the health of the DC System to guarantee the following requirements in the central cloud and in the subclouds:
      _ Application 'platform-integ-apps' is in 'applied' status.
      _ No alarms are present, except for the ignorable ones.
      _ Kubernetes pods are healthy.

Change-Id: I91b2863dcfafdd9bec45a29682da536db893cd2a
This commit is contained in:
Marcelo Daride Gaspar 2024-12-03 16:48:01 -03:00
parent b1c33161c9
commit 3a5df0c562
5 changed files with 245 additions and 3 deletions

View File

@ -1,3 +1,6 @@
import time
from framework.logging.automation_logger import get_logger
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.fault_management.alarms.objects.alarm_list_object import AlarmListObject
@ -15,7 +18,9 @@ class AlarmListKeywords(BaseKeyword):
Args:
ssh_connection:
"""
self.ssh_connection = ssh_connection
self._ssh_connection = ssh_connection
self._check_interval_in_seconds = 3
self._timeout_in_seconds = 600
def alarm_list(self) -> [AlarmListObject]:
"""
@ -25,8 +30,140 @@ class AlarmListKeywords(BaseKeyword):
Returns: the list of alarms
"""
output = self.ssh_connection.send(source_openrc('fm alarm-list --nowrap'))
self.validate_success_return_code(self.ssh_connection)
output = self._ssh_connection.send(source_openrc('fm alarm-list --nowrap'))
self.validate_success_return_code(self._ssh_connection)
alarms = AlarmListOutput(output)
return alarms.get_alarms()
def wait_for_all_alarms_cleared(self):
    """
    This method waits for all alarms to be cleared in this SSH connection within the period defined by
    'get_timeout_in_seconds()'. Otherwise, this method raises TimeoutError exception.

    Notes:
        The alarms in this SSH connection are checked every 'get_check_interval_in_seconds()' seconds.
        The result of the most recent check is always evaluated before giving up, so alarms that are cleared
        on the final check are not reported as a timeout.

    Returns:
        None

    Raises:
        TimeoutError: if some alarm is still active after the period defined by
        'get_timeout_in_seconds()' seconds has elapsed.

    """
    # Retrieves the current alarms on this SSH connection.
    alarms = self.alarm_list()
    now = time.time()
    end_time = now + self.get_timeout_in_seconds()

    # Keeps polling while there is something left to clear and there is still time to wait.
    while alarms and now < end_time:
        alarm_ids = ", ".join([alarm.get_alarm_id() for alarm in alarms])
        get_logger().log_info(
            f"There are still some alarms active in this SSH connection ({self.get_ssh_connection()}). Active alarms IDs: {alarm_ids}. Waiting for {self.get_check_interval_in_seconds():.3f} more seconds. Remaining time: {(end_time - now):.3f} seconds."
        )
        time.sleep(self.get_check_interval_in_seconds())
        alarms = self.alarm_list()
        now = time.time()

    # The loop may have ended because of the deadline; the latest alarm list decides the outcome, not the clock.
    if not alarms:
        get_logger().log_info(f"All alarms in this SSH connection ({self.get_ssh_connection()}) are now cleared.")
        return

    alarm_ids = ", ".join([alarm.get_alarm_id() for alarm in alarms])
    raise TimeoutError(f"The alarms with the following IDs: {alarm_ids} could not be cleared within {self.get_timeout_in_seconds()} seconds.")
def wait_for_alarms_cleared(self, alarms: list[AlarmListObject]):
    """
    This method waits for the alarms defined in 'alarms' to be cleared in this SSH connection within the period
    defined by 'get_timeout_in_seconds()'. Otherwise, a TimeoutError exception is raised.

    Notes:
        The alarms in this SSH connection are checked every 'get_check_interval_in_seconds()' seconds.
        The result of the most recent check is always evaluated before giving up, so alarms that are cleared
        on the final check are not reported as a timeout.

    Args:
        alarms (list[AlarmListObject]): The list of alarms to be checked to see if they have been cleared in this
        SSH connection.

    Returns:
        None

    Raises:
        TimeoutError: if some of the given alarms is still active after the period defined by
        'get_timeout_in_seconds()' seconds has elapsed.

    """
    current_alarms = self.alarm_list()
    alarm_ids = ", ".join([alarm.get_alarm_id() for alarm in alarms])
    now = time.time()
    end_time = now + self.get_timeout_in_seconds()

    # Note: AlarmListObject overrides __eq__ method and the operator 'in' uses this overridden method.
    active_alarms = [alarm for alarm in alarms if alarm in current_alarms]

    # Keeps polling while some requested alarm is still active and there is still time to wait.
    while active_alarms and now < end_time:
        for alarm in active_alarms:
            get_logger().log_info(f"The alarm with ID {alarm.get_alarm_id()} is still active in this SSH connection ({self.get_ssh_connection()}).")
        get_logger().log_info(
            f"Not all alarms with the following IDs: {alarm_ids} have been cleared in this SSH connection ({self.get_ssh_connection()}). Waiting for {self.get_check_interval_in_seconds():.3f} more seconds. Remaining time: {(end_time - now):.3f} seconds."
        )
        # Uses the getter, like the log message above, so both always refer to the same interval.
        time.sleep(self.get_check_interval_in_seconds())
        current_alarms = self.alarm_list()
        now = time.time()
        active_alarms = [alarm for alarm in alarms if alarm in current_alarms]

    # The loop may have ended because of the deadline; the latest alarm list decides the outcome, not the clock.
    if not active_alarms:
        get_logger().log_info(f"All alarms defined by the following IDs: {alarm_ids} are now cleared in this SSH connection ({self.get_ssh_connection()}).")
        return

    raise TimeoutError(f"The alarms identified by the following IDs: {alarm_ids} could not be cleared within a period of {self.get_timeout_in_seconds()} seconds.")
def get_timeout_in_seconds(self) -> int:
    """
    Returns the maximum number of seconds this instance waits for alarms to be cleared.

    Default value: 600.

    Returns:
        (int): The maximum time, in seconds, to wait for the alarms to be cleared.

    """
    timeout_in_seconds = self._timeout_in_seconds
    return timeout_in_seconds
def set_timeout_in_seconds(self, timeout_in_seconds: int):
    """
    Defines the maximum number of seconds this instance waits for alarms to be cleared.

    Args:
        timeout_in_seconds (int): The maximum time, in seconds, to wait for the alarms to be cleared.

    Returns:
        None

    """
    new_timeout = timeout_in_seconds
    self._timeout_in_seconds = new_timeout
def get_check_interval_in_seconds(self) -> int:
    """
    Returns the number of seconds this instance waits between two consecutive alarm checks.

    Default value: 3.

    Returns:
        (int): The interval, in seconds, at which this instance checks the alarms again.

    """
    check_interval_in_seconds = self._check_interval_in_seconds
    return check_interval_in_seconds
def set_check_interval_in_seconds(self, check_interval_in_seconds: int) -> int:
    """
    Sets the integer representation of the interval in seconds at which this instance will check the alarms again.

    Args:
        check_interval_in_seconds (int): An integer representing the interval in seconds at which this instance
        will check the alarms again.

    Returns:
        (int): The interval in seconds now in effect, i.e. the value that was just set. The return value is kept
        so that existing callers relying on the declared 'int' return type keep working.

    """
    # Stores the new interval; previously the argument was ignored and the old value was returned unchanged.
    self._check_interval_in_seconds = check_interval_in_seconds
    return self._check_interval_in_seconds
def get_ssh_connection(self):
    """
    Returns the SSH connection this AlarmListKeywords instance operates on.

    Returns:
        SSHConnection: the SSH connection stored in this AlarmListKeywords instance.

    """
    ssh_connection = self._ssh_connection
    return ssh_connection

View File

@ -106,3 +106,11 @@ class AlarmListObject:
"""
return self.time_stamp
def __eq__(self, alarm_list_object):
    """
    Compares this alarm with another object.

    Two AlarmListObject instances are considered equal when their alarm ID, severity and entity ID are all equal.
    Any object that is not an AlarmListObject is never equal to this alarm.

    Args:
        alarm_list_object: the object to compare with this alarm.

    Returns:
        bool: True if both objects represent the same alarm; False otherwise.

    """
    if not isinstance(alarm_list_object, AlarmListObject):
        return False

    # The fields are compared one at a time, in the same order, stopping at the first mismatch.
    if not (self.get_alarm_id() == alarm_list_object.get_alarm_id()):
        return False
    if not (self.get_severity() == alarm_list_object.get_severity()):
        return False
    return self.get_entity_id() == alarm_list_object.get_entity_id()

View File

@ -96,6 +96,23 @@ class SystemHostOutput:
return hosts[0]
def get_controllers(self) -> list[SystemHostObject]:
    """
    Gets the hosts whose personality contains 'controller'.

    Returns (list[SystemHostObject]): the list of controllers

    Raises:
        KeywordException: if none of the system hosts is a controller.

    """
    controllers = [host for host in self.system_hosts if 'controller' in host.get_personality()]

    # An output without any controller is treated as an error rather than returned as an empty list.
    if not controllers:
        raise KeywordException("No controller was found.")

    return controllers
def get_computes(self) -> [SystemHostObject]:
"""
Gets the compute

View File

@ -62,3 +62,17 @@ class SystemStorageBackendOutput:
system_storage_backend_object.add_capabilities(value['capabilities'])
self.system_storage_backends.append(system_storage_backend_object)
def get_system_storage_backends(self) -> list[SystemStorageBackendObject]:
    """
    Gets the storage backend objects held by this output, one per row of the table displayed as the result of
    executing the 'system storage-backend-list' command.

    Args: None.

    Returns:
        list[SystemStorageBackendObject]: the objects representing the rows of the 'system storage-backend-list'
        command output.

    """
    storage_backends = self.system_storage_backends
    return storage_backends

View File

@ -31,6 +31,7 @@ from keywords.cloud_platform.system.host.system_host_list_keywords import System
from keywords.cloud_platform.system.host.system_host_lock_keywords import SystemHostLockKeywords
from keywords.cloud_platform.system.host.system_host_reboot_keywords import SystemHostRebootKeywords
from keywords.cloud_platform.system.host.system_host_swact_keywords import SystemHostSwactKeywords
from keywords.cloud_platform.system.storage.system_storage_backend_keywords import SystemStorageBackendKeywords
from keywords.files.file_keywords import FileKeywords
from keywords.k8s.deployments.kubectl_delete_deployments_keywords import KubectlDeleteDeploymentsKeywords
from keywords.k8s.pods.kubectl_create_pods_keywords import KubectlCreatePodsKeywords
@ -829,3 +830,68 @@ def test_dc_swact_host(request):
)
get_logger().log_info("Completed the 'test_dc_swact_host' test case.")
@mark.p0
@mark.lab_has_subcloud
def test_dc_system_health_pre_session():
    """
    Checks the health of the DC System, in the central cloud and in every healthy subcloud, before a test session.

    Requirements verified on each cloud:
        _ Application 'platform-integ-apps' is in 'applied' status (only when the cloud has storage backends).
        _ No alarms are present.
        _ The health of Kubernetes pods (not implemented yet, see the TODO at the end of this test).

    Setup:
        _ Defines a reference to 'platform-integ-apps' app name.
        _ Builds the list of SSH connections: the central cloud first, then one per healthy subcloud.

    Test:
        _ For each SSH connection in the list:
            _ Asserts the status of the 'platform-integ-apps' application is 'applied'.
            _ Waits for all alarms to be cleared.

    Teardown:
        _ Not required.

    """
    # The application 'platform-integ-apps' is responsible for the installation, management, and integration
    # of essential platform applications running on the underlying infrastructure. It must be in 'applied' status.
    platform_app = 'platform-integ-apps'

    # SSH session to the active controller of the central cloud.
    central_cloud_ssh = LabConnectionKeywords().get_active_controller_ssh()

    # Only subclouds that are online, managed, deploy complete, and synchronized are considered.
    healthy_subcloud_filter = DcManagerSubcloudListObjectFilter().get_healthy_subcloud_filter()
    subcloud_list = DcManagerSubcloudListKeywords(central_cloud_ssh).get_dcmanager_subcloud_list()
    healthy_subclouds = subcloud_list.get_dcmanager_subcloud_list_objects_filtered(healthy_subcloud_filter)

    # The central cloud connection comes first, followed by one connection per healthy subcloud.
    ssh_connections = [central_cloud_ssh]
    ssh_connections.extend(LabConnectionKeywords().get_subcloud_ssh(subcloud.get_name()) for subcloud in healthy_subclouds)

    for ssh_connection in ssh_connections:

        # The status of the <platform_app> application is only asserted when the subcloud or central cloud
        # has storage backends.
        storage_backend_list = SystemStorageBackendKeywords(ssh_connection).get_system_storage_backend_list()
        system_storage_backends = storage_backend_list.get_system_storage_backends()
        if len(system_storage_backends) > 0:
            application_list = SystemApplicationListKeywords(ssh_connection).get_system_application_list()
            app_status = application_list.get_application(platform_app).get_status()
            assert app_status == 'applied', f"The status of application '{platform_app}' is not 'applied'. Current status: {app_status}."

        # No alarms may be present: the call below raises an exception if the alarms are not cleared in time,
        # so returning normally means that all alarms were cleared.
        AlarmListKeywords(ssh_connection).wait_for_all_alarms_cleared()

    # TODO: to check the health of Kubernetes pods on subclouds.