Merge "Cert-Alarm audit"

This commit is contained in:
Zuul 2021-09-24 18:56:05 +00:00 committed by Gerrit Code Review
commit ba6521c73c
5 changed files with 337 additions and 105 deletions

View File

@ -0,0 +1,228 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from datetime import datetime
from datetime import timedelta
from fm_api import constants as fm_constants
from itertools import chain
from oslo_log import log
import re
from sysinv.cert_alarm import fm as fm_mgr
from sysinv.cert_alarm import utils
from sysinv.common import constants
from sysinv.common import kubernetes as sys_kube
LOG = log.getLogger(__name__)
class CertAlarmAudit(object):
def __init__(self):
LOG.info('Initializing CertAlarmAudit')
self.fm_obj = fm_mgr.FaultApiMgr()
# ============== Full audit ===================
def run_full_audit(self):
"""
Run full audit
"""
LOG.info('Running cert-alarm full audit')
# Reset both CERT_SNAPSHOT & ALARM_SNAPSHOT
utils.reset_cert_snapshot()
self.fm_obj.reset_alarms_snapshot()
self.fm_obj.reset_entityid_to_certname_map()
# Collect CERT_SNAPSHOT
self.collect_cert_snapshot()
utils.print_cert_snapshot()
self.compute_action_full_audit()
LOG.info('cert-alarm full completed')
def collect_cert_snapshot(self):
"""
Collects snapshot of the certificates in the following order:
1. K8s secret/certificates
2. Platform certificate files
3. SSL_CA certificate files
"""
# 1. Process all k8s secrets/certificates
LOG.debug('Processing k8s secret/certificates...')
try:
all_secrets = utils.get_tls_secrets_from_all_ns()
if all_secrets:
LOG.info('Number of TLS secrets to process=%d' % len(all_secrets))
for item in all_secrets:
LOG.info('Processing item: %s' % item.metadata.name)
(certname_secret, exp_date_secret, anno_data_secret, mode_metadata) = \
utils.collect_certificate_data_from_kube_secret(item)
# if cert not present, exp_date will be None
if exp_date_secret is not None:
utils.add_cert_snapshot(certname_secret,
exp_date_secret,
anno_data_secret,
mode_metadata)
except Exception as e:
LOG.error(e)
# 2. Process platform certs stored as pem files
LOG.debug('Processing platform certificate files...')
for key, value in constants.CERT_LOCATION_MAP.items():
LOG.info('Processing item: %s at location %s' % (key, value))
if utils.is_certname_already_processed(key) is True:
continue
(certname_file, exp_date_file, anno_data_file, mode_metadata_file) = \
utils.collect_certificate_data_from_file(key, value)
# if cert not present, exp_date will be None
if exp_date_file is not None:
utils.add_cert_snapshot(certname_file,
exp_date_file,
anno_data_file,
mode_metadata_file)
# 3. Process SSL_CA certificates (special case, since there can be multiple files)
LOG.debug('Processing ssl_ca certificate files...')
ssl_ca_data_list = utils.collect_certificate_data_for_ssl_cas()
LOG.debug('ssl_ca_data_list=%s' % ssl_ca_data_list)
for entry in ssl_ca_data_list:
# if cert not present, exp_date will be None
if entry[1] is not None:
utils.add_cert_snapshot(entry[0], entry[1], entry[2], entry[3])
def compute_action_full_audit(self):
for cert_name in utils.CERT_SNAPSHOT:
self.compute_action(cert_name)
# ============== Active Alarm audit ===================
def run_active_alarm_audit(self):
"""
Run audit only on active alarms
"""
LOG.info('Running cert-alarm active_alarm_audit')
# Collect ALARM_SNAPSHOT
self.fm_obj.collect_all_cert_alarms()
self.compute_action_active_alarms()
LOG.info('cert-alarm active_alarm_audit completed')
def compute_action_active_alarms(self):
# Create single list of expiring_soon & expired certs
all_active_alarms = list(chain(*self.fm_obj.ALARMS_SNAPSHOT.values()))
for cert_name in all_active_alarms:
# 1. First refresh expiry date snapshot data
self.refresh_expiry_data(cert_name)
# 2. Now check dates and compute_action
self.compute_action(cert_name)
def refresh_expiry_data(self, cert_name):
if cert_name not in utils.CERT_SNAPSHOT:
LOG.error('Could not find cert %s in snapshot to refresh expiry data' % cert_name)
else:
snapshot = utils.CERT_SNAPSHOT[cert_name]
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_SECRET or \
snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_CERT_MGR:
# mode is k8s secret
kube_op = sys_kube.KubeOperator()
try:
secobj = kube_op.kube_get_secret(snapshot[utils.SNAPSHOT_KEY_k8s_secret],
snapshot[utils.SNAPSHOT_KEY_k8s_ns])
(certname_secret, exp_date_secret, anno_data_secret, mode_metadata) = \
utils.collect_certificate_data_from_kube_secret(secobj)
if exp_date_secret is not None:
utils.add_cert_snapshot(certname_secret,
exp_date_secret,
anno_data_secret,
mode_metadata)
except Exception as e:
LOG.error("Failed to retrieve k8s_secret %s" % e)
else:
# mode is file
(certname_file, exp_date_file, anno_data_file, mode_metadata_file) = \
utils.collect_certificate_data_from_file(cert_name,
snapshot[utils.SNAPSHOT_KEY_FILE_LOC])
if exp_date_file is not None:
utils.add_cert_snapshot(certname_file,
exp_date_file,
anno_data_file,
mode_metadata_file)
# ============== Common ===================
@staticmethod
def parse_time(time_str):
regex = re.compile(r'((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?')
parts = regex.match(time_str).groupdict()
time_params = {}
for name, param in parts.items():
if param:
time_params[name] = int(param)
return timedelta(**time_params)
def compute_action(self, cert_name):
"""
Computes any action required based on parameters passed and calls FM API
Input: cert_name: Certificate name
"""
if cert_name not in utils.CERT_SNAPSHOT:
LOG.error('Could not find cert %s in snapshot' % cert_name)
return
snapshot = utils.CERT_SNAPSHOT[cert_name]
expiry = snapshot[utils.SNAPSHOT_KEY_EXPDATE] - datetime.now()
alarm_before = self.parse_time(snapshot[constants.CERT_ALARM_ANNOTATION_ALARM_BEFORE])
renew_before = None
if utils.SNAPSHOT_KEY_RENEW_BEFORE in snapshot:
renew_before = self.parse_time(snapshot[utils.SNAPSHOT_KEY_RENEW_BEFORE])
LOG.debug('cert_name=%s, expiry=%s, alarm_before=%s, renew_before=%s'
% (cert_name, expiry.days, alarm_before.days, renew_before.days))
days_to_expiry = expiry.days
alarm_before_days = alarm_before.days
renew_before_days = renew_before.days
# set threshold date to raise alarms
if renew_before_days:
# if renew_before_days valid, take latest (smaller timedelta) of two dates as threshold
threshold = renew_before_days if renew_before_days < alarm_before_days else alarm_before_days
else:
threshold = alarm_before_days
if days_to_expiry > threshold:
self.clear_expiring_soon(cert_name)
self.clear_expired(cert_name)
else:
if days_to_expiry > 0:
self.raise_expiring_soon(cert_name)
else:
# Expired. Clear expiring-soon & raise expired
self.clear_expiring_soon(cert_name)
self.raise_expired(cert_name)
def raise_expiring_soon(self, cert_name):
self.fm_obj.set_fault(cert_name,
False,
fm_constants.FM_ALARM_STATE_SET)
def clear_expiring_soon(self, cert_name):
self.fm_obj.set_fault(cert_name,
False,
fm_constants.FM_ALARM_STATE_CLEAR)
def raise_expired(self, cert_name):
self.fm_obj.set_fault(cert_name,
True,
fm_constants.FM_ALARM_STATE_SET)
def clear_expired(self, cert_name):
self.fm_obj.set_fault(cert_name,
True,
fm_constants.FM_ALARM_STATE_CLEAR)

View File

@ -11,9 +11,8 @@ from oslo_config import cfg
from oslo_log import log
from oslo_service import periodic_task
from sysinv.cert_alarm import fm as fm_mgr
from sysinv.cert_alarm import utils
from sysinv.common import constants
from sysinv.cert_alarm import audit as audit_mgr
from sysinv.cert_mon import utils as certmon_utils
LOG = log.getLogger(__name__)
@ -35,87 +34,27 @@ class CertificateAlarmManager(periodic_task.PeriodicTasks):
super(CertificateAlarmManager, self).__init__(CONF)
self.audit_thread = None
self.active_alarm_audit_thread = None
self.fm_obj = fm_mgr.FaultApiMgr()
self.audit_obj = audit_mgr.CertAlarmAudit()
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
@periodic_task.periodic_task(spacing=CONF.certalarm.audit_interval)
def periodic_full_audit(self):
def periodic_full_audit(self, context):
# this task runs every CONF.certalarm.audit_interval (24 hours)
self.run_full_audit()
def run_full_audit(self):
"""
Run full audit
"""
LOG.info('Running cert-alarm full audit')
utils.reset_cert_snapshot()
# 1. Process all k8s secrets/certificates
LOG.info('Processing (1/3) k8s secret/certificates...')
try:
all_secrets = utils.get_tls_secrets_from_all_ns()
if all_secrets:
LOG.info('Number of TLS secrets to process=%d' % len(all_secrets))
for item in all_secrets:
LOG.info('Processing item: %s' % item.metadata.name)
(certname_secret, exp_date_secret, anno_data_secret, mode_metadata) = \
utils.collect_certificate_data_from_kube_secret(item)
# if cert not present, exp_date will be None
if exp_date_secret is not None:
utils.add_cert_snapshot(certname_secret,
exp_date_secret,
anno_data_secret,
mode_metadata)
except Exception as e:
LOG.error(e)
# 2. Process platform certs stored as pem files
LOG.info('Processing (2/3) platform certificate files...')
for key, value in constants.CERT_LOCATION_MAP.items():
LOG.info('Processing item: %s at location %s' % (key, value))
if utils.is_certname_already_processed(key) is True:
continue
(certname_file, exp_date_file, anno_data_file, mode_metadata_file) = \
utils.collect_certificate_data_from_file(key, value)
# if cert not present, exp_date will be None
if exp_date_file is not None:
utils.add_cert_snapshot(certname_file,
exp_date_file,
anno_data_file,
mode_metadata_file)
# 3. Process SSL_CA certificates (special case, since there can be multiple files)
LOG.info('Processing (3/3) ssl_ca certificate files...')
ssl_ca_data_list = utils.collect_certificate_data_for_ssl_cas()
LOG.debug('ssl_ca_data_list=%s' % ssl_ca_data_list)
for entry in ssl_ca_data_list:
# if cert not present, exp_date will be None
if entry[1] is not None:
utils.add_cert_snapshot(entry[0], entry[1], entry[2], entry[3])
utils.print_cert_snapshot()
self.audit_obj.run_full_audit()
@periodic_task.periodic_task(spacing=CONF.certalarm.active_alarm_audit_interval)
def periodic_active_alarm_audit(self):
def periodic_active_alarm_audit(self, context):
# this task runs every CONF.certalarm.active_alarm_audit_interval (1 hour)
self.run_active_alarm_audit()
def run_active_alarm_audit(self):
"""
Run audit only on active alarms
"""
LOG.info('Running cert-alarm active_alarm_audit')
# TODO()
self.audit_obj.run_active_alarm_audit()
def start_audits(self):
certmon_utils.init_keystone_auth_opts()
LOG.info('Cert-alarm auditing interval %s' % CONF.certalarm.audit_interval)
self.audit_thread = greenthread.spawn(self.audit_certalarms)
self.run_full_audit()
self.audit_obj.run_full_audit()
LOG.info('Cert-alarm active alarms auditing interval %s' %
CONF.certalarm.active_alarm_audit_interval)

View File

@ -13,15 +13,14 @@ from sysinv.cert_alarm import utils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
SNAPSHOT_KEY_EXPIRING_SOON = 'EXPIRING_SOON'
SNAPSHOT_KEY_EXPIRED = 'EXPIRED'
EXPIRING_SOON = 'EXPIRING_SOON'
EXPIRED = 'EXPIRED'
class FaultApiMgr(object):
def __init__(self):
LOG.info('Initializing FaultApiMgr')
self.fm_api = fm_api.FaultAPIs()
self.ALARMS_SNAPSHOT = {}
"""
After an audit is completed, ALARMS_SNAPSHOT stores all active alarms
ALARMS_SNAPSHOT is a dict of list.
@ -30,6 +29,13 @@ class FaultApiMgr(object):
EXPIRED: [certname7, certname8,...]
}
"""
self.ALARMS_SNAPSHOT = {}
"""
Entity ID to cert_name mapping
Due to the nature of entity_id strings generated, we need a map
to lookup cert_name's in utils.CERT_SNAPSHOT during audits
"""
self.ENTITYID_TO_CERTNAME_MAP = {}
def get_entity_instance_id(self, cert_name):
"""
@ -42,36 +48,43 @@ class FaultApiMgr(object):
OR
system.certificate.k8sRootCA
"""
id = []
tmp_id = []
if cert_name in utils.CERT_SNAPSHOT:
snapshot = utils.CERT_SNAPSHOT[cert_name]
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_UUID:
id.append("system.certificate.mode=%s.uuid=%s" %
(cert_name, snapshot[utils.SNAPSHOT_KEY_uuid]))
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.UUID:
tmp_id.append("system.certificate.mode=%s.uuid=%s" %
(self.get_mode(cert_name), snapshot[utils.UUID]))
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_CERT_MGR:
id.append("namespace=%s.certificate=%s" %
tmp_id.append("namespace=%s.certificate=%s" %
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_cert]))
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_SECRET:
id.append("namespace=%s.secret=%s" %
tmp_id.append("namespace=%s.secret=%s" %
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_secret]))
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_OTHER:
id.append("system.certificate.%s" % cert_name)
tmp_id.append("system.certificate.%s" % cert_name)
id.append(" (%s=%s)" % (fm_constants.FM_ENTITY_TYPE_CERTIFICATE, cert_name))
return ''.join(id)
entity_id = ''.join(tmp_id)
self.ENTITYID_TO_CERTNAME_MAP[entity_id] = cert_name
return entity_id
def get_cert_name_from_entity_instance_id(self, instance_id):
start = "(" + fm_constants.FM_ENTITY_TYPE_CERTIFICATE
return instance_id[instance_id.find(start) + 1:instance_id.find(")")]
if instance_id in self.ENTITYID_TO_CERTNAME_MAP:
return self.ENTITYID_TO_CERTNAME_MAP[instance_id]
else:
return 'Unknown'
@staticmethod
def get_mode(cert_name):
return 'ssl_ca' if 'ssl_ca' in cert_name else cert_name
def get_reason_text(self, cert_name, expired_flag):
txt = ["Certificate "]
if cert_name in utils.CERT_SNAPSHOT:
# Add entity related text
snapshot = utils.CERT_SNAPSHOT[cert_name]
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_UUID:
if snapshot[utils.SNAPSHOT_KEY_MODE] is utils.UUID:
txt.append("\'system certificate-show %s\' (mode=%s) " %
(snapshot[utils.SNAPSHOT_KEY_uuid], cert_name))
(snapshot[utils.UUID], self.get_mode(cert_name)))
elif snapshot[utils.SNAPSHOT_KEY_MODE] is utils.MODE_CERT_MGR:
txt.append("namespace=%s, certificate=%s " %
(snapshot[utils.SNAPSHOT_KEY_k8s_ns], snapshot[utils.SNAPSHOT_KEY_k8s_cert]))
@ -83,10 +96,10 @@ class FaultApiMgr(object):
# Add Expired or Expiring
if expired_flag:
txt.append(" expired.")
txt.append("expired.")
else:
expiry_date = snapshot[utils.SNAPSHOT_KEY_EXPDATE]
txt.append(" is expiring soon on ")
txt.append("is expiring soon on ")
txt.append(expiry_date.strftime("%Y-%m-%d, %H:%M:%S"))
else:
@ -149,11 +162,11 @@ class FaultApiMgr(object):
# Expiring Soon alarms
exp_soon_alarms = self.get_faults(False)
self.add_alarms_snapshot(SNAPSHOT_KEY_EXPIRING_SOON, exp_soon_alarms)
self.add_alarms_snapshot(EXPIRING_SOON, exp_soon_alarms)
# Expired alarms
exprd_alarms = self.get_faults(True)
self.add_alarms_snapshot(SNAPSHOT_KEY_EXPIRED, exprd_alarms)
self.add_alarms_snapshot(EXPIRED, exprd_alarms)
def reset_alarms_snapshot(self):
self.ALARMS_SNAPSHOT = {}
@ -168,3 +181,9 @@ class FaultApiMgr(object):
cert_names.append(self.get_cert_name_from_entity_instance_id(item.entity_instance_id))
self.ALARMS_SNAPSHOT[key] = cert_names
def reset_entityid_to_certname_map(self):
self.ENTITYID_TO_CERTNAME_MAP = {}
def print_entityid_to_certname_map(self):
LOG.info('Entityid_to_certname map = %s' % self.ENTITYID_TO_CERTNAME_MAP)

View File

@ -11,6 +11,7 @@ from datetime import datetime
from oslo_config import cfg
from oslo_log import log
from OpenSSL import crypto
from sysinv.cert_mon import utils as certmon_utils
from sysinv.common import constants
from sysinv.common import kubernetes as sys_kube
@ -19,6 +20,7 @@ CONF = cfg.CONF
V1_ALPHA_3 = 'v1alpha3'
PLURAL_NAME_CERT = 'certificates'
SPEC = 'spec'
METADATA = 'metadata'
ANNOTATIONS = 'annotations'
CERTMGR_CERT_NAME = "cert-manager.io/certificate-name"
@ -26,13 +28,14 @@ CERTMGR_CERT_NAME = "cert-manager.io/certificate-name"
SNAPSHOT_KEY_EXPDATE = 'expiry_date'
# Mode will determine how alarm constructs entity_instance_id and description
SNAPSHOT_KEY_MODE = 'mode'
SNAPSHOT_KEY_uuid = 'mode_uuid'
SNAPSHOT_KEY_k8s_ns = 'mode_k8s_ns'
SNAPSHOT_KEY_k8s_cert = 'mode_k8s_cert'
SNAPSHOT_KEY_k8s_secret = 'mode_k8s_secret'
SNAPSHOT_KEY_FILE_LOC = 'file_location'
SNAPSHOT_KEY_RENEW_BEFORE = 'renewBefore'
# "mode" values can be:
MODE_UUID = 'uuid'
UUID = 'uuid'
MODE_SECRET = 'secret'
MODE_CERT_MGR = 'certmgr'
MODE_OTHER = 'other'
@ -40,7 +43,6 @@ MODE_OTHER = 'other'
CERT_SNAPSHOT = {}
"""
CERT_SNAPSHOT is a dict of dict. Each entry is per certificate.
Internal dict is expiry_date and all annotations collected via k8s secret
{
certname1: {
expiry_date: date
@ -49,11 +51,13 @@ Internal dict is expiry_date and all annotations collected via k8s secret
alarm_severity: <severity>
alarm_text: <custom pretext>
mode: <mode>
mode_uuid: <uuid>
uuid: <uuid>
mode_k8s_ns: <namespace>
mode_k8s_cert: <certificate>
mode_k8s_secret: <secret>
mode_other: <other>
file_location: <filepath>
renewBefore: <renewBefore>
}
certname2: {
...
@ -113,7 +117,7 @@ def collect_certificate_data_from_file(certname, pem_file):
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf)
expiration_date = get_cert_expiration_date(cert)
annotation_data = get_default_annotation_values()
mode_metadata = get_file_mode_metadata(certname)
mode_metadata = get_file_mode_metadata(certname, pem_file)
return (certname, expiration_date, annotation_data, mode_metadata)
@ -210,6 +214,9 @@ def get_annotation_data(secretobj):
mode_metadata[SNAPSHOT_KEY_k8s_cert] = crd_cert_name
# Note: unlike k8s secret obj, get_custom_resource() returns a dict()
if SNAPSHOT_KEY_RENEW_BEFORE in certobj[SPEC]:
mode_metadata[SNAPSHOT_KEY_RENEW_BEFORE] = certobj[SPEC][SNAPSHOT_KEY_RENEW_BEFORE]
certobj_annotation = certobj[METADATA][ANNOTATIONS]
annotation_dict, patch_needed = process_annotation_data(certobj_annotation)
if patch_needed is True:
@ -303,22 +310,55 @@ def get_default_annotation_values():
def get_default_mode_metadata():
return {
SNAPSHOT_KEY_MODE: "",
SNAPSHOT_KEY_uuid: "",
UUID: "",
SNAPSHOT_KEY_k8s_ns: "",
SNAPSHOT_KEY_k8s_cert: "",
SNAPSHOT_KEY_k8s_secret: ""
SNAPSHOT_KEY_k8s_secret: "",
SNAPSHOT_KEY_FILE_LOC: "",
SNAPSHOT_KEY_RENEW_BEFORE: ""
}
def get_file_mode_metadata(certname):
def get_file_mode_metadata(certname, file_loc):
mode_metadata = get_default_mode_metadata()
mode_metadata[SNAPSHOT_KEY_FILE_LOC] = file_loc
# For k8s_root_ca and etcd, set "other". Rest should have UUID in sysinv db.
# In case of ssl & docker, if managed by cert_mgr, this wont be called from run_full_audit()
# so, can assume that we will have UUID in db
if certname is constants.CERT_MODE_KUBERNETES_ROOT_CA or certname is constants.CERT_MODE_ETCD:
mode_metadata[SNAPSHOT_KEY_MODE] = MODE_OTHER
else:
mode_metadata[SNAPSHOT_KEY_MODE] = MODE_UUID
# TODO() get UUID from DB
mode_metadata[SNAPSHOT_KEY_MODE] = UUID
mode_metadata[UUID] = get_cert_uuid(certname)
return mode_metadata
def get_cert_uuid(certname):
token = certmon_utils._get_token(
CONF.keystone_authtoken.auth_url + '/v3/auth/tokens',
CONF.keystone_authtoken.project_name,
CONF.keystone_authtoken.username,
CONF.keystone_authtoken.password,
CONF.keystone_authtoken.user_domain_name,
CONF.keystone_authtoken.project_domain_name,
CONF.keystone_authtoken.region_name)
service_type = 'platform'
service_name = 'sysinv'
sysinv_url = token.get_service_internal_url(service_type,
service_name)
api_cmd = sysinv_url + '/certificate'
ret = 'unknown'
try:
res = certmon_utils.rest_api_request(token, "GET", api_cmd)
if len(res) == 1:
cert_list = res.get('certificates')
for item in cert_list:
if item['signature'] == certname:
ret = item['uuid']
break
except Exception as e:
LOG.exception(e)
return ret

View File

@ -7,8 +7,9 @@
"""Test class for Sysinv CertAlarm"""
from datetime import datetime
import os.path
import mock
from OpenSSL import crypto
import os.path
from sysinv.common import constants
from sysinv.cert_alarm import service as cert_alarm
@ -54,7 +55,8 @@ class CertAlarmTestCase(base.DbTestCase):
days_to_expiry = exp_date - datetime.now()
assert days_to_expiry.days < 0
def test_collect_certificate_data_from_file(self):
@mock.patch('sysinv.cert_alarm.utils.get_cert_uuid', return_value='unknown')
def test_collect_certificate_data_from_file(self, mock_get_cert_uuid):
cert_name = 'test_cert'
# If file doesn't exist, should return (certname, None, None)
@ -79,7 +81,7 @@ class CertAlarmTestCase(base.DbTestCase):
mode_metadata = ret[3]
self.assertIsNotNone(mode_metadata)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_MODE, mode_metadata)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_uuid, mode_metadata)
self.assertIn(cert_alarm_utils.UUID, mode_metadata)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_ns, mode_metadata)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_cert, mode_metadata)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_secret, mode_metadata)
@ -199,13 +201,17 @@ class CertAlarmTestCase(base.DbTestCase):
def test_get_default_mode_metadata(self):
data = cert_alarm_utils.get_default_mode_metadata()
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_MODE, data)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_uuid, data)
self.assertIn(cert_alarm_utils.UUID, data)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_ns, data)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_cert, data)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_k8s_secret, data)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_FILE_LOC, data)
self.assertIn(cert_alarm_utils.SNAPSHOT_KEY_RENEW_BEFORE, data)
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_MODE], "")
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_uuid], "")
self.assertEqual(data[cert_alarm_utils.UUID], "")
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_k8s_ns], "")
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_k8s_cert], "")
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_k8s_secret], "")
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_FILE_LOC], "")
self.assertEqual(data[cert_alarm_utils.SNAPSHOT_KEY_RENEW_BEFORE], "")