Wait for more alarms to clear on unlock

Some alarms were still there after unlock. These were causing
other sanity tests to fail.

Change-Id: I2f2d4a7feee96d964548c6792143928ab89ef189
This commit is contained in:
jpike
2025-03-17 08:46:18 -04:00
parent 219ba10832
commit 07fb7d10ba
2 changed files with 56 additions and 36 deletions

View File

@@ -2,6 +2,7 @@ import time
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
@@ -13,43 +14,46 @@ class SystemHostLockKeywords(BaseKeyword):
Keywords for System Host Lock commands
"""
def __init__(self, ssh_connection):
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor
Args:
ssh_connection:
ssh_connection (SSHConnection): the ssh connection
"""
self.ssh_connection = ssh_connection
def lock_host(self, host_name: str) -> bool:
"""
Locks the given host name. It's expected that the host will already be unlocked
Args:
host_name (): the name of the host
Returns: True if it's successful
Args:
host_name (str): the name of the host
Returns:
bool: True if it's successful
Raises:
KeywordException: If lock does not occur in the given amount of time
"""
self.ssh_connection.send(source_openrc(f'system host-lock {host_name}'))
self.ssh_connection.send(source_openrc(f"system host-lock {host_name}"))
self.validate_success_return_code(self.ssh_connection)
is_host_locked = SystemHostLockKeywords(self.ssh_connection).wait_for_host_locked(host_name)
if not is_host_locked:
host_value = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name)
raise KeywordException(
"Lock host did not lock in the required time. Host values were: "
f"Operational: {host_value.get_operational()} "
f"Administrative: {host_value.get_administrative()} "
f"Availability: {host_value.get_availability()}"
)
raise KeywordException("Lock host did not lock in the required time. Host values were: " f"Operational: {host_value.get_operational()} " f"Administrative: {host_value.get_administrative()} " f"Availability: {host_value.get_availability()}")
return True
def wait_for_host_locked(self, host_name: str) -> bool:
"""
Waits for the given host name to be locked
Args:
host_name (): the host name
host_name (str): the host name
Returns:
bool: True if lock successful
"""
timeout = time.time() + 600
@@ -61,51 +65,54 @@ class SystemHostLockKeywords(BaseKeyword):
return False
def is_host_locked(self, host_name: str):
def is_host_locked(self, host_name: str) -> bool:
"""
Returns true if the host is locked
Args:
host_name (): the name of the host
host_name (str): the name of the host
Returns:
bool: True is host is locked
"""
host_value = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name)
if host_value.get_availability() == 'online' and host_value.get_administrative() == 'locked' and host_value.get_operational() == 'disabled':
if host_value.get_availability() == "online" and host_value.get_administrative() == "locked" and host_value.get_operational() == "disabled":
return True
return False
def unlock_host(self, host_name: str) -> bool:
"""
Unlocks the given host
Args:
ssh_connection (): the ssh connection
host_name (): the host name
host_name (str): the host name
Returns:
bool: True if the unlock is successful
Raises:
KeywordException: If unlock does not occur in the given time
"""
self.ssh_connection.send(source_openrc(f'system host-unlock {host_name}'))
self.ssh_connection.send(source_openrc(f"system host-unlock {host_name}"))
self.validate_success_return_code(self.ssh_connection)
is_host_unlocked = self.wait_for_host_unlocked(host_name)
if not is_host_unlocked:
host_value = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name)
raise KeywordException(
"Lock host did not lock in the required time. Host values were: "
f"Operational: {host_value.get_operational()} "
f"Administrative: {host_value.get_administrative()} "
f"Availability: {host_value.get_availability()}"
)
raise KeywordException("Unlock host did not unlock in the required time. Host values were: " f"Operational: {host_value.get_operational()} " f"Administrative: {host_value.get_administrative()} " f"Availability: {host_value.get_availability()}")
return True
def wait_for_host_unlocked(self, host_name: str, unlock_wait_timeout=1800) -> bool:
def wait_for_host_unlocked(self, host_name: str, unlock_wait_timeout: int = 1800) -> bool:
"""
Wait for the host to be unlocked
Args:
host_name (): the host name
unlock_wait_timeout: the amount of time in secs to wait for the host to unlock
host_name (str): the host name
unlock_wait_timeout (int): the amount of time in secs to wait for the host to unlock
Returns:
bool: True if host is unlocked
"""
timeout = time.time() + unlock_wait_timeout
@@ -123,21 +130,22 @@ class SystemHostLockKeywords(BaseKeyword):
return False
def is_host_unlocked(self, host_name: str):
def is_host_unlocked(self, host_name: str) -> bool:
"""
Returns true if the host is unlocked
Args:
host_name ():
host_name (str): the name of the host
Returns:
bool: True is host is unlocked
"""
is_host_list_ok = False
# Check System Host-List
host_value = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_host(host_name)
if host_value.get_availability() == 'available' and host_value.get_administrative() == 'unlocked' and host_value.get_operational() == 'enabled':
if host_value.get_availability() == "available" and host_value.get_administrative() == "unlocked" and host_value.get_operational() == "enabled":
get_logger().log_info("The host is in a good state from system host list.")
is_host_list_ok = True
@@ -145,7 +153,8 @@ class SystemHostLockKeywords(BaseKeyword):
alarms = AlarmListKeywords(self.ssh_connection).alarm_list()
is_alarms_list_ok = True
for alarm in alarms:
if alarm.get_alarm_id() == "250.001": # Configuration is out-of-date
# Configuration is out-of-date or apps need re-apply or app being reapplied
if alarm.get_alarm_id() == "250.001" or alarm.get_alarm_id() == "750.006" or alarm.get_alarm_id() == "750.004":
is_alarms_list_ok = False
if is_alarms_list_ok:
get_logger().log_info("There are no Config-out-of-date alarms")

View File

@@ -11,7 +11,7 @@ from framework.ssh.secure_transfer_file.secure_transfer_file import SecureTransf
from framework.ssh.secure_transfer_file.secure_transfer_file_enum import TransferDirection
from framework.ssh.secure_transfer_file.secure_transfer_file_input_object import SecureTransferFileInputObject
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals
from framework.validation.validation import validate_equals, validate_equals_with_retry
from framework.web.webdriver_core import WebDriverCore
from keywords.cloud_platform.dcmanager.dcmanager_alarm_summary_keywords import DcManagerAlarmSummaryKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
@@ -68,8 +68,19 @@ def test_check_alarms():
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
alarms = AlarmListKeywords(ssh_connection).alarm_list()
assert not alarms, "There were alarms found on the system"
def no_alarms() -> bool:
"""
Checks if there are no alarms on the system
Returns:
bool: True if no alarms
"""
alarms = AlarmListKeywords(ssh_connection).alarm_list()
return not alarms
# if another test created some temp alarms, lets give it some time
validate_equals_with_retry(no_alarms, True, "Validate that no alarms on the system", 300)
@mark.p0