config/sysinv/cgts-client/cgts-client/cgtsclient/v1/kube_rootca_update_shell.py
Yuxing Jiang 8ace6db94c API to get kube rootCA ID
The DC Kubernetes root CA audit was based on the cert expiration alarm.
We would like to switch to cert comparison to ensure the Kubernetes
root CA certs. For this purpose, this commit creates a new sub API to
get Kubernetes root CA ID from local.

Test plan:
1. Passed - Create ISO and deploy AIOSX and distributed cloud.
2. Passed - Run "system --debug  kube-rootca-get-cert-id", have the
expected response, and the cert ID was printed.

Story: 2010852
Task: 49091

Signed-off-by: Yuxing Jiang <Yuxing.Jiang@windriver.com>
Change-Id: Ie78121d0c21d2c6033c8b5d4919e251fc4d98050
2023-11-15 16:12:08 +00:00

197 lines
6.6 KiB
Python
Executable File

#
# Copyright (c) 2021-2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# All Rights Reserved.
#
from cgtsclient.common import utils
from cgtsclient import exc
from cgtsclient.v1 import ihost as ihost_utils
# Kubernetes constants
KUBE_ROOTCA_UPDATE_COMPLETED = 'update-completed'
KUBE_ROOTCA_UPDATE_ABORTED = 'update-aborted'
def _print_kube_rootca_update_show(obj):
fields = ['uuid', 'state', 'from_rootca_cert', 'to_rootca_cert',
'created_at', 'updated_at']
optional_fields = []
data = [(f, getattr(obj, f, '')) for f in fields]
if optional_fields:
data += [(f, getattr(obj, f, '')) for f in optional_fields
if hasattr(obj, f)]
utils.print_tuple_list(data)
def _print_kube_rootca_host_update_show(obj):
fields = ['uuid', 'state', 'effective_rootca_cert',
'target_rootca_cert', 'created_at', 'updated_at']
optional_fields = []
data = [(f, getattr(obj, f, '')) for f in fields]
if optional_fields:
data += [(f, getattr(obj, f, '')) for f in optional_fields
if hasattr(obj, f)]
utils.print_tuple_list(data)
@utils.arg('hostnameorid', metavar='<hostname or id>',
help="Name or ID of host")
@utils.arg('--phase',
choices=['trust-both-cas', 'update-certs', 'trust-new-ca'],
help="Specify the phase of the update")
def do_kube_rootca_host_update(cc, args):
"""Update root CA certificate on a host."""
ihost = ihost_utils._find_ihost(cc, args.hostnameorid)
if args.phase in ['trust-both-cas', 'update-certs', 'trust-new-ca']:
kube_update_host_rootca = cc.ihost.kube_update_rootca(ihost.uuid,
args.phase)
else:
raise exc.CommandError('Invalid phase value: %s' % args.phase)
_print_kube_rootca_host_update_show(kube_update_host_rootca)
@utils.arg('--phase',
choices=['trust-both-cas', 'trust-new-ca'],
help="Specify the phase of the update")
def do_kube_rootca_pods_update(cc, args):
"""Update root CA certificate for pods."""
if args.phase in ['trust-both-cas', 'trust-new-ca']:
kube_rootca_update = \
cc.kube_rootca_update.rootCA_pods_update(args.phase)
else:
raise exc.CommandError('Invalid phase value: %s' % args.phase)
_print_kube_rootca_update_show(kube_rootca_update)
@utils.arg('-f', '--force',
action='store_true',
default=False,
help="Ignore non management-affecting alarms")
def do_kube_rootca_update_start(cc, args):
"""Start a new procedure for kubernetes rootCA update """
kube_rootca_update = cc.kube_rootca_update.create(args.force)
uuid = getattr(kube_rootca_update, 'uuid', '')
try:
kube_rootca_update = cc.kube_rootca_update.get(uuid)
except exc.HTTPNotFound:
raise exc.CommandError('Created kubernetes rootca update UUID not found: %s'
% uuid)
_print_kube_rootca_update_show(kube_rootca_update)
@utils.arg('certificate_file',
metavar='<certificate_file>',
help='Path to the kubernetes certificate file (in PEM format) to upload.')
def do_kube_rootca_update_upload_cert(cc, args):
"""Upload new kubernetes rootCA """
certificate_file = args.certificate_file
try:
with open(certificate_file, 'rb') as sec_file:
cert_upload = cc.kube_rootca_update.rootCA_upload(sec_file)
except Exception:
raise exc.CommandError("Error: Could not open file %s." %
certificate_file)
if cert_upload.get("error"):
print(cert_upload.get("error"))
else:
# Show new rootca certificate identifier <issuer_hash>-<serial_number>
print("Uploaded new rootca certificate: %s" % cert_upload.get("success"))
def do_kube_rootca_get_cert_id(cc, args):
"""Get existing kubernetes rootCA cert ID"""
certificate = cc.kube_rootca_update.get_cert_id()
if certificate.error:
print(certificate.error)
else:
# Show new rootca certificate identifier <issuer_hash>-<serial_number>
print(f"Existing rootca certificate: {certificate.cert_id}")
@utils.arg('--expiry-date',
default=None,
help='Optional argument to define expiry date '
'in the format of YYYY-MM-DD for k8s root'
'CA to be generated')
@utils.arg('--subject',
default=None,
help='Subject to be set on new kubernetes root CA.Should '
'have the format <parameter_initials>=<value> and '
'supports C(Country), ST(State/Province), L(Locality) '
'O(Organization), OU(OrganizationalUnit), '
'CN(CommonName) subject parameters. An example is an '
'entry like C=US ST=California L=San Francisco '
'O=StarlingX OU=Company1 CN=Subject Example')
def do_kube_rootca_update_generate_cert(cc, args):
"""Generate new kubernetes rootCA"""
certificate = cc.kube_rootca_update.rootCA_generate(args.expiry_date, args.subject)
if certificate.error:
print(certificate.error)
else:
# Show new rootca certificate identifier <issuer_hash>-<serial_number>
print("Generated new rootca certificate: %s" % certificate.success)
def do_kube_rootca_update_show(cc, args):
"""Retrieves kubernetes rootCA update status"""
update_status = cc.kube_rootca_update.get_list()
fields = ['uuid', 'state', 'from_rootca_cert', 'to_rootca_cert',
'created_at', 'updated_at']
utils.print_list(update_status, fields, fields)
def do_kube_rootca_host_update_list(cc, args):
"""Retrieves kubernetes rootCA update status on each host"""
update_status_list = cc.kube_rootca_update.host_update_list()
fields = ['hostname', 'personality', 'state', 'effective_rootca_cert',
'target_rootca_cert', 'created_at', 'updated_at']
utils.print_list(update_status_list, fields, fields)
def do_kube_rootca_update_complete(cc, args):
"""Marks the rootca update as complete"""
patch = []
patch.append({'op': 'replace',
'path': '/state',
'value': KUBE_ROOTCA_UPDATE_COMPLETED})
update_status = cc.kube_rootca_update.update_complete(patch)
_print_kube_rootca_update_show(update_status)
def do_kube_rootca_update_abort(cc, args):
"""Marks the rootca update as aborted"""
patch = []
patch.append({'op': 'replace',
'path': '/state',
'value': KUBE_ROOTCA_UPDATE_ABORTED})
update_status = cc.kube_rootca_update.update_complete(patch)
_print_kube_rootca_update_show(update_status)