Alarm overrides + other minor changes

During testing, it was observed that alarm annotations
weren't considered in the initial implementation of this
feature. This commit adds functionality to allow user to
override cert-alarm settings via k8s annotations.

Other minor commits are related to:
- Changing the log statement position when alarm is
  raised/cleared to avoid ambuguity
- Added space character after "other" alarm types so the
  output alarm text is as expected
- Using a dict.get(key, default) rather than dict[key] in
  order to avoid key errors.

Test Plan:

PASS: Verify the alarm annotations are applied in case of
      enabling/disabling alarms, changing severity, pre-text
      values etc.
PASS: Verify the log statements are not ambiguous
PASS: Verify dict key errors do not occur
PASS: Verify the output alarm text is as expected

Story: 2008946
Task: 42852

Signed-off-by: Sabeel Ansari <Sabeel.Ansari@windriver.com>
Change-Id: I9a986a9fc1fff0a05485f9fe953162904bf32400
This commit is contained in:
Sabeel Ansari 2021-10-04 09:13:26 -04:00
parent 6b2d8a7a50
commit bd035ec530
2 changed files with 40 additions and 8 deletions

View File

@ -178,7 +178,8 @@ class CertAlarmAudit(object):
snapshot = utils.CERT_SNAPSHOT[cert_name]
expiry = snapshot[utils.SNAPSHOT_KEY_EXPDATE] - datetime.now()
alarm_before = self.parse_time(snapshot[constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE])
alarm_before = self.parse_time(snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE,
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_BEFORE))
renew_before = None
if utils.SNAPSHOT_KEY_RENEW_BEFORE in snapshot:
renew_before = self.parse_time(snapshot[utils.SNAPSHOT_KEY_RENEW_BEFORE])

View File

@ -9,6 +9,7 @@ from oslo_log import log
from fm_api import constants as fm_constants
from fm_api import fm_api
from sysinv.cert_alarm import utils
from sysinv.common import constants
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@ -78,10 +79,19 @@ class FaultApiMgr(object):
return 'ssl_ca' if 'ssl_ca' in cert_name else cert_name
def get_reason_text(self, cert_name, expired_flag):
txt = ["Certificate "]
txt = []
if cert_name in utils.CERT_SNAPSHOT:
# Add entity related text
snapshot = utils.CERT_SNAPSHOT[cert_name]
# Append alarm_text from annotation as pre-text
pretext = snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM_TEXT,
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_TEXT)
if len(pretext) > 0:
txt.append(pretext)
txt.append(' ')
txt.append("Certificate ")
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.UUID:
txt.append("\'system certificate-show %s\' (mode=%s) " %
(snapshot[utils.UUID], self.get_mode(cert_name)))
@ -93,6 +103,7 @@ class FaultApiMgr(object):
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_secret]))
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_OTHER:
txt.append(cert_name)
txt.append(' ')
# Add Expired or Expiring
if expired_flag:
@ -111,6 +122,18 @@ class FaultApiMgr(object):
LOG.debug('Alarm text: %s' % txt_str)
return txt_str
def get_severity(self, cert_name, expired_flag):
alarm_severity = fm_constants.FM_ALARM_SEVERITY_CRITICAL if expired_flag \
else fm_constants.FM_ALARM_SEVERITY_MAJOR
# Check for annotation overrides
if cert_name in utils.CERT_SNAPSHOT:
snapshot = utils.CERT_SNAPSHOT[cert_name]
alarm_severity = snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY,
alarm_severity)
return alarm_severity
def set_fault(self, cert_name, expired_flag, state):
"""
Set Fault calls the FM API to raise or clear alarm
@ -121,24 +144,28 @@ class FaultApiMgr(object):
state: will determine SET or CLEAR
"""
LOG.info('set_fault called with cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
alrm_id = fm_constants.FM_ALARM_ID_CERT_EXPIRED if expired_flag \
else fm_constants.FM_ALARM_ID_CERT_EXPIRING_SOON
alrm_severity = fm_constants.FM_ALARM_SEVERITY_CRITICAL if expired_flag \
else fm_constants.FM_ALARM_SEVERITY_MAJOR
entity_inst_id = self.get_entity_instance_id(cert_name)
if state == fm_constants.FM_ALARM_STATE_SET:
# Raise alarm only if alarm does not already exist
if not self.fm_api.get_fault(alrm_id, entity_inst_id):
# Check for annotation override
if cert_name in utils.CERT_SNAPSHOT:
snapshot = utils.CERT_SNAPSHOT[cert_name]
if snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM,
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM) == 'disabled':
LOG.info('Found annotation override, disabling alarm. Suppressing %s' %
cert_name)
return
fault = fm_api.Fault(
alarm_id=alrm_id,
alarm_state=state,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CERTIFICATE,
entity_instance_id=entity_inst_id,
severity=alrm_severity,
severity=self.get_severity(cert_name, expired_flag),
reason_text=self.get_reason_text(cert_name, expired_flag),
alarm_type=fm_constants.FM_ALARM_TYPE_9, # security-service
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_77, # certificate-expiration
@ -146,9 +173,13 @@ class FaultApiMgr(object):
suppression=False,
service_affecting=False)
LOG.info('Setting fault for cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
self.fm_api.set_fault(fault)
else:
if self.fm_api.get_fault(alrm_id, entity_inst_id):
LOG.info('Setting fault for cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
self.fm_api.clear_fault(alrm_id, entity_inst_id)
def get_faults(self, expired_flag):