Handle FM APIExceptions

Support for alarm overrides was recently committed. However,
the default override for alarm severity is set to 'unknown'.
If this default is applied on newly raised alarms, FM rejects
the alarms since an 'unknown' severity type is not supported.

In addition, added try-except blocks around the FM API calls.
APIException was being raised by FM, which was unhandled
previously.

Test Plan:
PASS: Verify that non-overridden alarm severity can raise
      alarm without any errors from FM.
PASS: Verify that any exception raised by FM is handled.

Story: 2008946
Task: 42852

Signed-off-by: Sabeel Ansari <Sabeel.Ansari@windriver.com>
Change-Id: Ife732114cdda8e10ffb0bc785c52399b13d0f8af
This commit is contained in:
Sabeel Ansari 2021-10-08 11:36:57 -04:00
parent bd035ec530
commit 97ba2434fa
1 changed files with 43 additions and 34 deletions

View File

@ -129,8 +129,10 @@ class FaultApiMgr(object):
# Check for annotation overrides
if cert_name in utils.CERT_SNAPSHOT:
snapshot = utils.CERT_SNAPSHOT[cert_name]
alarm_severity = snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY,
alarm_severity)
override = snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY,
alarm_severity)
if override != "unknown": # Cannot have "unknown" for fault severity
alarm_severity = override
return alarm_severity
@ -148,44 +150,51 @@ class FaultApiMgr(object):
else fm_constants.FM_ALARM_ID_CERT_EXPIRING_SOON
entity_inst_id = self.get_entity_instance_id(cert_name)
if state == fm_constants.FM_ALARM_STATE_SET:
# Raise alarm only if alarm does not already exist
if not self.fm_api.get_fault(alrm_id, entity_inst_id):
# Check for annotation override
if cert_name in utils.CERT_SNAPSHOT:
snapshot = utils.CERT_SNAPSHOT[cert_name]
if snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM,
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM) == 'disabled':
LOG.info('Found annotation override, disabling alarm. Suppressing %s' %
cert_name)
return
try:
if state == fm_constants.FM_ALARM_STATE_SET:
# Raise alarm only if alarm does not already exist
if not self.fm_api.get_fault(alrm_id, entity_inst_id):
# Check for annotation override
if cert_name in utils.CERT_SNAPSHOT:
snapshot = utils.CERT_SNAPSHOT[cert_name]
if snapshot.get(constants.CERT_ALARM_ANNOTATION_ALARM,
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM) == 'disabled':
LOG.info('Found annotation override, disabling alarm. Suppressing %s' %
cert_name)
return
fault = fm_api.Fault(
alarm_id=alrm_id,
alarm_state=state,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CERTIFICATE,
entity_instance_id=entity_inst_id,
severity=self.get_severity(cert_name, expired_flag),
reason_text=self.get_reason_text(cert_name, expired_flag),
alarm_type=fm_constants.FM_ALARM_TYPE_9, # security-service
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_77, # certificate-expiration
proposed_repair_action="Renew certificate for entity identified",
suppression=False,
service_affecting=False)
fault = fm_api.Fault(
alarm_id=alrm_id,
alarm_state=state,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CERTIFICATE,
entity_instance_id=entity_inst_id,
severity=self.get_severity(cert_name, expired_flag),
reason_text=self.get_reason_text(cert_name, expired_flag),
alarm_type=fm_constants.FM_ALARM_TYPE_9,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_77,
proposed_repair_action="Renew certificate for entity identified",
suppression=False,
service_affecting=False)
LOG.info('Setting fault for cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
self.fm_api.set_fault(fault)
else:
if self.fm_api.get_fault(alrm_id, entity_inst_id):
LOG.info('Setting fault for cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
self.fm_api.clear_fault(alrm_id, entity_inst_id)
LOG.info('Setting fault for cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
self.fm_api.set_fault(fault)
else:
if self.fm_api.get_fault(alrm_id, entity_inst_id):
LOG.info('Setting fault for cert_name=%s, expired_flag=%s, state=%s' %
(cert_name, expired_flag, state))
self.fm_api.clear_fault(alrm_id, entity_inst_id)
except Exception as e:
LOG.warn(e)
def get_faults(self, expired_flag):
alrm_id = fm_constants.FM_ALARM_ID_CERT_EXPIRED if expired_flag \
else fm_constants.FM_ALARM_ID_CERT_EXPIRING_SOON
alarms = self.fm_api.get_faults_by_id(alrm_id)
alarms = None
try:
alarms = self.fm_api.get_faults_by_id(alrm_id)
except Exception as e:
LOG.warn(e)
return alarms
def collect_all_cert_alarms(self):