Cert-Alarm FM interface
Adding cert-alarm's fm interface to collect, raise and clear alarms. Also includes changes to utils' CERT_SNAPSHOT data structure to store data needed based on alarm's entity_instance_id and text. Depends-on: https://review.opendev.org/c/starlingx/fault/+/805075 Story: 2008946 Task: 42852 Signed-off-by: Sabeel Ansari <Sabeel.Ansari@windriver.com> Change-Id: I71096ccac14f63b6d127471697c93e88b164d5b5
This commit is contained in:
parent
c3d8d8822b
commit
0273db6417
|
@ -11,6 +11,7 @@ from oslo_config import cfg
|
|||
from oslo_log import log
|
||||
from oslo_service import periodic_task
|
||||
|
||||
from sysinv.cert_alarm import fm as fm_mgr
|
||||
from sysinv.cert_alarm import utils
|
||||
from sysinv.common import constants
|
||||
|
||||
|
@ -34,6 +35,7 @@ class CertificateAlarmManager(periodic_task.PeriodicTasks):
|
|||
super(CertificateAlarmManager, self).__init__(CONF)
|
||||
self.audit_thread = None
|
||||
self.active_alarm_audit_thread = None
|
||||
self.fm_obj = fm_mgr.FaultApiMgr()
|
||||
|
||||
def periodic_tasks(self, context, raise_on_error=False):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
|
@ -60,11 +62,14 @@ class CertificateAlarmManager(periodic_task.PeriodicTasks):
|
|||
LOG.info('Number of TLS secrets to process=%d' % len(all_secrets))
|
||||
for item in all_secrets:
|
||||
LOG.info('Processing item: %s' % item.metadata.name)
|
||||
(certname_secret, exp_date_secret, anno_data_secret) = \
|
||||
(certname_secret, exp_date_secret, anno_data_secret, mode_metadata) = \
|
||||
utils.collect_certificate_data_from_kube_secret(item)
|
||||
# if cert not present, exp_date will be None
|
||||
if exp_date_secret is not None:
|
||||
utils.add_cert_snapshot(certname_secret, exp_date_secret, anno_data_secret)
|
||||
utils.add_cert_snapshot(certname_secret,
|
||||
exp_date_secret,
|
||||
anno_data_secret,
|
||||
mode_metadata)
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
|
||||
|
@ -75,10 +80,14 @@ class CertificateAlarmManager(periodic_task.PeriodicTasks):
|
|||
if utils.is_certname_already_processed(key) is True:
|
||||
continue
|
||||
|
||||
(certname_file, exp_date_file, anno_data_file) = utils.collect_certificate_data_from_file(key, value)
|
||||
(certname_file, exp_date_file, anno_data_file, mode_metadata_file) = \
|
||||
utils.collect_certificate_data_from_file(key, value)
|
||||
# if cert not present, exp_date will be None
|
||||
if exp_date_file is not None:
|
||||
utils.add_cert_snapshot(certname_file, exp_date_file, anno_data_file)
|
||||
utils.add_cert_snapshot(certname_file,
|
||||
exp_date_file,
|
||||
anno_data_file,
|
||||
mode_metadata_file)
|
||||
|
||||
# 3. Process SSL_CA certificates (special case, since there can be multiple files)
|
||||
LOG.info('Processing (3/3) ssl_ca certificate files...')
|
||||
|
@ -87,7 +96,7 @@ class CertificateAlarmManager(periodic_task.PeriodicTasks):
|
|||
for entry in ssl_ca_data_list:
|
||||
# if cert not present, exp_date will be None
|
||||
if entry[1] is not None:
|
||||
utils.add_cert_snapshot(entry[0], entry[1], entry[2])
|
||||
utils.add_cert_snapshot(entry[0], entry[1], entry[2], entry[3])
|
||||
|
||||
utils.print_cert_snapshot()
|
||||
|
||||
|
@ -109,7 +118,7 @@ class CertificateAlarmManager(periodic_task.PeriodicTasks):
|
|||
self.run_full_audit()
|
||||
|
||||
LOG.info('Cert-alarm active alarms auditing interval %s' %
|
||||
CONF.certalarm.active_alarm_audit_interval)
|
||||
CONF.certalarm.active_alarm_audit_interval)
|
||||
self.active_alarm_audit_thread = greenthread.spawn(self.active_alarm_audits)
|
||||
|
||||
def stop_audits(self):
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
#
|
||||
# Copyright (c) 2021 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from fm_api import constants as fm_constants
|
||||
from fm_api import fm_api
|
||||
from sysinv.cert_alarm import utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
SNAPSHOT_KEY_EXPIRING_SOON = 'EXPIRING_SOON'
|
||||
SNAPSHOT_KEY_EXPIRED = 'EXPIRED'
|
||||
|
||||
|
||||
class FaultApiMgr(object):
|
||||
def __init__(self):
|
||||
LOG.info('Initializing FaultApiMgr')
|
||||
self.fm_api = fm_api.FaultAPIs()
|
||||
self.ALARMS_SNAPSHOT = {}
|
||||
"""
|
||||
After an audit is completed, ALARMS_SNAPSHOT stores all active alarms
|
||||
ALARMS_SNAPSHOT is a dict of list.
|
||||
{
|
||||
EXPIRING_SOON: [certname1, certname2,...]
|
||||
EXPIRED: [certname7, certname8,...]
|
||||
}
|
||||
"""
|
||||
|
||||
def get_entity_instance_id(self, cert_name):
|
||||
"""
|
||||
Returns entity_instance_ids in format:
|
||||
system.certificate.mode=<mode>.uuid=<uuid>
|
||||
OR
|
||||
namespace=<namespace-name>.certificate=<certificate-name>
|
||||
OR
|
||||
namespace=<namespace-name>.secret=<secret-name>
|
||||
OR
|
||||
system.certificate.k8sRootCA
|
||||
"""
|
||||
id = []
|
||||
if cert_name in utils.CERT_SNAPSHOT:
|
||||
snapshot = utils.CERT_SNAPSHOT[cert_name]
|
||||
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_UUID:
|
||||
id.append("system.certificate.mode=%s.uuid=%s" %
|
||||
(cert_name, snapshot[utils.SNAPSHOT_KEY_uuid]))
|
||||
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_CERT_MGR:
|
||||
id.append("namespace=%s.certificate=%s" %
|
||||
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_cert]))
|
||||
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_SECRET:
|
||||
id.append("namespace=%s.secret=%s" %
|
||||
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_secret]))
|
||||
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_OTHER:
|
||||
id.append("system.certificate.%s" % cert_name)
|
||||
|
||||
id.append(" (%s=%s)" % (fm_constants.FM_ENTITY_TYPE_CERTIFICATE, cert_name))
|
||||
return ''.join(id)
|
||||
|
||||
def get_cert_name_from_entity_instance_id(self, instance_id):
|
||||
start = "(" + fm_constants.FM_ENTITY_TYPE_CERTIFICATE
|
||||
return instance_id[instance_id.find(start) + 1:instance_id.find(")")]
|
||||
|
||||
def get_reason_text(self, cert_name, expired_flag):
|
||||
txt = ["Certificate "]
|
||||
if cert_name in utils.CERT_SNAPSHOT:
|
||||
# Add entity related text
|
||||
snapshot = utils.CERT_SNAPSHOT[cert_name]
|
||||
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_UUID:
|
||||
txt.append("\'system certificate-show %s\' (mode=%s) " %
|
||||
(snapshot[utils.SNAPSHOT_KEY_uuid], cert_name))
|
||||
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_CERT_MGR:
|
||||
txt.append("namespace=%s, certificate=%s " %
|
||||
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_cert]))
|
||||
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_SECRET:
|
||||
txt.append("namespace=%s, secret=%s " %
|
||||
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_secret]))
|
||||
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_OTHER:
|
||||
txt.append(cert_name)
|
||||
|
||||
# Add Expired or Expiring
|
||||
if expired_flag:
|
||||
txt.append(" expired.")
|
||||
else:
|
||||
expiry_date = snapshot[utils.SNAPSHOT_KEY_EXPDATE]
|
||||
txt.append(" is expiring soon on ")
|
||||
txt.append(expiry_date.strftime("%Y-%m-%d, %H:%M:%S"))
|
||||
|
||||
else:
|
||||
LOG.error('Could not find certname %s in snapshot. Returning generic reason text' % cert_name)
|
||||
txt.append(cert_name)
|
||||
return ''.join(txt)
|
||||
|
||||
txt_str = ''.join(txt)
|
||||
LOG.debug('Alarm text: %s' % txt_str)
|
||||
return txt_str
|
||||
|
||||
def set_fault(self, cert_name, expired_flag, state):
|
||||
"""
|
||||
Set Fault calls the FM API to raise or clear alarm
|
||||
Params: cert-name: certificate name
|
||||
expired_flag: True/False
|
||||
Determines whether 'Expired' (True) or 'Expiring Soon' (False)
|
||||
Also determines the severity Critical (True) or Major (False)
|
||||
state: will determine SET or CLEAR
|
||||
"""
|
||||
|
||||
LOG.info('set_fault called with cert_name=%s, expired_flag=%s, state=%s' %
|
||||
(cert_name, expired_flag, state))
|
||||
|
||||
alrm_id = fm_constants.FM_ALARM_ID_CERT_EXPIRED if expired_flag \
|
||||
else fm_constants.FM_ALARM_ID_CERT_EXPIRING_SOON
|
||||
alrm_severity = fm_constants.FM_ALARM_SEVERITY_CRITICAL if expired_flag \
|
||||
else fm_constants.FM_ALARM_SEVERITY_MAJOR
|
||||
entity_inst_id = self.get_entity_instance_id(cert_name)
|
||||
|
||||
if state == fm_constants.FM_ALARM_STATE_SET:
|
||||
# Raise alarm only if alarm does not already exist
|
||||
if not self.fm_api.get_fault(alrm_id, entity_inst_id):
|
||||
fault = fm_api.Fault(
|
||||
alarm_id=alrm_id,
|
||||
alarm_state=state,
|
||||
entity_type_id=fm_constants.FM_ENTITY_TYPE_CERTIFICATE,
|
||||
entity_instance_id=entity_inst_id,
|
||||
severity=alrm_severity,
|
||||
reason_text=self.get_reason_text(cert_name, expired_flag),
|
||||
alarm_type=fm_constants.FM_ALARM_TYPE_9, # security-service
|
||||
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_77, # certificate-expiration
|
||||
proposed_repair_action="Renew certificate for entity identified",
|
||||
suppression=False,
|
||||
service_affecting=False)
|
||||
|
||||
self.fm_api.set_fault(fault)
|
||||
else:
|
||||
if self.fm_api.get_fault(alrm_id, entity_inst_id):
|
||||
self.fm_api.clear_fault(alrm_id, entity_inst_id)
|
||||
|
||||
def get_faults(self, expired_flag):
|
||||
alrm_id = fm_constants.FM_ALARM_ID_CERT_EXPIRED if expired_flag \
|
||||
else fm_constants.FM_ALARM_ID_CERT_EXPIRING_SOON
|
||||
alarms = self.fm_api.get_faults_by_id(alrm_id)
|
||||
return alarms
|
||||
|
||||
def collect_all_cert_alarms(self):
|
||||
LOG.debug('collect_all_cert_alarms called')
|
||||
|
||||
# Expiring Soon alarms
|
||||
exp_soon_alarms = self.get_faults(False)
|
||||
self.add_alarms_snapshot(SNAPSHOT_KEY_EXPIRING_SOON, exp_soon_alarms)
|
||||
|
||||
# Expired alarms
|
||||
exprd_alarms = self.get_faults(True)
|
||||
self.add_alarms_snapshot(SNAPSHOT_KEY_EXPIRED, exprd_alarms)
|
||||
|
||||
def reset_alarms_snapshot(self):
|
||||
self.ALARMS_SNAPSHOT = {}
|
||||
|
||||
def print_alarms_snapshot(self):
|
||||
LOG.info('Alarms snapshot = %s' % self.ALARMS_SNAPSHOT)
|
||||
|
||||
def add_alarms_snapshot(self, key, alarms):
|
||||
cert_names = []
|
||||
if alarms:
|
||||
for item in alarms:
|
||||
cert_names.append(self.get_cert_name_from_entity_instance_id(item.entity_instance_id))
|
||||
|
||||
self.ALARMS_SNAPSHOT[key] = cert_names
|
|
@ -24,6 +24,18 @@ ANNOTATIONS = 'annotations'
|
|||
CERTMGR_CERT_NAME = "cert-manager.io/certificate-name"
|
||||
|
||||
SNAPSHOT_KEY_EXPDATE = 'expiry_date'
|
||||
# Mode will determine how alarm constructs entity_instance_id and description
|
||||
SNAPSHOT_KEY_MODE = 'mode'
|
||||
SNAPSHOT_KEY_uuid = 'mode_uuid'
|
||||
SNAPSHOT_KEY_k8s_ns = 'mode_k8s_ns'
|
||||
SNAPSHOT_KEY_k8s_cert = 'mode_k8s_cert'
|
||||
SNAPSHOT_KEY_k8s_secret = 'mode_k8s_secret'
|
||||
|
||||
# "mode" values can be:
|
||||
MODE_UUID = 'uuid'
|
||||
MODE_SECRET = 'secret'
|
||||
MODE_CERT_MGR = 'certmgr'
|
||||
MODE_OTHER = 'other'
|
||||
|
||||
CERT_SNAPSHOT = {}
|
||||
"""
|
||||
|
@ -32,10 +44,16 @@ Internal dict is expiry_date and all annotations collected via k8s secret
|
|||
{
|
||||
certname1: {
|
||||
expiry_date: date
|
||||
alarm: enabled
|
||||
alarm_before: 30d
|
||||
alarm_severity: unknown
|
||||
alarm_text: ""
|
||||
alarm: <enabled/disabled>
|
||||
alarm_before: <days>
|
||||
alarm_severity: <severity>
|
||||
alarm_text: <custom pretext>
|
||||
mode: <mode>
|
||||
mode_uuid: <uuid>
|
||||
mode_k8s_ns: <namespace>
|
||||
mode_k8s_cert: <certificate>
|
||||
mode_k8s_secret: <secret>
|
||||
mode_other: <other>
|
||||
}
|
||||
certname2: {
|
||||
...
|
||||
|
@ -78,7 +96,7 @@ def collect_certificate_data_from_file(certname, pem_file):
|
|||
"""
|
||||
Collect certificate data
|
||||
Input: certname, pem_file
|
||||
Returns: (certname, expiration_date, annotation_data)
|
||||
Returns: (certname, expiration_date, annotation_data, mode_metadata)
|
||||
expiration_date will be None if data missing or error
|
||||
annotation_data will be set to defaults
|
||||
"""
|
||||
|
@ -90,12 +108,13 @@ def collect_certificate_data_from_file(certname, pem_file):
|
|||
cert_buf = f.read()
|
||||
except IOError:
|
||||
LOG.info('Certificate %s file not found' % certname)
|
||||
return (certname, None, None)
|
||||
return (certname, None, None, None)
|
||||
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf)
|
||||
expiration_date = get_cert_expiration_date(cert)
|
||||
annotation_data = get_default_annotation_values()
|
||||
return (certname, expiration_date, annotation_data)
|
||||
mode_metadata = get_file_mode_metadata(certname)
|
||||
return (certname, expiration_date, annotation_data, mode_metadata)
|
||||
|
||||
|
||||
def is_certname_already_processed(certname):
|
||||
|
@ -141,6 +160,7 @@ def collect_certificate_data_from_kube_secret(secretobj):
|
|||
Returns: (certname, expiration_date, annotation_data)
|
||||
expiration_date will be None if data missing or error
|
||||
annotation_data from k8s Secret or Certificate CRD
|
||||
mode_metadata includes details of namespace/cert/secret
|
||||
"""
|
||||
certname = secretobj.metadata.name
|
||||
LOG.debug('collect_certificate_data_from_kube_secret called for %s' % certname)
|
||||
|
@ -152,9 +172,9 @@ def collect_certificate_data_from_kube_secret(secretobj):
|
|||
txt_crt = base64.b64decode(secretobj.data['tls.crt'])
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, txt_crt)
|
||||
expiration_date = get_cert_expiration_date(cert)
|
||||
annotation_data = get_annotation_data(secretobj)
|
||||
LOG.debug('returning (%s, %s, %s)' % (certname, expiration_date, annotation_data))
|
||||
return (certname, expiration_date, annotation_data)
|
||||
annotation_data, mode_metadata = get_annotation_data(secretobj)
|
||||
LOG.debug('returning (%s, %s, %s, %s)' % (certname, expiration_date, annotation_data, mode_metadata))
|
||||
return (certname, expiration_date, annotation_data, mode_metadata)
|
||||
|
||||
|
||||
def get_annotation_data(secretobj):
|
||||
|
@ -171,6 +191,11 @@ def get_annotation_data(secretobj):
|
|||
annotation_dict = dict()
|
||||
patch_needed = False
|
||||
|
||||
mode_metadata = get_default_mode_metadata()
|
||||
mode_metadata[SNAPSHOT_KEY_MODE] = MODE_SECRET
|
||||
mode_metadata[SNAPSHOT_KEY_k8s_ns] = ns
|
||||
mode_metadata[SNAPSHOT_KEY_k8s_secret] = secretobj.metadata.name
|
||||
|
||||
cm_managed = False
|
||||
# Annotations can be None, so need a check first
|
||||
if secretobj.metadata.annotations is not None:
|
||||
|
@ -181,6 +206,8 @@ def get_annotation_data(secretobj):
|
|||
certobj = kube_op.get_custom_resource(sys_kube.CERT_MANAGER_GROUP, V1_ALPHA_3,
|
||||
ns, PLURAL_NAME_CERT, crd_cert_name)
|
||||
cm_managed = True
|
||||
mode_metadata[SNAPSHOT_KEY_MODE] = MODE_CERT_MGR
|
||||
mode_metadata[SNAPSHOT_KEY_k8s_cert] = crd_cert_name
|
||||
|
||||
# Note: unlike k8s secret obj, get_custom_resource() returns a dict()
|
||||
certobj_annotation = certobj[METADATA][ANNOTATIONS]
|
||||
|
@ -204,7 +231,7 @@ def get_annotation_data(secretobj):
|
|||
secretobj.metadata.annotations = annotation_dict
|
||||
kube_op.kube_patch_secret(secretobj.metadata.name, ns, secretobj)
|
||||
|
||||
return annotation_dict
|
||||
return annotation_dict, mode_metadata
|
||||
|
||||
|
||||
def process_annotation_data(annotation_dict):
|
||||
|
@ -251,25 +278,47 @@ def print_cert_snapshot():
|
|||
LOG.info('Cert snapshot = %s' % CERT_SNAPSHOT)
|
||||
|
||||
|
||||
def add_cert_snapshot(certname, expirydate, annotation_data):
|
||||
def add_cert_snapshot(certname, expirydate, annotation_data, mode_metadata):
|
||||
global CERT_SNAPSHOT
|
||||
internaldict = dict()
|
||||
internaldict[SNAPSHOT_KEY_EXPDATE] = expirydate
|
||||
internaldict[constants.CERT_ALARM_ANNOTATION_ALARM] = \
|
||||
annotation_data[constants.CERT_ALARM_ANNOTATION_ALARM]
|
||||
internaldict[constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE] = \
|
||||
annotation_data[constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE]
|
||||
internaldict[constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY] = \
|
||||
annotation_data[constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY]
|
||||
internaldict[constants.CERT_ALARM_ANNOTATION_ALARM_TEXT] = \
|
||||
annotation_data[constants.CERT_ALARM_ANNOTATION_ALARM_TEXT]
|
||||
internaldict.update(annotation_data)
|
||||
internaldict.update(mode_metadata)
|
||||
CERT_SNAPSHOT[certname] = internaldict
|
||||
|
||||
|
||||
def get_default_annotation_values():
|
||||
ret = dict()
|
||||
ret[constants.CERT_ALARM_ANNOTATION_ALARM] = constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM
|
||||
ret[constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE] = constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_BEFORE
|
||||
ret[constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY] = constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_SEVERITY
|
||||
ret[constants.CERT_ALARM_ANNOTATION_ALARM_TEXT] = constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_TEXT
|
||||
return ret
|
||||
return {
|
||||
constants.CERT_ALARM_ANNOTATION_ALARM:
|
||||
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM,
|
||||
constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE:
|
||||
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_BEFORE,
|
||||
constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY:
|
||||
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_SEVERITY,
|
||||
constants.CERT_ALARM_ANNOTATION_ALARM_TEXT:
|
||||
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_TEXT
|
||||
}
|
||||
|
||||
|
||||
def get_default_mode_metadata():
|
||||
return {
|
||||
SNAPSHOT_KEY_MODE: "",
|
||||
SNAPSHOT_KEY_uuid: "",
|
||||
SNAPSHOT_KEY_k8s_ns: "",
|
||||
SNAPSHOT_KEY_k8s_cert: "",
|
||||
SNAPSHOT_KEY_k8s_secret: ""
|
||||
}
|
||||
|
||||
|
||||
def get_file_mode_metadata(certname):
|
||||
mode_metadata = get_default_mode_metadata()
|
||||
# For k8s_root_ca and etcd, set "other". Rest should have UUID in sysinv db.
|
||||
# In case of ssl & docker, if managed by cert_mgr, this wont be called from run_full_audit()
|
||||
# so, can assume that we will have UUID in db
|
||||
if certname is constants.CERT_MODE_KUBERNETES_ROOT_CA or certname is constants.CERT_MODE_ETCD:
|
||||
mode_metadata[SNAPSHOT_KEY_MODE] = MODE_OTHER
|
||||
else:
|
||||
mode_metadata[SNAPSHOT_KEY_MODE] = MODE_UUID
|
||||
# TODO() get UUID from DB
|
||||
|
||||
return mode_metadata
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
"""Test class for Sysinv CertAlarm"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import os.path
|
||||
from OpenSSL import crypto
|
||||
|
||||
|
@ -77,6 +76,13 @@ class CertAlarmTestCase(base.DbTestCase):
|
|||
self.assertIn(constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE, annotation_data)
|
||||
self.assertIn(constants.CERT_ALARM_ANNOTATION_ALARM_SEVERITY, annotation_data)
|
||||
self.assertIn(constants.CERT_ALARM_ANNOTATION_ALARM_TEXT, annotation_data)
|
||||
mode_metadata = ret[3]
|
||||
self.assertIsNotNone(mode_metadata)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_MODE, mode_metadata)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_uuid, mode_metadata)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_ns, mode_metadata)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_cert, mode_metadata)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_secret, mode_metadata)
|
||||
|
||||
def test_process_annotation_data(self):
|
||||
# 1. Test with blank dict - should return default annotations + patch_needed True
|
||||
|
@ -160,17 +166,16 @@ class CertAlarmTestCase(base.DbTestCase):
|
|||
certname1 = 'c1'
|
||||
certname2 = 'c2'
|
||||
now = datetime.now()
|
||||
now2 = datetime.now() - timedelta(days=1)
|
||||
ann_data1 = cert_alarm_utils.get_default_annotation_values()
|
||||
ann_data2 = cert_alarm_utils.get_default_annotation_values()
|
||||
cert_alarm_utils.add_cert_snapshot(certname1, now, ann_data1)
|
||||
cert_alarm_utils.add_cert_snapshot(certname2, now2, ann_data2)
|
||||
ann_data = cert_alarm_utils.get_default_annotation_values()
|
||||
mode_metadata = cert_alarm_utils.get_default_mode_metadata()
|
||||
cert_alarm_utils.add_cert_snapshot(certname1, now, ann_data, mode_metadata)
|
||||
cert_alarm_utils.add_cert_snapshot(certname2, now, ann_data, mode_metadata)
|
||||
|
||||
self.assertEqual(len(cert_alarm_utils.CERT_SNAPSHOT), 2)
|
||||
item1 = cert_alarm_utils.CERT_SNAPSHOT[certname1]
|
||||
self.assertEqual(item1[cert_alarm_utils.SNAPSHOT_KEY_EXPDATE], now)
|
||||
item2 = cert_alarm_utils.CERT_SNAPSHOT[certname2]
|
||||
self.assertEqual(item2[cert_alarm_utils.SNAPSHOT_KEY_EXPDATE], now2)
|
||||
self.assertEqual(item2[cert_alarm_utils.SNAPSHOT_KEY_EXPDATE], now)
|
||||
|
||||
cert_alarm_utils.reset_cert_snapshot()
|
||||
self.assertEqual(len(cert_alarm_utils.CERT_SNAPSHOT), 0)
|
||||
|
@ -190,3 +195,17 @@ class CertAlarmTestCase(base.DbTestCase):
|
|||
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_SEVERITY)
|
||||
self.assertEqual(data[constants.CERT_ALARM_ANNOTATION_ALARM_TEXT],
|
||||
constants.CERT_ALARM_DEFAULT_ANNOTATION_ALARM_TEXT)
|
||||
|
||||
def test_get_default_mode_metadata(self):
|
||||
data = cert_alarm_utils.get_default_mode_metadata()
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_MODE, data)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_uuid, data)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_ns, data)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_cert, data)
|
||||
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_secret, data)
|
||||
|
||||
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_MODE], "")
|
||||
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_uuid], "")
|
||||
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_k8s_ns], "")
|
||||
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_k8s_cert], "")
|
||||
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_k8s_secret], "")
|
||||
|
|
Loading…
Reference in New Issue