Merge "rootca update upload cert API"

This commit is contained in:
Zuul 2021-06-17 20:22:07 +00:00 committed by Gerrit Code Review
commit 6807b69a83
11 changed files with 745 additions and 15 deletions

View File

@ -14,6 +14,7 @@ import wsmeext.pecan as wsme_pecan
from fm_api import fm_api from fm_api import fm_api
from fm_api import constants as fm_constants from fm_api import constants as fm_constants
from oslo_log import log from oslo_log import log
from pecan import expose
from pecan import rest from pecan import rest
from sysinv import objects from sysinv import objects
from sysinv.api.controllers.v1 import base from sysinv.api.controllers.v1 import base
@ -28,7 +29,37 @@ from wsme import types as wtypes
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
LOCK_NAME = 'KubeRootCAUpdateController' LOCK_KUBE_ROOTCA_UPLOAD_CONTROLLER = 'KubeRootCAUploadController'
LOCK_KUBE_ROOTCA_UPDATE_CONTROLLER = 'KubeRootCAUpdateController'
class KubeRootCAUploadController(rest.RestController):
@cutils.synchronized(LOCK_KUBE_ROOTCA_UPLOAD_CONTROLLER)
@expose('json')
def post(self):
fileitem = pecan.request.POST['file']
if not fileitem.filename:
raise wsme.exc.ClientSideError(("Error: No file uploaded"))
try:
fileitem.file.seek(0, os.SEEK_SET)
pem_contents = fileitem.file.read()
except Exception:
return dict(
success="",
error=("No kube rootca certificate have been added, invalid PEM document"))
try:
output = pecan.request.rpcapi.save_kubernetes_rootca_cert(
pecan.request.context,
pem_contents
)
except Exception:
msg = "Conductor call for new kube rootca upload failed"
return dict(success="", error=msg)
return output
class KubeRootCAUpdate(base.APIBase): class KubeRootCAUpdate(base.APIBase):
@ -84,10 +115,12 @@ class KubeRootCAUpdate(base.APIBase):
class KubeRootCAUpdateController(rest.RestController): class KubeRootCAUpdateController(rest.RestController):
"""REST controller for kubernetes rootCA updates.""" """REST controller for kubernetes rootCA updates."""
upload = KubeRootCAUploadController()
def __init__(self): def __init__(self):
self.fm_api = fm_api.FaultAPIs() self.fm_api = fm_api.FaultAPIs()
@cutils.synchronized(LOCK_NAME) @cutils.synchronized(LOCK_KUBE_ROOTCA_UPDATE_CONTROLLER)
@wsme_pecan.wsexpose(KubeRootCAUpdate, body=six.text_type) @wsme_pecan.wsexpose(KubeRootCAUpdate, body=six.text_type)
def post(self, body): def post(self, body):
"""Create a new Kubernetes RootCA Update and start update.""" """Create a new Kubernetes RootCA Update and start update."""
@ -138,7 +171,9 @@ class KubeRootCAUpdateController(rest.RestController):
"System is not in a valid state for kubernetes rootca update. " "System is not in a valid state for kubernetes rootca update. "
"Run system health-query for more details.")) "Run system health-query for more details."))
create_obj = {'state': kubernetes.KUBE_ROOTCA_UPDATE_STARTED} create_obj = {'state': kubernetes.KUBE_ROOTCA_UPDATE_STARTED,
'from_rootca_cert': body.get('from_rootca_cert')
}
new_update = pecan.request.dbapi.kube_rootca_update_create(create_obj) new_update = pecan.request.dbapi.kube_rootca_update_create(create_obj)
entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_HOST, entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_HOST,
@ -158,7 +193,6 @@ class KubeRootCAUpdateController(rest.RestController):
service_affecting=False) service_affecting=False)
self.fm_api.set_fault(fault) self.fm_api.set_fault(fault)
LOG.info("Started kubernetes rootca update") LOG.info("Started kubernetes rootca update")
return KubeRootCAUpdate.convert_with_links(new_update) return KubeRootCAUpdate.convert_with_links(new_update)
@wsme_pecan.wsexpose(KubeRootCAUpdate, types.uuid) @wsme_pecan.wsexpose(KubeRootCAUpdate, types.uuid)

View File

@ -1898,3 +1898,17 @@ CERT_MODE_TO_SECRET_NAME = {
SB_SUPPORTED_NETWORKS = { SB_SUPPORTED_NETWORKS = {
SB_TYPE_CEPH: [NETWORK_TYPE_MGMT, NETWORK_TYPE_CLUSTER_HOST] SB_TYPE_CEPH: [NETWORK_TYPE_MGMT, NETWORK_TYPE_CLUSTER_HOST]
} }
BEGIN_CERTIFICATE_MARKER = b"-----BEGIN CERTIFICATE-----\n"
END_CERTIFICATE_MARKER = b"\n-----END CERTIFICATE-----\n"
BEGIN_PRIVATE_KEY_MARKER = b"-----BEGIN PRIVATE KEY-----\n"
END_PRIVATE_KEY_MARKER = b"\n-----END PRIVATE KEY-----\n"
# Kubernetes root CA certficate update phases
KUBE_CERT_UPDATE_TRUSTBOTHCAS = "trust-both-cas"
KUBE_CERT_UPDATE_UPDATECERTS = "update-certs"
KUBE_CERT_UPDATE_TRUSTNEWCA = "trust-new-ca"
# kubernetes components secrets on rootCA update procedure
KUBE_ROOTCA_SECRET = 'system-kube-rootca-certificate'
KUBE_ROOTCA_ISSUER = 'system-kube-rootca-issuer'

View File

@ -881,6 +881,10 @@ class CertificateTypeNotFound(NotFound):
message = _("No certificate type of %(certtype)s") message = _("No certificate type of %(certtype)s")
class InvalidKubernetesCA(Invalid):
message = _("Invalid certificate for kubernetes rootca")
class DockerRegistryCredentialNotFound(NotFound): class DockerRegistryCredentialNotFound(NotFound):
message = _("Credentials to access local docker registry " message = _("Credentials to access local docker registry "
"for user %(name)s could not be found.") "for user %(name)s could not be found.")

View File

@ -47,6 +47,7 @@ KUBE_STATE_PARTIAL = 'partial'
# Kubernetes namespaces # Kubernetes namespaces
NAMESPACE_KUBE_SYSTEM = 'kube-system' NAMESPACE_KUBE_SYSTEM = 'kube-system'
NAMESPACE_DEPLOYMENT = 'deployment'
# Kubernetes control plane components # Kubernetes control plane components
KUBE_APISERVER = 'kube-apiserver' KUBE_APISERVER = 'kube-apiserver'
@ -80,12 +81,12 @@ KUBE_HOST_UPGRADING_KUBELET_FAILED = 'upgrading-kubelet-failed'
KUBE_ROOTCA_UPDATE_STARTED = 'update-started' KUBE_ROOTCA_UPDATE_STARTED = 'update-started'
KUBE_ROOTCA_UPDATE_CERT_UPLOADED = 'update-new-rootca-cert-uploaded' KUBE_ROOTCA_UPDATE_CERT_UPLOADED = 'update-new-rootca-cert-uploaded'
KUBE_ROOTCA_UPDATE_CERT_GENERATED = 'update-new-rootca-cert-generated' KUBE_ROOTCA_UPDATE_CERT_GENERATED = 'update-new-rootca-cert-generated'
KUBE_ROOTCA_UPDATE_UPDATING_PODS_TRUSTBOTHCAS = 'updating-pods-trustBothCAs' KUBE_ROOTCA_UPDATING_PODS_TRUSTBOTHCAS = 'updating-pods-trustBothCAs'
KUBE_ROOTCA_UPDATE_UPDATED_PODS_TRUSTBOTHCAS = 'updated-pods-trustBothCAs' KUBE_ROOTCA_UPDATED_PODS_TRUSTBOTHCAS = 'updated-pods-trustBothCAs'
KUBE_ROOTCA_UPDATE_UPDATING_PODS_TRUSTBOTHCAS_FAILED = 'updating-pods-trustBothCAs-failed' KUBE_ROOTCA_UPDATING_PODS_TRUSTBOTHCAS_FAILED = 'updating-pods-trustBothCAs-failed'
KUBE_ROOTCA_UPDATE_UPDATING_PODS_TRUSTNEWCA = 'updating-pods-trustNewCA' KUBE_ROOTCA_UPDATING_PODS_TRUSTNEWCA = 'updating-pods-trustNewCA'
KUBE_ROOTCA_UPDATE_UPDATED_PODS_TRUSTNEWCA = 'updated-pods-trustNewCA' KUBE_ROOTCA_UPDATED_PODS_TRUSTNEWCA = 'updated-pods-trustNewCA'
KUBE_ROOTCA_UPDATE_UPDATING_PODS_TRUSTNEWCA_FAILED = 'updating-pods-trustNewCA-failed' KUBE_ROOTCA_UPDATING_PODS_TRUSTNEWCA_FAILED = 'updating-pods-trustNewCA-failed'
KUBE_ROOTCA_UPDATE_COMPLETED = 'update-completed' KUBE_ROOTCA_UPDATE_COMPLETED = 'update-completed'
# Kubernetes rootca host update states # Kubernetes rootca host update states
@ -554,6 +555,44 @@ class KubeOperator(object):
"Namespace %s: %s" % (label, namespace, e)) "Namespace %s: %s" % (label, namespace, e))
raise raise
def get_custom_resource(self, group, version, namespace, plural, name):
custom_resource_api = self._get_kubernetesclient_custom_objects()
try:
cert = custom_resource_api.get_namespaced_custom_object(
group,
version,
namespace,
plural,
name)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Fail to access %s:%s. %s" % (namespace, name, e))
raise
else:
return cert
def apply_custom_resource(self, group, version, namespace, plural, name, body):
custom_resource_api = self._get_kubernetesclient_custom_objects()
# if resource already exists we apply just a patch
cert = self.get_custom_resource(group, version, namespace, plural, name)
if cert:
custom_resource_api.patch_namespaced_custom_object(group,
version,
namespace,
plural,
name,
body)
else:
custom_resource_api.create_namespaced_custom_object(group,
version,
namespace,
plural,
body)
def delete_custom_resource(self, group, version, namespace, plural, name): def delete_custom_resource(self, group, version, namespace, plural, name):
c = self._get_kubernetesclient_custom_objects() c = self._get_kubernetesclient_custom_objects()
body = {} body = {}

View File

@ -2564,6 +2564,45 @@ def get_aws_ecr_registry_credentials(dbapi, registry, username, password):
return dict(username=username, password=password) return dict(username=username, password=password)
def extract_ca_private_key_bytes_from_pem(pem_content):
""" Extract key from the PEM file bytes
:param pem_content: bytes from PEM file where we'll get the key
:return base64_crt: extracted key base64 encoded
"""
begin_search = pem_content.find(constants.BEGIN_PRIVATE_KEY_MARKER)
if begin_search < 0:
raise exception.InvalidKubernetesCA
end_search = pem_content.find(constants.END_PRIVATE_KEY_MARKER)
if end_search < 0:
LOG.info(pem_content)
raise exception.InvalidKubernetesCA
end_search += len(constants.END_PRIVATE_KEY_MARKER)
base64_key = base64.b64encode(pem_content[begin_search:end_search])
return base64_key
def extract_ca_crt_bytes_from_pem(pem_content):
""" Extract certificate from the PEM file bytes
:param pem_content: bytes from PEM file where we'll get the certificate
:return base64_crt: extracted certificate base64 encoded
"""
begin_search = pem_content.find(constants.BEGIN_CERTIFICATE_MARKER)
if begin_search < 0:
raise exception.InvalidKubernetesCA
end_search = pem_content.find(constants.END_CERTIFICATE_MARKER)
if end_search < 0:
raise exception.InvalidKubernetesCA
end_search += len(constants.END_CERTIFICATE_MARKER)
base64_crt = base64.b64encode(pem_content[begin_search:end_search])
return base64_crt
def extract_certs_from_pem(pem_contents): def extract_certs_from_pem(pem_contents):
""" """
Extract certificates from a pem string Extract certificates from a pem string
@ -2571,12 +2610,10 @@ def extract_certs_from_pem(pem_contents):
:param pem_contents: A string in pem format :param pem_contents: A string in pem format
:return certs: A list of x509 cert objects :return certs: A list of x509 cert objects
""" """
marker = b'-----BEGIN CERTIFICATE-----'
start = 0 start = 0
certs = [] certs = []
while True: while True:
index = pem_contents.find(marker, start) index = pem_contents.find(constants.BEGIN_CERTIFICATE_MARKER, start)
if index == -1: if index == -1:
break break
try: try:
@ -2589,10 +2626,25 @@ def extract_certs_from_pem(pem_contents):
"Failed to load pem x509 certificate")) "Failed to load pem x509 certificate"))
certs.append(cert) certs.append(cert)
start = index + len(marker) start = index + len(constants.BEGIN_CERTIFICATE_MARKER)
return certs return certs
def check_cert_validity(cert):
"""Perform checks on validity of certificate
"""
now = datetime.datetime.utcnow()
msg = ("certificate is not valid before %s nor after %s" %
(cert.not_valid_before, cert.not_valid_after))
LOG.info(msg)
if now <= cert.not_valid_before or now >= cert.not_valid_after:
msg = ("certificate is not valid before %s nor after %s" %
(cert.not_valid_before, cert.not_valid_after))
LOG.info(msg)
return msg
return None
def is_ca_cert(cert): def is_ca_cert(cert):
""" """
Check if the certificate is a CA certficate Check if the certificate is a CA certficate
@ -2634,6 +2686,55 @@ def get_cert_issuer_hash(cert):
return hash_issuer return hash_issuer
def get_cert_issuer_string_hash(cert):
"""
Get the hash value of the cert's issuer DN
:param cert: the certificate to get issuer from
:return: The hash value of the cert's issuer DN
"""
try:
public_bytes = cert.public_bytes(encoding=serialization.Encoding.PEM)
cert_c = crypto.load_certificate(crypto.FILETYPE_PEM, public_bytes)
# get the issuer object from the loaded certificate
cert_issuer = cert_c.get_issuer()
# for each component presented on certificate issuer,
# converts the respective name and value for strings and join all
# together
issuer_attributes = "".join("/{0:s}={1:s}".format(name.decode(),
value.decode())
for name, value in
cert_issuer.get_components())
# apply the hash function to binary form of the string above and
# digest it as a hexdecimal value, and take the first 16 bytes.
hashed_attributes = \
hashlib.md5(issuer_attributes.encode()).hexdigest()[:16]
LOG.info("hashed issuer attributes %s from certificate "
% hashed_attributes)
except Exception:
LOG.exception()
raise exception.SysinvException(_(
"Failed to get certificate issuer hash."))
return hashed_attributes
def get_cert_serial(cert):
try:
public_bytes = cert.public_bytes(encoding=serialization.Encoding.PEM)
cert_c = crypto.load_certificate(crypto.FILETYPE_PEM, public_bytes)
serial_number = cert_c.get_serial_number()
except Exception:
LOG.exception()
raise exception.SysinvException(_(
"Failed to get certificate serial number."))
return serial_number
def get_cert_subject_hash(cert): def get_cert_subject_hash(cert):
""" """
Get the hash value of the cert's subject DN Get the hash value of the cert's subject DN
@ -2653,6 +2754,43 @@ def get_cert_subject_hash(cert):
return hash_subject return hash_subject
def get_cert_subject_string_hash(cert):
"""
Get the hash value of the cert's subject DN
:param cert: the certificate to get subject from
:return: The hash value of the cert's subject DN
"""
try:
public_bytes = cert.public_bytes(encoding=serialization.Encoding.PEM)
cert_c = crypto.load_certificate(crypto.FILETYPE_PEM, public_bytes)
# get the subject object from the loaded certificate
cert_subject = cert_c.get_subject()
# for each component presented on certificate subject,
# converts the respective name and value for strings and join all
# together
subject_attributes = "".join("/{0:s}={1:s}".format(name.decode(),
value.decode())
for name, value in
cert_subject.get_components())
# apply the hash function to binary form of the string above and
# digest it as a hexdecimal value, and take the first 16 bytes.
hashed_attributes = \
hashlib.md5(subject_attributes.encode()).hexdigest()[:16]
LOG.info("hashed subject attributes %s from certificate "
% hashed_attributes)
except Exception:
LOG.exception()
raise exception.SysinvException(_(
"Failed to get certificate subject hash."))
return hashed_attributes
def format_image_filename(device_image): def format_image_filename(device_image):
""" Format device image filename """ """ Format device image filename """
return "{}-{}-{}-{}.bit".format(device_image.bitstream_type, return "{}-{}-{}-{}.bit".format(device_image.bitstream_type,

View File

@ -13502,6 +13502,22 @@ class ConductorManager(service.PeriodicService):
self.fm_api.clear_fault(fm_constants.FM_ALARM_ID_DEVICE_IMAGE_UPDATE_IN_PROGRESS, self.fm_api.clear_fault(fm_constants.FM_ALARM_ID_DEVICE_IMAGE_UPDATE_IN_PROGRESS,
entity_instance_id) entity_instance_id)
def _get_current_kube_rootca(self):
""" Extract current k8s rootca """
current_cert = None
try:
with open('/etc/kubernetes/pki/ca.crt', 'rb') as old_rootca:
old_rootca.seek(0, os.SEEK_SET)
read_ca = old_rootca.read()
old_rootca_cert = cutils.extract_certs_from_pem(read_ca)[0]
hash_subject = cutils.get_cert_issuer_string_hash(old_rootca_cert)
serial_number = cutils.get_cert_serial(old_rootca_cert)
current_cert = "%s-%s" % (str(hash_subject), str(serial_number))
except Exception:
msg = "Extracting information regarding current k8s rootca failed"
return dict(success="", error=msg)
return current_cert
def fpga_device_update_by_host(self, context, def fpga_device_update_by_host(self, context,
host_uuid, fpga_device_dict_array): host_uuid, fpga_device_dict_array):
"""Create FPGA devices for an ihost with the supplied data. """Create FPGA devices for an ihost with the supplied data.
@ -13708,6 +13724,177 @@ class ConductorManager(service.PeriodicService):
LOG.info(output) LOG.info(output)
return output return output
def _create_kube_rootca_resources(self, certificate, key):
""" A method to create new resources to store new kubernetes
rootca data.
:param certificate: the certificate to be stored in TLS secret
:param key: the certificate key to be stored in TLS secret
:return: An error message if method is not successful, otherwhise None
"""
kube_operator = kubernetes.KubeOperator()
body = {
'apiVersion': 'v1',
'type': 'kubernetes.io/tls',
'kind': 'Secret',
'metadata': {
'name': constants.KUBE_ROOTCA_SECRET,
'namespace': kubernetes.NAMESPACE_DEPLOYMENT
},
'data': {
'tls.crt': certificate,
'tls.key': key
}
}
try:
secret = kube_operator.kube_get_secret(constants.KUBE_ROOTCA_SECRET,
kubernetes.NAMESPACE_DEPLOYMENT)
if secret is not None:
kube_operator.kube_delete_secret(constants.KUBE_ROOTCA_SECRET,
kubernetes.NAMESPACE_DEPLOYMENT)
kube_operator.kube_create_secret(kubernetes.NAMESPACE_DEPLOYMENT, body)
except Exception as e:
msg = "Creation of kube-rootca secret failed: %s" % str(e)
LOG.error(msg)
return msg
body = {
'apiVersion': 'cert-manager.io/v1alpha2',
'kind': 'Issuer',
'metadata': {
'name': constants.KUBE_ROOTCA_ISSUER,
'namespace': kubernetes.NAMESPACE_DEPLOYMENT
},
'spec': {
'ca': {
'secretName': constants.KUBE_ROOTCA_SECRET
}
}
}
try:
kube_operator.apply_custom_resource('cert-manager.io',
'v1alpha2',
kubernetes.NAMESPACE_DEPLOYMENT,
'issuers',
constants.KUBE_ROOTCA_ISSUER,
body)
except Exception as e:
msg = "Not successfull applying issuer: %s" % str(e)
return msg
def _precheck_save_kubernetes_rootca_cert(self, update, temp_pem_contents):
""" This method intends to do a series of validations to allow the upload
of a new rootca for kubernetes. These validations are respective to the
procedure itself or the new ca file that is being uploaded.
:param update: actual entry of kube rootca update procedure from DB
:param temp_pem_contents: content of the file uploaded to update kube rootca
:return: A dictionaire with a new_cert if successful and eventual error message
"""
if update.state != kubernetes.KUBE_ROOTCA_UPDATE_STARTED:
msg = "A new root CA certificate already exists"
return dict(success="", error=msg)
if update.to_rootca_cert:
LOG.info("root CA target with serial number %s will be overwritten"
% update.to_rootca_cert)
# extract the certificate contained in PEM file
try:
cert = cutils.extract_certs_from_pem(temp_pem_contents)[0]
except Exception as e:
msg = "Failed to extract certificate from file: %s" % str(e)
return dict(success="", error=msg)
if not cert:
msg = "No certificate have been added, " \
"no valid certificate found in file."
LOG.info(msg)
return dict(success="", error=msg)
# validate certificate
msg = cutils.check_cert_validity(cert)
if msg is not None:
return dict(success="", error=msg)
is_ca = cutils.is_ca_cert(cert)
if not is_ca:
msg = "The certificate in the file is not a CA certificate"
return dict(success="", error=msg)
# extract information regarding the new rootca
try:
hash_subject = cutils.get_cert_issuer_string_hash(cert)
serial_number = cutils.get_cert_serial(cert)
new_cert = '%s-%s' % (hash_subject, serial_number)
except Exception:
msg = "Failed to extract respective subject and serial number"
return dict(success="", error=msg)
return dict(success=new_cert, error="")
def save_kubernetes_rootca_cert(self, context, ca_file):
"""
Save a new uploaded kubernetes rootca for update procedure
:param context: request context
:param ca_file: a stream representing the PEM file uploaded
"""
# ca_file has to be in bytes format for extract information
if not isinstance(ca_file, bytes):
temp_pem_contents = ca_file.encode("utf-8")
else:
temp_pem_contents = ca_file
try:
update = self.dbapi.kube_rootca_update_get_one()
except exception.NotFound:
msg = "Kubernetes root CA update not started"
LOG.error(msg)
return dict(success="", error=msg)
result = self._precheck_save_kubernetes_rootca_cert(update, temp_pem_contents)
if result.get("error"):
msg = result.get("error")
return dict(success="", error=msg)
else:
new_cert = result.get("success")
# extract current k8s rootca
current_cert = self._get_current_kube_rootca()
if not current_cert:
msg = "Not able to get the current kube rootca"
return dict(success="", error=msg)
try:
certificate = cutils.extract_ca_crt_bytes_from_pem(temp_pem_contents)
except exception.InvalidKubernetesCA:
msg = "Invalid certificate format"
return dict(success="", error=msg)
try:
key = cutils.extract_ca_private_key_bytes_from_pem(temp_pem_contents)
except exception.InvalidKubernetesCA:
msg = "Failed to extract key from certificate file"
return dict(success="", error=msg)
msg = self._create_kube_rootca_resources(certificate, key)
if msg is not None:
return dict(success="", error=msg)
# update db
update_obj = {'state': kubernetes.KUBE_ROOTCA_UPDATE_CERT_UPLOADED,
'from_rootca_cert': current_cert,
'to_rootca_cert': new_cert}
r = self.dbapi.kube_rootca_update_update(update.id, update_obj)
return dict(success=r.to_rootca_cert, error="")
def mtc_action_apps_semantic_checks(self, context, action): def mtc_action_apps_semantic_checks(self, context, action):
"""Call semantic check for maintenance actions of each app. """Call semantic check for maintenance actions of each app.
Fail if at least one app rejects the action. Fail if at least one app rejects the action.

View File

@ -2220,3 +2220,12 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
""" """
return self.call(context, return self.call(context,
self.make_msg('update_dnsmasq_config')) self.make_msg('update_dnsmasq_config'))
def save_kubernetes_rootca_cert(self, context, certificate_file):
"""Save the new uploaded k8s root CA certificate
:param context: request context.
:certificate_file: the new rootca PEM file
"""
return self.call(context, self.make_msg('save_kubernetes_rootca_cert',
ca_file=certificate_file))

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAthLAcpLktKA5jm44TGVxza/M1Yt4/67wKXDHqCOixZQ4w7Pz
sS62YjpBInrs4taRX0GbDKondaur88dPVSZ3/7vXQ7RJCELnTSpM5KDN5NExzkte
Kr0zNrt8d0/h3UoWS9Jgl3/i4YIVu7l+spUNNV7YF0Nr9MUifRKTj3Cg5B5R4NTX
oXSgaYY5kL2MRF0YQ/FhYqz3nTCm2yogNM4R8FGIJYVbPAnA2wol06GL/IwMYght
wEdL7ocjcXVmsv7XidtI+Dp7BQWRs35w7atx+fr1Tr4jO6npkvzfUMgSy4d3c9R4
jhUaeMp62gEPp1MbZydlBWxM2s4q4kkgGHRMrQIDAQABAoIBAF77p1PeF9u23m/U
RiBsp5LjDFu2t/fCvl0QDchEVuz15ysJHK8pLFJQC5y+PggUYaAs7IMN3SoA1eKF
7ngAaoeJ6cHTMmpR5LKXx6dZ0C93hqEVJlnre+Uop8TicnTr6nfBl0xRlf2IzGez
XEozgcF+6gIw1QfLM7PF1h71Zam62RQuRAqjTpGUz0a5XuAs7H5MwLYGM8jebCNy
fGgi8NSHzGi8P83rNb3M486Tavlfh/OM7aM5S+Rf8PuG7BXvAzx9L4zf4Zo4L8Ud
Sw+l87SK62Lw0eMRq5obSDre4cXSJWGOqG8lAs/LfYc51n+mPcirxFvoiFGYspi8
lgVPGc0CgYEA7xDg/7yDLQ2QDGDP5ATi8Ea/FNORNo6ie+BIneZxrq822JRRJzoj
GstuZBoMeMKuSIhIOgXOAUAUNpKNV/pgxvdldZPkdR3JhZLHBO6AJ6Llj8M1shAt
wCKh2zKdPc9ACk+W1Ou6lgok6b0pRmhvJv+Rr9GlR9wXSke9FL/SiQcCgYEAwvhk
N+kze9+pWGrvTM9728ETP0Mq/Fg55I0D8F82msXSEdcmYGx5fJvGbNMshNy+jbLD
RUlAXmJfzmgqdGJKPNf5Rgl52Tr42h9BgqXTIGPO4nrXVa7apS8KWP2jt640VqSa
iGM6auC6elJsO8w1nQAVnxnJwiKzV3gWPaKO06sCgYBAugExPIkHmbR2pX+j7O7E
v2Lc8KtQai3z/DWtCsec1DO1T/Lo/ASlLI8m6yaVS6CEYuGrVAcCr6bJX8SFHXU2
aaU+wFwKmZYGZEcePrTUBnbBBclz/I1mh/nqrzmDkql0IThlTa2nEfgMkPqr5Xqy
xF9dixWE70IfCm1XQNhv4QKBgAOnO9mAWSKdEkNB3bIGwT9g4sdwrsGDtbH+onBC
mHdV9ZW3/lQYND6NfK5VVqQ2rqthCh+mO7qJBVqMwR7lKJbzRQx26P2VCUytAUE9
cjNNK3c67gYA/L/TndIFDqhGb1ygQPUFRvbxtwzLtpN4RBjpA36zsQAePlYJPgFx
plN5AoGBAJQUWaSb/dyPnekFfuTyXrkFRSL/rh/VXCABtIVqGVnAC7rD3vrW7ZFd
1yZwJPu8P7xrQ0e6QdzF1C/2TLnvVsPwgJYM0GR1rhdMuQ9GCdxg8GF0tMFbzngY
OdUmTvDa6j1rGc3yVwihZ8Z3RCM+VdYVA7V2uiB9YhmcATcnh+AN
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,81 @@
-----BEGIN CERTIFICATE-----
MIIE7TCCAtWgAwIBAgIDAMYmMA0GCSqGSIb3DQEBCwUAMB8xEDAOBgNVBAoMB0V0
Y2QgQ0ExCzAJBgNVBAMMAmNhMB4XDTIxMDMyOTE3NDYzM1oXDTMxMDMyNzE3NDYz
M1owHzEQMA4GA1UECgwHRXRjZCBDQTELMAkGA1UEAwwCY2EwggIiMA0GCSqGSIb3
DQEBAQUAA4ICDwAwggIKAoICAQC53t23VB7sngnQOO78GBmG1uOAEMWcM/MA9Tjh
bDV4R5BfMK7OPYQ5lEkLcA10Rk7MxCTPZ2R+9YDB/gXnb0sZRN2B46bYN7YQ6tAc
ZQosxt49FgqcoyRUWW5gxisHHFdirez8GZLHSfHdxj2cLc41HD0KyHVQJp+2s6bR
ipAEtrWHYp9ppgkmY0BXvozWEVBiJnb/nC2nsWyq6kuDbyc2h8ZADrETm5jdt+CC
HnBiv2r3teEkZiqbShy/Dh1jPzcpvmXYEPDhBDkKjCtPXOG4S8c2qQ6lqMdZhTqZ
K9GP5H+0N0bQd9RmDG81OuMMgXgPs2kjp2z7gVvtr2qLAsDeCiL9tyRuQJO1Q7Y8
pHAvlKDnhTYy0JiN62Wi93xpgCKirqH85BKhSF+8Dv2QFOnqGCRt27d17yInalj4
j0/YVhA2nH3wsTwcWMswcYRIrWUy9Ni3Yj/PwZp6ANUR3FaZuBjFMUFhURNvoC3q
+/2zv2FrRKCDos4Lddd0p7g+BjAwaBq9ldU8vFxcJMe8ECIbp8FER6g9T5wq6MWv
FdQsvKzH6WlkCcH0u27uvwBNDGkfUUNfvJEegkFfiqYk/GpvDyYMO89ZmpLTO4Oh
55641tHUh97eNpGfcMFXVhgRtcA01lIfghj7sKEEoTlVjprta+yuiAdp/8QHriRy
BxYNNQIDAQABozIwMDANBgNVHREEBjAEggJjYTALBgNVHQ8EBAMCAoQwEgYDVR0T
AQH/BAgwBgEB/wIBATANBgkqhkiG9w0BAQsFAAOCAgEAj8QylagndcbqRR0ZHXtx
/HZ5/bli3pkaUF0FCEHoypEOf6aIYHgjyFuoBaAPLnncsW0B4828AHx+nfWfNFnz
o1FF5/t5LUi9dCLPRXV0CL2Xnhb7QmNowFsbYb7W/xAkcMM4vOKDE6Z4HjUX8Ypj
5fHBIGYxEhma+BJC+02JRycGTa3Nr+1yDXfMBnUKgXVk9erz+krXvIxK7Kxc1cEi
NmpkJwKZtxx1PcMcsY071ahMO0bxcVS8JTn1jVG//yUYipYcaZqO8MvG8uYOFtCI
OHIKD+8e8H7mr83K+rcctI9+vkbkTSCs3CLeozyt1qZYRoh599rtjTMhyNDrsB75
bcYzXJIuP18xZAlQW7wYf1FHvWMGsZhBZBMwauOix86TfuoYpPfNyCRHysUQSkjy
BhWs2KPcNqWG/bS0l+ZAYMcZC1e9GnTMcXohDDij/QjUNncaSiikMoO3xrCajhkp
thGjWNvwQ2i1cWYSAWtm3W5w0TPr2bVf5nCZ29QgPZvZn0ziIrgYgOKRwCEcQQG8
y95klQRXr9ErU3fk1C01BlsLdzcRVC2EJh7FyTsNraUXFLiUo3xtE8XwR/rNWPK9
gAVNPIx62soLQfTUZKRIW0C2esqT7zNxQcbIswRqqe7fr2EG4Ab1p9PzXQv3L1HM
HNnJdMzekIAiKxt8obIAki4=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC53t23VB7sngnQ
OO78GBmG1uOAEMWcM/MA9TjhbDV4R5BfMK7OPYQ5lEkLcA10Rk7MxCTPZ2R+9YDB
/gXnb0sZRN2B46bYN7YQ6tAcZQosxt49FgqcoyRUWW5gxisHHFdirez8GZLHSfHd
xj2cLc41HD0KyHVQJp+2s6bRipAEtrWHYp9ppgkmY0BXvozWEVBiJnb/nC2nsWyq
6kuDbyc2h8ZADrETm5jdt+CCHnBiv2r3teEkZiqbShy/Dh1jPzcpvmXYEPDhBDkK
jCtPXOG4S8c2qQ6lqMdZhTqZK9GP5H+0N0bQd9RmDG81OuMMgXgPs2kjp2z7gVvt
r2qLAsDeCiL9tyRuQJO1Q7Y8pHAvlKDnhTYy0JiN62Wi93xpgCKirqH85BKhSF+8
Dv2QFOnqGCRt27d17yInalj4j0/YVhA2nH3wsTwcWMswcYRIrWUy9Ni3Yj/PwZp6
ANUR3FaZuBjFMUFhURNvoC3q+/2zv2FrRKCDos4Lddd0p7g+BjAwaBq9ldU8vFxc
JMe8ECIbp8FER6g9T5wq6MWvFdQsvKzH6WlkCcH0u27uvwBNDGkfUUNfvJEegkFf
iqYk/GpvDyYMO89ZmpLTO4Oh55641tHUh97eNpGfcMFXVhgRtcA01lIfghj7sKEE
oTlVjprta+yuiAdp/8QHriRyBxYNNQIDAQABAoICADZ0zl7E/Z5zmwpvc81WPjxc
PyEpSMxACCUys2yQKIZJ6UmKWNzB9zhrco8wUDDN3I5vtR0y/KWZxhSQGSi6WbVY
kNFaYmqcv/Hq6fg3vihqR3h8ObW0spMn9IfT541Yx114+aLO10seJgfE6g4U+YJj
+JptKrnF5ys/LVPdFd7brQmyYmQwqiOeFp7ejCK3xeZLwLeZCWNFP0JADMnASivW
0cW4yDancr0a/2MACgtUa8GRfxoL+NWwfAWZ3BBU2BOZ3frU084JT7EAajwBSXyW
bxJbq5frgCSBPS7dQLO4zZV+UHgJc6hGYlqlGxpx4DwxY0934R06xDU6HKwHrXug
PDV/HW9PbmlOAR2lKlIgJUcEazO2lJ7giGMfaUkk33cFeDmcTBrzA7G3e7KsLAiV
ovK3Td+2eIKOJ4B877Xyq+N1XYCyEiltUa9ySVN/c3yE6L8H7ad5A9r3kYTJ1yxh
t7OfYLN6FwENrZGSF3o1yAKl7c1/fO2TWbeKemI8UT6P2N8r5OBdkR+HPN1JIs/c
Plaj0xLfFOEjACQ0O8JQaAgSpo5DN55yuSjNSv92mi3BKcR7OGzH7QRLopxKyc4E
DR6caiQkwDPCxG14IRVfPm5gDrb7Wa0DkzgO8imfWy1zMSU4iLXdbT33ooC15YLW
MTWhhOB6n3iyUbWdcJEtAoIBAQDpkfwoMABePbEzaKTD5kiH5a+Snrh1sF9n2cwa
ZTEIf+QX4gRzYj3lNMqj0rzplv6wF9k9n3ruIMH8oaywO6uhM+OIvGS/ql9USw69
sJZIGYQrBAJZ7uKn2zbuEkEpOhfAbTZ7e3LLi4XiMyHUxrwHbo4U/tEOja+xclVx
G8a+wDplHHV5V9KTtKoeXHkP6d7S705u4S0KXhqA1qnv09WDTwhRdlh9rM41VwTz
EXqq5EzJB7S7oOLMe6szfqHVKuWtdB3JKbOVlhtY02RqfAqLf5NLwFku0KP61oVU
coPL1JH5WG2j0K7thJMHqBSxD8/qZmhEU7SjPb4rENGjxNcvAoIBAQDLuD9kEIrH
GeOiFwOdSg/eOMGqHNDfnZfCeQgTvFnqs5yXbTNL/PDilEZD4kDU+OoGNQbtoNpm
qfJzzunKaRrm/lkBvoGJWZaFFDhr8Lz8GfBoCE4mGv2EeHPddsZBYmvL9vLxmxG4
+aN4UlZ/PHTomLdgITf0d6XoAKl09KN0PfBDnF3VUdOq2uehfFV3cFdnDMWsfYnB
Ys5RVMWnRmxQaeorPXFAaHvjXEaqbj69mUvr2PwKI6FrifHSZwsWa826BZ/6Zz6L
2wUYYF/hLR0VhtgFPaoHCdcczH5UHpSZjdEmMLaNZOtyBUQwx6jGUKRQ8/25pima
HzwkZWxSb4jbAoIBADfRiX9ZKV1cRPLSOT4P1JmVjIXvpImLouFArYRJVpR/a9VB
UGr6uWwDV8Ia5Ma2LRuMN4CAknJCJdnoEUr0l6moquHMlA8x+iI85cLzZpbIckuN
Y7p2Wnhe7RusBSKDHZYBA5ozAFYge9h4+8bLz7e+9fmShAeEWM6BUmX7i12ettXf
HTvofwyJinZDBzOEYpnqUsYwzgDCSHct1eLYrxf4VTaSn8c4+vbIWwhzzur0MF2C
l/CXHFxd2aYuxyIYZFc1fsDKVH6VJuftbPv9tM9tp5fc2fNULTwO9EIgM9sMa+44
8crKXmOo4TJdOsSt0LRl0NkzX+H7KW1FUbRfoEUCggEAV4Wk1ly1Aq0AuxagGudC
wfooWelfY3LVTFurOK9nAgqAcB4eN7tH0lBZj7iYmecGw/vsKhM9QXYqD88JakiV
okAMBU/PXy76F9qEEvuudbC/NDK9QGnAGTWWscLhkh2yqkJCRcKVbp7xuDPHrYpP
v848mjQrUgBFatM9+l1QDBTAMIvxVEB/a5v4f8xm+5VsN32pP13/3PGSKib9c8wx
pKqcTE9tZHp/H0L5qScMFXDSyVTDk6eTJhxxpC9Y+B0Ambbo8C+DE5rZKYveJWO4
ZxMzo6zGa5eyr1C7xXAN75qaDIpJI54D+UyB62McA3eJ4K2yiBv3K5vXvttEGnaI
mQKCAQEApIi60M6DrXPylugZnruA7v8Beb0qsiHkqQErqbTo2HSe7W3+jObuiRdG
soQoTc5iLvZyWXNAM6qXrzJkIvS0Qsw1pyC0QmoA9DySDkVjA/AbnjK41nTBrTwl
ZFCUawURSi2OvYqFLBFbCmwPLlQQCbMgmHPqlYMJSj3G3LtURQX7n3pP23o3d/ZF
y/yuwGNmjbEYU5RxqnJcPEA+lmtcGenD3rCDDlNBHocBdvfTw4UYbHAFxdltuk64
2X6neO4fDKtFYNk54LYZLwQrQ3q4RI9y1o1B+0QzfM2Oowp5dMehmwOth1bxSzhd
ACeCTlUNHHFB14hL2b/ZuZ4dpc3ayw==
-----END PRIVATE KEY-----

View File

@ -2,9 +2,10 @@
Tests for the API /kube_rootca_update/ methods. Tests for the API /kube_rootca_update/ methods.
""" """
import json
import mock import mock
import os
from six.moves import http_client from six.moves import http_client
from sysinv.common import constants from sysinv.common import constants
from sysinv.common import health from sysinv.common import health
from sysinv.common import kubernetes from sysinv.common import kubernetes
@ -34,6 +35,9 @@ class FakeConductorAPI(object):
def __init__(self): def __init__(self):
self.service = ConductorManager('test-host', 'test-topic') self.service = ConductorManager('test-host', 'test-topic')
self.save_kubernetes_rootca_cert = self.fake_config_certificate
self.config_certificate_return = None
self.platcert_k8s_secret_value = False
def get_system_health(self, context, force=False, upgrade=False, def get_system_health(self, context, force=False, upgrade=False,
kube_upgrade=False, kube_rootca_update=False, kube_upgrade=False, kube_rootca_update=False,
@ -46,6 +50,12 @@ class FakeConductorAPI(object):
kube_rootca_update=kube_rootca_update, kube_rootca_update=kube_rootca_update,
alarm_ignore_list=alarm_ignore_list) alarm_ignore_list=alarm_ignore_list)
def fake_config_certificate(self, context, pem):
return self.config_certificate_return
def setup_config_certificate(self, data):
self.config_certificate_return = data
class TestKubeRootCAUpdate(base.FunctionalTest): class TestKubeRootCAUpdate(base.FunctionalTest):
@ -119,6 +129,8 @@ class TestPostKubeRootUpdate(TestKubeRootCAUpdate,
# Verify that the kubernetes rootca update has the expected attributes # Verify that the kubernetes rootca update has the expected attributes
self.assertEqual(result.json['state'], self.assertEqual(result.json['state'],
kubernetes.KUBE_ROOTCA_UPDATE_STARTED) kubernetes.KUBE_ROOTCA_UPDATE_STARTED)
self.assertNotEqual(result.json['from_rootca_cert'], None)
self.assertEqual(result.json['from_rootca_cert'], 'oldCertSerial')
def test_create_rootca_update_unhealthy_from_alarms(self): def test_create_rootca_update_unhealthy_from_alarms(self):
""" Test creation of kube rootca update while there are alarms""" """ Test creation of kube rootca update while there are alarms"""
@ -192,3 +204,41 @@ class TestPostKubeRootUpdate(TestKubeRootCAUpdate,
self.assertEqual(http_client.BAD_REQUEST, result.status_int) self.assertEqual(http_client.BAD_REQUEST, result.status_int)
self.assertIn("rootca update cannot be done while a platform upgrade", self.assertIn("rootca update cannot be done while a platform upgrade",
result.json['error_message']) result.json['error_message'])
class TestKubeRootCAUpload(TestKubeRootCAUpdate,
dbbase.ProvisionedControllerHostTestCase):
def setUp(self):
super(TestKubeRootCAUpload, self).setUp()
self.fake_conductor_api.service.dbapi = self.dbapi
@mock.patch.object(kubernetes.KubeOperator,
'kube_create_secret')
@mock.patch.object(kubernetes.KubeOperator,
'apply_custom_resource')
def test_upload_rootca(self, mock_create_secret, mock_create_custom_resource):
dbutils.create_test_kube_rootca_update(state=kubernetes.KUBE_ROOTCA_UPDATE_STARTED)
certfile = os.path.join(os.path.dirname(__file__), "data",
'rootca-with-key.pem')
fake_save_rootca_return = {'success': '137813-123', 'error': ''}
self.fake_conductor_api.\
setup_config_certificate(fake_save_rootca_return)
files = [('file', certfile)]
response = self.post_with_files('%s/%s' % ('/kube_rootca_update', 'upload'),
{},
upload_files=files,
headers={'User-Agent': 'sysinv-test'},
expect_errors=False)
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.OK)
resp = json.loads(response.body)
self.assertTrue(resp.get('success'))
self.assertEqual(resp.get('success'), fake_save_rootca_return.get('success'))
self.assertFalse(resp.get('error'))

View File

@ -27,6 +27,9 @@ import os.path
import tsconfig.tsconfig as tsc import tsconfig.tsconfig as tsc
import uuid import uuid
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from sysinv.agent import rpcapi as agent_rpcapi from sysinv.agent import rpcapi as agent_rpcapi
from sysinv.common import constants from sysinv.common import constants
from sysinv.common import device as dconstants from sysinv.common import device as dconstants
@ -68,6 +71,25 @@ class FakePopen(object):
class ManagerTestCase(base.DbTestCase): class ManagerTestCase(base.DbTestCase):
@staticmethod
def extract_certs_from_pem_file(certfile):
""" extract certificates from a X509 PEM file
"""
marker = b'-----BEGIN CERTIFICATE-----'
with open(certfile, 'rb') as f:
pem_contents = f.read()
start = 0
certs = []
while True:
index = pem_contents.find(marker, start)
if index == -1:
break
cert = x509.load_pem_x509_certificate(pem_contents[index::],
default_backend())
certs.append(cert)
start = index + len(marker)
return certs
def setUp(self): def setUp(self):
super(ManagerTestCase, self).setUp() super(ManagerTestCase, self).setUp()
@ -126,6 +148,13 @@ class ManagerTestCase(base.DbTestCase):
self._ready_to_apply_runtime_config self._ready_to_apply_runtime_config
self.addCleanup(self.ready_to_apply_runtime_config_patcher.stop) self.addCleanup(self.ready_to_apply_runtime_config_patcher.stop)
# Mock check_cert_validity
def mock_cert_validity(obj):
return None
self.mocked_cert_validity = mock.patch.object(cutils, 'check_cert_validity', mock_cert_validity)
self.mocked_cert_validity.start()
self.addCleanup(self.mocked_cert_validity.stop)
# Mock agent config_apply_runtime_manifest # Mock agent config_apply_runtime_manifest
def mock_agent_config_apply_runtime_manifest(obj, context, config_uuid, def mock_agent_config_apply_runtime_manifest(obj, context, config_uuid,
config_dict): config_dict):
@ -1840,6 +1869,124 @@ class ManagerTestCase(base.DbTestCase):
for key in fpga_dev_dict_update: for key in fpga_dev_dict_update:
self.assertEqual(dev[key], fpga_dev_dict_update[key]) self.assertEqual(dev[key], fpga_dev_dict_update[key])
def test_upload_rootca(self):
mock_kube_create_secret = mock.MagicMock()
p = mock.patch(
'sysinv.common.kubernetes.KubeOperator.kube_create_secret',
mock_kube_create_secret)
p.start()
self.addCleanup(p.stop)
mock_kube_create_issuer = mock.MagicMock()
q = mock.patch(
'sysinv.common.kubernetes.KubeOperator.apply_custom_resource',
mock_kube_create_issuer)
q.start()
self.addCleanup(q.stop)
mock_get_current_kube_rootca = mock.MagicMock()
z = mock.patch(
'sysinv.conductor.manager.ConductorManager._get_current_kube_rootca',
mock_get_current_kube_rootca
)
self.mock_current_kube_rootca = z.start()
self.mock_current_kube_rootca.return_value = 'test'
self.addCleanup(z.stop)
mock_get_secret = mock.MagicMock()
l = mock.patch(
'sysinv.common.kubernetes.KubeOperator.kube_get_secret',
mock_get_secret
)
self.mock_kube_get_secret = l.start()
self.addCleanup(l.stop)
mock_delete_secret = mock.MagicMock()
w = mock.patch(
'sysinv.common.kubernetes.KubeOperator.kube_delete_secret',
mock_delete_secret
)
self.mock_secret_delete = w.start()
self.addCleanup(w.stop)
utils.create_test_kube_rootca_update(state=kubernetes.KUBE_ROOTCA_UPDATE_STARTED)
file = os.path.join(os.path.dirname(__file__), "../api", "data",
'rootca-with-key.pem')
with open(file, 'rb') as certfile:
certfile.seek(0, os.SEEK_SET)
f = certfile.read()
resp = self.service.save_kubernetes_rootca_cert(self.context, f)
self.assertTrue(resp.get('success'))
self.assertFalse(resp.get('error'))
def test_upload_rootca_only_key(self):
mock_get_current_kube_rootca = mock.MagicMock()
z = mock.patch(
'sysinv.conductor.manager.ConductorManager._get_current_kube_rootca',
mock_get_current_kube_rootca
)
self.mock_current_kube_rootca = z.start()
self.mock_current_kube_rootca.return_value = 'test'
self.addCleanup(z.stop)
utils.create_test_kube_rootca_update(state=kubernetes.KUBE_ROOTCA_UPDATE_STARTED)
file = os.path.join(os.path.dirname(__file__), "../api", "data",
'only_key.pem')
with open(file, 'rb') as certfile:
certfile.seek(0, os.SEEK_SET)
f = certfile.read()
resp = self.service.save_kubernetes_rootca_cert(self.context, f)
self.assertTrue(resp.get('error'))
self.assertIn("Failed to extract certificate from file", resp.get('error'))
def test_upload_rootca_not_ca_certificate(self):
mock_get_current_kube_rootca = mock.MagicMock()
z = mock.patch(
'sysinv.conductor.manager.ConductorManager._get_current_kube_rootca',
mock_get_current_kube_rootca
)
self.mock_current_kube_rootca = z.start()
self.mock_current_kube_rootca.return_value = 'test'
self.addCleanup(z.stop)
utils.create_test_kube_rootca_update(state=kubernetes.KUBE_ROOTCA_UPDATE_STARTED)
file = os.path.join(os.path.dirname(__file__), "../api", "data", 'cert-with-key-SAN.pem')
with open(file, 'rb') as certfile:
certfile.seek(0, os.SEEK_SET)
f = certfile.read()
resp = self.service.save_kubernetes_rootca_cert(self.context, f)
self.assertTrue(resp.get('error'))
self.assertIn("certificate in the file is not a CA certificate", resp.get('error'))
def test_upload_rootca_not_in_progress(self):
file = os.path.join(os.path.dirname(__file__), "../api", "data",
'rootca-with-key.pem')
with open(file, 'rb') as certfile:
certfile.seek(0, os.SEEK_SET)
f = certfile.read()
resp = self.service.save_kubernetes_rootca_cert(self.context, f)
self.assertTrue(resp.get('error'))
self.assertIn("Kubernetes root CA update not started", resp.get('error'))
def test_upload_rootca_advanced_state(self):
utils.create_test_kube_rootca_update(state=kubernetes.KUBE_ROOTCA_UPDATING_PODS_TRUSTBOTHCAS)
file = os.path.join(os.path.dirname(__file__), "../api", "data",
'rootca-with-key.pem')
with open(file, 'rb') as certfile:
certfile.seek(0, os.SEEK_SET)
f = certfile.read()
resp = self.service.save_kubernetes_rootca_cert(self.context, f)
self.assertTrue(resp.get('error'))
self.assertIn("new root CA certificate already exists", resp.get('error'))
def test_device_update_image_status(self): def test_device_update_image_status(self):
mock_host_device_image_update_next = mock.MagicMock() mock_host_device_image_update_next = mock.MagicMock()