Support multiple CA certificates installation

This update enhanced sysinv certificate install API to be able to
install multiple CA certs from a file. The returns from the API call
indicates the certs actually installed in the call (ie, excluding these
that are already in the system). This is neccessary especially for DC to
support multiple CA certs synchronization.

This update also added sysinv certficate uninstall API. The API is to
be used to remove a particular CA certficate from the system, identified
by its uuid. The API returns a json body with information about the
certificate that has been removed. This is required by DC sysinv api
proxy for certificate deletion synchronization, since DC tracks subcloud
certificates resource by signature while the uninstall API request
contains only uuid.

The uninstall API only supports ssl_ca certificate.

cgtsclient and system CLI are also updated to align with the updated
and new APIs. User can use "system certificate-install ..." to install
one or multiple CA certificates, and "system certificate-uninstall ..."
to remove a particular CA certificate from the system.

When multiple CA certificates are installed in the system,
"system certificate-list" will display each of the individual
certificates.

THe sysinv certificate configuration API reference is updated with the
new uninstall API. Unit tests are added for CA certificate install and
delete APIs.

Change-Id: I7dba11e56792b7d198403c436c37f71d7b7193c9
Depends-On: https://review.opendev.org/#/c/711633/
Closes-Bug: 1861438
Closes-Bug: 1860995
Signed-off-by: Andy Ning <andy.ning@windriver.com>
This commit is contained in:
Andy Ning 2020-02-18 14:52:06 -05:00
parent 13e5ff6c49
commit c1c18871d7
15 changed files with 799 additions and 137 deletions

View File

@ -10917,7 +10917,7 @@ Install System Certificate
.. rest_method:: POST /v1/certificate/certificate_install
Accepts a PEM file containing the X509 certificate.
Accepts a PEM file containing the X509 certificates.
For security reasons, the original certificate, containing the private
key, will be removed, once the private key is processed.
@ -11025,6 +11025,59 @@ itemNotFound (404)
This operation does not accept a request body.
**************************
Deletes a CA certificate
**************************
.. rest_method:: DELETE /v1/certificate/{uuid}
**Normal response codes**
200
**Error response codes**
serviceUnavailable (503), badRequest (400), unauthorized (401),
forbidden (403), badMethod (405), overLimit (413), itemNotFound (404)
**Request parameters**
.. csv-table::
:header: "Parameter", "Style", "Type", "Description"
:widths: 20, 20, 20, 60
"uuid", "URI", "csapi:UUID", "The unique identifier of the CA Certificate."
**Response parameters**
.. csv-table::
:header: "Parameter", "Style", "Type", "Description"
:widths: 20, 20, 20, 60
"uuid (Optional)", "plain", "csapi:UUID", "The universally unique identifier for this object."
"certtype (Optional)", "plain", "xsd:string", "The type of the certificate."
"signature (Optional)", "plain", "xsd:string", "The signature of the certificate."
"details (Optional)", "plain", "xsd:string", "A dictionary of the certificate details."
"links (Optional)", "plain", "xsd:list", "For convenience, resources contain links to themselves. This allows a client to easily obtain rather than construct resource URIs. The following types of link relations are associated with resources: a self link containing a versioned link to the resource, and a bookmark link containing a permanent link to a resource that is appropriate for long term storage."
"created_at (Optional)", "plain", "xsd:dateTime", "The time when the object was created."
"updated_at (Optional)", "plain", "xsd:dateTime", "The time when the object was last updated."
"start_date (Optional)", "plain", "xsd:dateTime", "The time when the certificate becomes valid."
"expiry_date (Optional)", "plain", "xsd:dateTime", "The time when the certificate expires."
::
{
"uuid": "32e8053a-04de-468c-a3c3-6bf55be4d0e6",
"certtype": "ssl_ca",
"expiry_date": "2022-12-14T15:08:25+00:00",
"details": null,
"signature": "ssl_ca_9552807080826043442",
"start_date":"2020-02-24T15:08:25+00:00",
"issuer": null
}
This operation does not accept a request body.
---------------
Docker Registry
---------------

View File

@ -1,2 +1,2 @@
SRC_DIR="cgts-client"
TIS_PATCH_VER=74
TIS_PATCH_VER=75

View File

@ -36,3 +36,8 @@ class CertificateManager(base.Manager):
def certificate_install(self, certificate_file, data=None):
path = self._path("certificate_install")
return self._upload(path, certificate_file, data=data)
def certificate_uninstall(self, uuid):
path = self._path(uuid)
_, body = self.api.json_request('DELETE', path)
return body

View File

@ -100,9 +100,26 @@ def do_certificate_install(cc, args):
raise exc.CommandError('Certificate %s not installed: %s' %
(certificate_file, e))
else:
_print_certificate_show(response.get('certificates'))
certificates = response.get('certificates')
for certificate in certificates:
_print_certificate_show(certificate)
try:
os.remove(certificate_file)
except OSError:
raise exc.CommandError('Error: Could not remove the '
'certificate %s' % certificate_file)
@utils.arg('certificate_uuid', metavar='<certificate_uuid>',
help="UUID of certificate to uninstall")
@utils.arg('-m', '--mode',
metavar='<mode>',
help="Supported mode: 'ssl_ca'.")
def do_certificate_uninstall(cc, args):
"""Uninstall certificate."""
supported_modes = ['ssl_ca']
if args.mode not in supported_modes:
raise exc.CommandError('Unsupported mode: %s' % args.mode)
cc.certificate.certificate_uninstall(args.certificate_uuid)
print('Uninstalled certificate: %s' % (args.certificate_uuid))

View File

@ -1,2 +1,2 @@
SRC_DIR="sysinv"
TIS_PATCH_VER=344
TIS_PATCH_VER=345

View File

@ -27,7 +27,6 @@ import wsme
import wsmeext.pecan as wsme_pecan
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from pecan import expose
from pecan import rest
@ -42,6 +41,7 @@ from sysinv.api.controllers.v1 import utils
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import utils as cutils
from sysinv.openstack.common.rpc.common import RemoteError
from wsme import types as wtypes
LOG = log.getLogger(__name__)
@ -322,23 +322,33 @@ class CertificateController(rest.RestController):
error=("No certificates have been added, "
"invalid PEM document: %s" % e))
# Extract the certificate from the pem file
cert = x509.load_pem_x509_certificate(pem_contents,
default_backend())
msg = self._check_cert_validity(cert)
if msg is not True:
# Extract the certificates from the pem file
try:
certs = cutils.extract_certs_from_pem(pem_contents)
except Exception as e:
msg = "No certificates have been added, %s" % e
return dict(success="", error=msg)
if mode == constants.CERT_MODE_OPENSTACK:
domain, msg = _check_endpoint_domain_exists()
if domain:
msg = _check_cert_dns_name(cert, domain)
if msg is not True:
return dict(success="", error=msg.message)
elif msg:
if not certs:
msg = "No certificates have been added, " \
"no valid certificates found in file."
LOG.info(msg)
return dict(success="", error=msg)
for cert in certs:
msg = self._check_cert_validity(cert)
if msg is not True:
return dict(success="", error=msg)
if mode == constants.CERT_MODE_OPENSTACK:
domain, msg = _check_endpoint_domain_exists()
if domain:
msg = _check_cert_dns_name(cert, domain)
if msg is not True:
return dict(success="", error=msg.message)
elif msg:
return dict(success="", error=msg)
if mode == constants.CERT_MODE_TPM:
try:
tpm = pecan.request.dbapi.tpmconfig_get_one()
@ -364,63 +374,105 @@ class CertificateController(rest.RestController):
config_dict = {'passphrase': passphrase,
'mode': mode,
}
signature = pecan.request.rpcapi.config_certificate(
inv_certs = pecan.request.rpcapi.config_certificate(
pecan.request.context,
pem_contents,
config_dict)
except Exception as e:
except RemoteError as e:
msg = "Exception occurred e={}".format(e)
LOG.info(msg)
return dict(success="", error=str(e), body="", certificates={})
LOG.warn(msg)
return dict(success="", error=str(e.value), body="", certificates={})
# Update with installed certificate information
values = {
'certtype': mode,
# TODO(jkung) 'issuer': cert.issuer,
'signature': signature,
'start_date': cert.not_valid_before,
'expiry_date': cert.not_valid_after,
}
LOG.info("config_certificate values=%s" % values)
certificates = pecan.request.dbapi.certificate_get_list()
# ssl and ssl_tpm certs are mutual exclusive, so
# if the new cert is a SSL cert, delete the existing TPM cert as well
# if the new cert is a TPM cert, delete the existing SSL cert as well
for certificate in certificates:
if (mode == constants.CERT_MODE_SSL
and certificate.certtype == constants.CERT_MODE_TPM) or \
(mode == constants.CERT_MODE_TPM
and certificate.certtype == constants.CERT_MODE_SSL):
pecan.request.dbapi.certificate_destroy(certificate.uuid)
if mode in [constants.CERT_MODE_SSL, constants.CERT_MODE_TPM]:
if mode == constants.CERT_MODE_SSL:
remove_certtype = constants.CERT_MODE_TPM
# Create new or update existing certificates in sysinv with the
# information returned from conductor manager.
certificate_dicts = []
for inv_cert in inv_certs:
values = {
'certtype': mode,
'signature': inv_cert.get('signature'),
'start_date': inv_cert.get('not_valid_before'),
'expiry_date': inv_cert.get('not_valid_after'),
}
LOG.info("config_certificate values=%s" % values)
# check to see if the installed cert exist in sysinv
uuid = None
for certificate in certificates:
if mode == constants.CERT_MODE_SSL_CA:
if inv_cert.get('signature') == certificate.signature:
uuid = certificate.uuid
break
else:
if mode == certificate.certtype:
uuid = certificate.uuid
break
if uuid:
certificate = pecan.request.dbapi.certificate_update(uuid,
values)
else:
remove_certtype = constants.CERT_MODE_SSL
try:
remove_certificate = \
pecan.request.dbapi.certificate_get_by_certtype(
remove_certtype)
LOG.info("remove certificate certtype=%s uuid`=%s" %
(remove_certtype, remove_certificate.uuid))
pecan.request.dbapi.certificate_destroy(
remove_certificate.uuid)
except exception.CertificateTypeNotFound:
pass
try:
certificate = \
pecan.request.dbapi.certificate_get_by_certtype(
mode)
certificate = \
pecan.request.dbapi.certificate_update(certificate.uuid,
values)
except exception.CertificateTypeNotFound:
certificate = pecan.request.dbapi.certificate_create(values)
pass
sp_certificates_dict = certificate.as_dict()
LOG.debug("certificate_install sp_certificates={}".format(
sp_certificates_dict))
certificate = pecan.request.dbapi.certificate_create(values)
certificate_dict = certificate.as_dict()
LOG.debug("certificate_install certificate={}".format(
certificate_dict))
certificate_dicts.append(certificate_dict)
log_end = cutils.timestamped("certificate_do_post_end")
LOG.info("certificate %s" % log_end)
return dict(success="", error="", body="",
certificates=sp_certificates_dict)
certificates=certificate_dicts)
@cutils.synchronized(LOCK_NAME)
@wsme_pecan.wsexpose(Certificate, types.uuid, status_code=200)
def delete(self, certificate_uuid):
"""Uninstall a certificate."""
# Only support ssl_ca cert type
log_start = cutils.timestamped("certificate_do_delete_start")
try:
certificate = pecan.request.dbapi.certificate_get(certificate_uuid)
except exception.InvalidParameterValue:
raise wsme.exc.ClientSideError(
_("No certificate found for %s" % certificate_uuid))
if certificate and \
certificate.certtype not in [constants.CERT_MODE_SSL_CA]:
msg = "Unupported mode: {}".format(certificate.certtype)
raise wsme.exc.ClientSideError(_(msg))
LOG.info("certificate %s certificate_uuid=%s" %
(log_start, certificate_uuid))
try:
pecan.request.rpcapi.delete_certificate(pecan.request.context,
certificate.certtype,
certificate.signature)
except RemoteError as e:
msg = "Exception occurred e={}".format(e)
LOG.warn(msg)
raise wsme.exc.ClientSideError(
_("Failed to delete the certificate: %s, %s" %
(certificate_uuid, str(e.value))))
pecan.request.dbapi.certificate_destroy(certificate_uuid)
log_end = cutils.timestamped("certificate_do_delete_end")
LOG.info("certificate %s" % log_end)
return Certificate.convert_with_links(certificate)
def _check_endpoint_domain_exists():

View File

@ -1300,6 +1300,7 @@ DOCKER_REGISTRY_PKCS1_KEY_FILE_SHARED = os.path.join(tsc.CONFIG_PATH,
SSL_CERT_CA_DIR = "/etc/pki/ca-trust/source/anchors/"
SSL_CERT_CA_FILE = os.path.join(SSL_CERT_CA_DIR, CERT_CA_FILE)
SSL_CERT_CA_FILE_SHARED = os.path.join(tsc.CONFIG_PATH, CERT_CA_FILE)
SSL_CERT_CA_LIST_SHARED_DIR = os.path.join(tsc.CONFIG_PATH, "ssl_ca")
KUBERNETES_PKI_SHARED_DIR = os.path.join(tsc.CONFIG_PATH, "kubernetes/pki")

View File

@ -28,6 +28,8 @@ import boto3
from botocore.config import Config
import collections
import contextlib
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import datetime
import errno
import functools
@ -2185,3 +2187,32 @@ def get_aws_ecr_registry_credentials(dbapi, registry, username, password):
"Failed to get AWS ECR credentials: %s" % e))
return dict(username=username, password=password)
def extract_certs_from_pem(pem_contents):
"""
Extract certificates from a pem string
:param pem_contents: A string in pem format
:return certs: A list of x509 cert objects
"""
marker = b'-----BEGIN CERTIFICATE-----'
start = 0
certs = []
while True:
index = pem_contents.find(marker, start)
if index == -1:
break
try:
cert = x509.load_pem_x509_certificate(pem_contents[index::],
default_backend())
except Exception:
LOG.exception(_("Load pem x509 certificate failed at file "
"location: %s") % index)
raise exception.SysinvException(_(
"Failed to load pem x509 certificate"))
certs.append(cert)
start = start + index + len(marker)
return certs

View File

@ -51,7 +51,6 @@ import tsconfig.tsconfig as tsc
from collections import namedtuple
from cgcs_patch.patch_verify import verify_files
from controllerconfig.upgrades import management as upgrades_management
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
@ -10028,24 +10027,19 @@ class ConductorManager(service.PeriodicService):
"""Extract keys from the pem contents
:param mode: mode one of: ssl, tpm_mode, docker_registry
:param pem_contents: pem_contents
:param pem_contents: pem_contents in unicode
:param cert_format: serialization.PrivateFormat
:param passphrase: passphrase for PEM file
:returns: private_bytes, public_bytes, signature
:returns: A list of {cert, private_bytes, public_bytes, signature}
"""
temp_pem_file = constants.SSL_PEM_FILE + '.temp'
with os.fdopen(os.open(temp_pem_file, os.O_CREAT | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'w') as f:
f.write(pem_contents)
if passphrase:
passphrase = str(passphrase)
private_bytes = None
private_mode = False
temp_pem_contents = pem_contents.encode("utf-8")
if mode in [constants.CERT_MODE_SSL,
constants.CERT_MODE_TPM,
constants.CERT_MODE_DOCKER_REGISTRY,
@ -10053,43 +10047,100 @@ class ConductorManager(service.PeriodicService):
]:
private_mode = True
with open(temp_pem_file, "r") as key_file:
if private_mode:
# extract private_key with passphrase
try:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=passphrase,
backend=default_backend())
except Exception as e:
raise exception.SysinvException(_("Error decrypting PEM "
"file: %s" % e))
key_file.seek(0)
# extract the certificate from the pem file
cert = x509.load_pem_x509_certificate(key_file.read(),
default_backend())
os.remove(temp_pem_file)
if private_mode:
# extract private_key with passphrase
try:
private_key = serialization.load_pem_private_key(
temp_pem_contents,
password=passphrase,
backend=default_backend())
except Exception as e:
raise exception.SysinvException(_("Error loading private key "
"from PEM data: %s" % e))
if not isinstance(private_key, rsa.RSAPrivateKey):
raise exception.SysinvException(_("Only RSA encryption based "
"Private Keys are supported."))
raise exception.SysinvException(_(
"Only RSA encryption based Private Keys are supported."))
private_bytes = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=cert_format,
encryption_algorithm=serialization.NoEncryption())
try:
private_bytes = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=cert_format,
encryption_algorithm=serialization.NoEncryption())
except Exception as e:
raise exception.SysinvException(_("Error loading private "
"bytes from PEM data: %s"
% e))
signature = mode + '_' + str(cert.serial_number)
if len(signature) > 255:
LOG.info("Truncating certificate serial no %s" % signature)
signature = signature[:255]
LOG.info("config_certificate signature=%s" % signature)
certs = cutils.extract_certs_from_pem(temp_pem_contents)
key_list = []
for cert in certs:
# format=serialization.PrivateFormat.TraditionalOpenSSL,
try:
public_bytes = cert.public_bytes(
encoding=serialization.Encoding.PEM)
except Exception as e:
raise exception.SysinvException(_("Error loading public "
"bytes from PEM data: %s"
% e))
# format=serialization.PrivateFormat.TraditionalOpenSSL,
public_bytes = cert.public_bytes(encoding=serialization.Encoding.PEM)
signature = mode + '_' + str(cert.serial_number)
if len(signature) > 255:
LOG.info("Truncating certificate serial no %s" % signature)
signature = signature[:255]
LOG.info("config_certificate signature=%s" % signature)
return private_bytes, public_bytes, signature
key_list.append({'cert': cert,
'private_bytes': private_bytes,
'public_bytes': public_bytes,
'signature': signature})
return key_list
@staticmethod
def _get_public_bytes_one(key_list):
"""Get exactly one public bytes entry from key list"""
if len(key_list) != 1:
msg = "There should be exactly one certificate " \
"(ie, public_bytes) in the pem contents."
LOG.error(msg)
raise exception.SysinvException(_(msg))
return key_list[0].get('public_bytes')
@staticmethod
def _get_private_bytes_one(key_list):
"""Get exactly one private bytes entry from key list"""
if len(key_list) != 1:
msg = "There should be exactly one private key " \
"(ie, private_bytes) in the pem contents."
LOG.error(msg)
raise exception.SysinvException(_(msg))
return key_list[0].get('private_bytes')
@staticmethod
def _consolidate_cert_files():
# Cat all the cert files into one CA cert file and store it in
# the shared directory to update system CA certs
try:
new_cert_files = \
os.listdir(constants.SSL_CERT_CA_LIST_SHARED_DIR)
with os.fdopen(
os.open(constants.SSL_CERT_CA_FILE_SHARED,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_DEFAULT),
'wb') as f:
for fname in new_cert_files:
fname = \
os.path.join(constants.SSL_CERT_CA_LIST_SHARED_DIR,
fname)
with open(fname, "r") as infile:
f.write(infile.read())
except Exception as e:
msg = "Failed to consolidate cert files: %s" % str(e)
LOG.warn(msg)
raise exception.SysinvException(_(msg))
def _perform_config_certificate_tpm_mode(self, context,
tpm, private_bytes, public_bytes):
@ -10155,7 +10206,7 @@ class ConductorManager(service.PeriodicService):
LOG.info("config_certificate mode=%s" % mode)
private_bytes, public_bytes, signature = \
key_list = \
self._extract_keys_from_pem(mode, pem_contents,
serialization.PrivateFormat.PKCS8,
passphrase)
@ -10168,19 +10219,23 @@ class ConductorManager(service.PeriodicService):
pass
if mode == constants.CERT_MODE_TPM:
private_bytes = self._get_private_bytes_one(key_list)
public_bytes = self._get_public_bytes_one(key_list)
self._perform_config_certificate_tpm_mode(
context, tpm, private_bytes, public_bytes)
file_content = public_bytes
# copy the certificate to shared directory
with os.fdopen(os.open(constants.SSL_PEM_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(file_content)
elif mode == constants.CERT_MODE_SSL:
config_uuid = self._config_update_hosts(context, personalities)
private_bytes = self._get_private_bytes_one(key_list)
public_bytes = self._get_public_bytes_one(key_list)
file_content = private_bytes + public_bytes
config_dict = {
'personalities': personalities,
@ -10193,7 +10248,7 @@ class ConductorManager(service.PeriodicService):
# copy the certificate to shared directory
with os.fdopen(os.open(constants.SSL_PEM_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(file_content)
@ -10214,33 +10269,65 @@ class ConductorManager(service.PeriodicService):
config_dict)
elif mode == constants.CERT_MODE_SSL_CA:
file_content = public_bytes
personalities = [constants.CONTROLLER,
constants.WORKER,
constants.STORAGE]
# copy the certificate to shared directory
with os.fdopen(os.open(constants.SSL_CERT_CA_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_DEFAULT),
'wb') as f:
f.write(file_content)
# The list of the existing CA certs in sysinv DB.
certificates = self.dbapi.certificate_get_list()
certs_inv = [certificate.signature
for certificate in certificates
if certificate.certtype == mode]
# The list of the actual CA certs as files in FS
certs_file = os.listdir(constants.SSL_CERT_CA_LIST_SHARED_DIR)
config_uuid = self._config_update_hosts(context, personalities)
config_dict = {
"personalities": personalities,
"classes": ['platform::config::runtime']
}
self._config_apply_runtime_manifest(context,
config_uuid,
config_dict,
force=True)
# Remove these already installed from the key list
key_list_c = key_list[:]
for key in key_list_c:
if key.get('signature') in certs_inv \
and key.get('signature') in certs_file:
key_list.remove(key)
# Don't do anything if there are no new certs to install
if key_list:
# Save each cert in a separate file with signature as its name
try:
for key in key_list:
file_content = key.get('public_bytes')
file_name = \
os.path.join(constants.SSL_CERT_CA_LIST_SHARED_DIR,
key.get('signature'))
with os.fdopen(
os.open(file_name,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_DEFAULT),
'wb') as f:
f.write(file_content)
except Exception as e:
msg = "Failed to save cert file: %s" % str(e)
LOG.warn(msg)
raise exception.SysinvException(_(msg))
# consolidate the CA cert files into ca-cert.pem to update
# system CA certs.
self._consolidate_cert_files()
personalities = [constants.CONTROLLER,
constants.WORKER,
constants.STORAGE]
config_uuid = self._config_update_hosts(context, personalities)
config_dict = {
"personalities": personalities,
"classes": ['platform::config::runtime']
}
self._config_apply_runtime_manifest(context,
config_uuid,
config_dict,
force=True)
elif mode == constants.CERT_MODE_DOCKER_REGISTRY:
LOG.info("Docker registry certificate install")
# docker registry requires a PKCS1 key for the token server
pkcs1_private_bytes, pkcs1_public_bytes, pkcs1_signature = \
key_list_pkcs1 = \
self._extract_keys_from_pem(mode, pem_contents,
serialization.PrivateFormat
.TraditionalOpenSSL, passphrase)
pkcs1_private_bytes = self._get_private_bytes_one(key_list_pkcs1)
# install certificate, key, and pkcs1 key to controllers
config_uuid = self._config_update_hosts(context, personalities)
@ -10248,6 +10335,9 @@ class ConductorManager(service.PeriodicService):
cert_path = constants.DOCKER_REGISTRY_CERT_FILE
pkcs1_key_path = constants.DOCKER_REGISTRY_PKCS1_KEY_FILE
private_bytes = self._get_private_bytes_one(key_list)
public_bytes = self._get_public_bytes_one(key_list)
config_dict = {
'personalities': personalities,
'file_names': [key_path, cert_path, pkcs1_key_path],
@ -10261,17 +10351,17 @@ class ConductorManager(service.PeriodicService):
# copy certificate to shared directory
with os.fdopen(os.open(constants.DOCKER_REGISTRY_CERT_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(public_bytes)
with os.fdopen(os.open(constants.DOCKER_REGISTRY_KEY_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(private_bytes)
with os.fdopen(os.open(constants.DOCKER_REGISTRY_PKCS1_KEY_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(pkcs1_private_bytes)
@ -10306,6 +10396,9 @@ class ConductorManager(service.PeriodicService):
config_uuid = self._config_update_hosts(context, personalities)
key_path = constants.OPENSTACK_CERT_KEY_FILE
cert_path = constants.OPENSTACK_CERT_FILE
private_bytes = self._get_private_bytes_one(key_list)
public_bytes = self._get_public_bytes_one(key_list)
config_dict = {
'personalities': personalities,
'file_names': [key_path, cert_path],
@ -10320,12 +10413,12 @@ class ConductorManager(service.PeriodicService):
os.makedirs(constants.CERT_OPENSTACK_SHARED_DIR)
# copy the certificate to shared directory
with os.fdopen(os.open(constants.OPENSTACK_CERT_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(public_bytes)
with os.fdopen(os.open(constants.OPENSTACK_CERT_KEY_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(private_bytes)
@ -10342,7 +10435,9 @@ class ConductorManager(service.PeriodicService):
elif mode == constants.CERT_MODE_OPENSTACK_CA:
config_uuid = self._config_update_hosts(context, personalities)
file_content = public_bytes
file_content = ''
for key in key_list:
file_content += key.get('public_bytes', '')
config_dict = {
'personalities': personalities,
'file_names': [constants.OPENSTACK_CERT_CA_FILE],
@ -10353,7 +10448,7 @@ class ConductorManager(service.PeriodicService):
# copy the certificate to shared directory
with os.fdopen(os.open(constants.OPENSTACK_CERT_CA_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_DEFAULT),
'wb') as f:
f.write(file_content)
@ -10372,7 +10467,14 @@ class ConductorManager(service.PeriodicService):
LOG.warn(msg)
raise exception.SysinvException(_(msg))
return signature
inv_certs = []
for key in key_list:
inv_cert = {'signature': key.get('signature'),
'not_valid_before': key.get('cert').not_valid_before,
'not_valid_after': key.get('cert').not_valid_after}
inv_certs.append(inv_cert)
return inv_certs
def _config_selfsigned_certificate(self, context):
"""
@ -10392,7 +10494,7 @@ class ConductorManager(service.PeriodicService):
LOG.info("_config_selfsigned_certificate mode=%s file=%s" % (mode, certificate_file))
private_bytes, public_bytes, signature = \
key_list = \
self._extract_keys_from_pem(mode, pem_contents,
serialization.PrivateFormat.PKCS8,
passphrase)
@ -10400,6 +10502,8 @@ class ConductorManager(service.PeriodicService):
personalities = [constants.CONTROLLER]
config_uuid = self._config_update_hosts(context, personalities)
private_bytes = self._get_private_bytes_one(key_list)
public_bytes = self._get_public_bytes_one(key_list)
file_content = private_bytes + public_bytes
config_dict = {
'personalities': personalities,
@ -10412,12 +10516,54 @@ class ConductorManager(service.PeriodicService):
# copy the certificate to shared directory
with os.fdopen(os.open(constants.SSL_PEM_FILE_SHARED,
os.O_CREAT | os.O_WRONLY,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(file_content)
return signature
return key_list[0].get('signature')
def delete_certificate(self, context, mode, signature):
"""Delete a certificate by its mode and signature.
:param context: an admin context.
:param mode: the mode of the certificate
:param signature: the signature of the certificate.
Currently only ssl_ca cert can be deleted.
"""
LOG.info("delete_certificate mode=%s, signature=%s" %
(mode, signature))
if mode == constants.CERT_MODE_SSL_CA:
try:
cert_file = \
os.path.join(constants.SSL_CERT_CA_LIST_SHARED_DIR,
signature)
os.remove(cert_file)
except Exception as e:
msg = "Failed to delete cert file: %s" % str(e)
LOG.warn(msg)
raise exception.SysinvException(_(msg))
self._consolidate_cert_files()
personalities = [constants.CONTROLLER,
constants.WORKER,
constants.STORAGE]
config_uuid = self._config_update_hosts(context, personalities)
config_dict = {
"personalities": personalities,
"classes": ['platform::config::runtime']
}
self._config_apply_runtime_manifest(context,
config_uuid,
config_dict,
force=True)
else:
msg = "delete_certificate unsupported mode=%s" % mode
LOG.error(msg)
raise exception.SysinvException(_(msg))
def get_helm_chart_namespaces(self, context, chart_name):
"""Get supported chart namespaces.

View File

@ -1572,6 +1572,20 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
config_dict=config_dict,
))
def delete_certificate(self, context, mode, signature):
"""Synchronously, have the conductor delete the certificate.
:param context: request context.
:param mode: the mode of the certificate
:param signature: the signature of the certificate.
"""
return self.call(context,
self.make_msg('delete_certificate',
mode=mode,
signature=signature,
))
def get_helm_chart_namespaces(self, context, chart_name):
"""Get supported chart namespaces.

View File

@ -98,6 +98,25 @@ class FunctionalTest(base.TestCase):
print('GOT:%s' % response)
return response
def post_with_files(self, path, params, upload_files, expect_errors=False,
headers=None, method="post", extra_environ=None,
status=None, path_prefix=PATH_PREFIX):
full_path = path_prefix + path
if DEBUG_PRINTING:
print('%s: %s %s' % (method.upper(), full_path, params))
response = getattr(self.app, "%s" % method)(
str(full_path),
params,
upload_files=upload_files,
headers=headers,
status=status,
extra_environ=extra_environ,
expect_errors=expect_errors
)
if DEBUG_PRINTING:
print('GOT:%s' % response)
return response
def put_json(self, *args, **kwargs):
kwargs['method'] = 'put'
return self.post_json(*args, **kwargs)

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDfTCCAmWgAwIBAgIJAKW/fs28rzSQMA0GCSqGSIb3DQEBCwUAMFUxCzAJBgNV
BAYTAkNBMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQg
Q29tcGFueSBMdGQxETAPBgNVBAMMCHRlc3RfY2ExMB4XDTIwMDMxODEzNDcyNloX
DTIzMDEwNjEzNDcyNlowVTELMAkGA1UEBhMCQ0ExFTATBgNVBAcMDERlZmF1bHQg
Q2l0eTEcMBoGA1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDERMA8GA1UEAwwIdGVz
dF9jYTEwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCwIvCInUpgBlwz
+ZPr++dsL5UygKQjUwWkjp4NDxs2vYmCuhwgeLoOYwf9TCAIXD+9iR3rN+lUzqWH
NvJAfeW6q0cBnFf6NSI4gW0JVvJOUY2d0JJwLsQNyirI8ssxZcuoFr7iKb1rxnPM
Suyh1Ji+GeC8CPLnNdWZGvnNtPNOCpdK72l2uWPcBLSvU+/zGEkhw6yzoQhZBMAX
OXC4DIrAfcS7MehYpmLnmLdEn0MKLe9fssjuHSALos8FEszfU2Q5sdOO5HxV3+Ua
JyY4jnxuP5eq/VmzPnfjNJqYOTpX5ZZGr91LPvERaPybMwaGLHV/ZdrkAZntTWoM
F4JI2eb1AgMBAAGjUDBOMB0GA1UdDgQWBBTYewS81nc74bgd82r0OULsaCyvDTAf
BgNVHSMEGDAWgBTYewS81nc74bgd82r0OULsaCyvDTAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4IBAQCpbrpcKCAqgjUHDm9DbG9Y3NUED/gajE8+mJFvcjEC
CJlLISDoUrRpE/vqlVpoj8mPmMaSVd5doX6G6PSnA2hNnjLkts9OQGGbGpXYtkBN
WD09EnrJbeEtofc/eSgTO17ePirTBy2LJ0nTuTUlN2wkAhzOtrYI2fEw4ZqqLBkM
eOpUE3+A92/L4iqhCxyxv1DxvYNDRq7SvtS/TxkXRcsyPDrUR5/sOhn6Rcb0J9I8
pA37oiqiBRUnDoE2+IxRiCyC5/FYQdCIR8Y/2g8xpgY/trYFl5IDJbge+6jaCfMl
5NgkuCPTKCtPtfLKAWUfXV/FM58nyDYKuyreCr7lAnc0
-----END CERTIFICATE-----

View File

@ -0,0 +1,42 @@
-----BEGIN CERTIFICATE-----
MIIDfTCCAmWgAwIBAgIJAKW/fs28rzSQMA0GCSqGSIb3DQEBCwUAMFUxCzAJBgNV
BAYTAkNBMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQg
Q29tcGFueSBMdGQxETAPBgNVBAMMCHRlc3RfY2ExMB4XDTIwMDMxODEzNDcyNloX
DTIzMDEwNjEzNDcyNlowVTELMAkGA1UEBhMCQ0ExFTATBgNVBAcMDERlZmF1bHQg
Q2l0eTEcMBoGA1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDERMA8GA1UEAwwIdGVz
dF9jYTEwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCwIvCInUpgBlwz
+ZPr++dsL5UygKQjUwWkjp4NDxs2vYmCuhwgeLoOYwf9TCAIXD+9iR3rN+lUzqWH
NvJAfeW6q0cBnFf6NSI4gW0JVvJOUY2d0JJwLsQNyirI8ssxZcuoFr7iKb1rxnPM
Suyh1Ji+GeC8CPLnNdWZGvnNtPNOCpdK72l2uWPcBLSvU+/zGEkhw6yzoQhZBMAX
OXC4DIrAfcS7MehYpmLnmLdEn0MKLe9fssjuHSALos8FEszfU2Q5sdOO5HxV3+Ua
JyY4jnxuP5eq/VmzPnfjNJqYOTpX5ZZGr91LPvERaPybMwaGLHV/ZdrkAZntTWoM
F4JI2eb1AgMBAAGjUDBOMB0GA1UdDgQWBBTYewS81nc74bgd82r0OULsaCyvDTAf
BgNVHSMEGDAWgBTYewS81nc74bgd82r0OULsaCyvDTAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4IBAQCpbrpcKCAqgjUHDm9DbG9Y3NUED/gajE8+mJFvcjEC
CJlLISDoUrRpE/vqlVpoj8mPmMaSVd5doX6G6PSnA2hNnjLkts9OQGGbGpXYtkBN
WD09EnrJbeEtofc/eSgTO17ePirTBy2LJ0nTuTUlN2wkAhzOtrYI2fEw4ZqqLBkM
eOpUE3+A92/L4iqhCxyxv1DxvYNDRq7SvtS/TxkXRcsyPDrUR5/sOhn6Rcb0J9I8
pA37oiqiBRUnDoE2+IxRiCyC5/FYQdCIR8Y/2g8xpgY/trYFl5IDJbge+6jaCfMl
5NgkuCPTKCtPtfLKAWUfXV/FM58nyDYKuyreCr7lAnc0
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDfTCCAmWgAwIBAgIJAJKcXHBwS9zSMA0GCSqGSIb3DQEBCwUAMFUxCzAJBgNV
BAYTAkNBMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQg
Q29tcGFueSBMdGQxETAPBgNVBAMMCHRlc3RfY2EyMB4XDTIwMDMxODIxMDQzMVoX
DTIzMDEwNjIxMDQzMVowVTELMAkGA1UEBhMCQ0ExFTATBgNVBAcMDERlZmF1bHQg
Q2l0eTEcMBoGA1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDERMA8GA1UEAwwIdGVz
dF9jYTIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDKJSwLNnNf4djp
zep+cGn35u/AY7X7D/g1bETX7evDq9EQjSntZzjop/r6MxM57dCRRVSe9M8SsqUX
UBtUTe2sg30lVJqMP7WRT8p06ie/e6prHHUjcIFUd4xm8AmWORTXr0FsXr3mI2VJ
lW9ZDuF7tuuBuK67IAdA2T2snUjG+V5k0aW70JLisu2Mnhgn1o4+0UGOIc3UDQ/q
WfMsGN/rTZV/XbVyZJoi9iWKnhwpGLlgA9ouVr9WK1Co/ZMw05lrDjzLmG6niyBW
LUEET0ASnuaAV12EFpEvWIq9xk9wssBgf87WSF0Z/vk1++aKjF6lBfMKEhbz8hof
yFF9lQ07AgMBAAGjUDBOMB0GA1UdDgQWBBQSjySIXiA5Gdjhbl/EhpWyb12ErjAf
BgNVHSMEGDAWgBQSjySIXiA5Gdjhbl/EhpWyb12ErjAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4IBAQBzYgj4QUkspL65Hf1k47l9ptTPGm/XqqKBEPe2I9o6
9v0Ogfy3HwWgyUpN3cww6SN9xIPZAaBv+mbSDa/mw9woewJ8+gUBIM98rzJmfF9x
UUzuEBRuTT/K36QzblcgC+1RbLeLOQJ+TvTfnTFBh8+UF+GgUJAIKsGEOX7Ww5cw
OmfKDu56gNLqdlWT7tXKpc3m0DlADV0HrmeOoUoBRi0PdB5FfSXGnNc8vrEicpZO
Yo6E4ZCB0dRJhAgl4sVFNUw5xK1eXQPjkHNkd26zGNKb0u2G8XOxfbSXTTcU1gqb
Bl93WuquFHeLMPeX7w1+FPvP9kXA1ibBfrfHSyp65dXL
-----END CERTIFICATE-----

View File

@ -8,14 +8,33 @@
#
"""
Tests for the API /certificate_install/ methods.
Tests for the API /certificate_install/delete methods.
"""
import json
import mock
import os
import uuid as UUID
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from six.moves import http_client
from sysinv.api.controllers.v1 import certificate as cert_api
from sysinv.tests.api import base
from sysinv.tests.db import utils as dbutils
class FakeConductorAPI(object):
def __init__(self):
self.config_certificate = self.fake_config_certificate
self.delete_certificate = mock.MagicMock()
self.config_certificate_return = None
def fake_config_certificate(self, context, pem, config_dict):
return self.config_certificate_return
def setup_config_certificate(self, data):
self.config_certificate_return = data
class CertificateTestCase(base.FunctionalTest):
@ -137,3 +156,225 @@ class CertificateTestCase(base.FunctionalTest):
result = cert_api._check_cert_dns_name(cert, 'x.example.com')
self.assertIn("doesn't match", str(result))
class ApiCertificateTestCaseMixin(object):
# API_HEADERS are a generic header passed to most API calls
API_HEADERS = {'User-Agent': 'sysinv-test'}
# API_PREFIX is the prefix for the URL
API_PREFIX = '/certificate'
# RESULT_KEY is the python table key for the list of results
RESULT_KEY = 'certificates'
# COMMON_FIELD is a field that is known to exist for inputs and outputs
COMMON_FIELD = 'certificates'
# expected_api_fields are attributes that should be populated by
# an API query
expected_api_fields = ['uuid']
# hidden_api_fields are attributes that should not be populated by
# an API query
hidden_api_fields = []
def setUp(self):
super(ApiCertificateTestCaseMixin, self).setUp()
self.fake_conductor_api = FakeConductorAPI()
p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI')
self.mock_conductor_api = p.start()
self.mock_conductor_api.return_value = self.fake_conductor_api
self.addCleanup(p.stop)
def get_single_url(self, uuid):
return '%s/%s' % (self.API_PREFIX, uuid)
def _create_db_object(self, obj_id=None):
return dbutils.create_test_certificate(
id=obj_id, certtype='ssl_ca', signature='ssl_ca_123456789')
@staticmethod
def extract_certs_from_pem_file(certfile):
""" extract certificates from a X509 PEM file
"""
marker = b'-----BEGIN CERTIFICATE-----'
with open(certfile, 'rb') as f:
pem_contents = f.read()
start = 0
certs = []
while True:
index = pem_contents.find(marker, start)
if index == -1:
break
cert = x509.load_pem_x509_certificate(pem_contents[index::],
default_backend())
certs.append(cert)
start = start + index + len(marker)
return certs
@staticmethod
def get_cert_signature(mode, cert):
signature = mode + '_' + str(cert.serial_number)
if len(signature) > 255:
signature = signature[:255]
return signature
class ApiCertificatePostTestSuite(ApiCertificateTestCaseMixin,
base.FunctionalTest):
""" Certificate post operations
"""
def setUp(self):
super(ApiCertificatePostTestSuite, self).setUp()
self.create_test_isystem()
def create_test_isystem(self):
return dbutils.create_test_isystem(capabilities={'https_enabled': True})
# Test successful POST operation to install 1 CA certificate
def test_install_one_CA_certificate(self):
mode = 'ssl_ca'
certfile = os.path.join(os.path.dirname(__file__), "data",
'ca-cert-one-cert.pem')
in_certs = self.extract_certs_from_pem_file(certfile)
fake_config_certificate_return = []
for in_cert in in_certs:
fake_config_certificate_return.append(
{'signature': self.get_cert_signature(mode, in_cert),
'not_valid_before': in_cert.not_valid_before,
'not_valid_after': in_cert.not_valid_after})
self.fake_conductor_api.\
setup_config_certificate(fake_config_certificate_return)
data = {'mode': mode}
files = [('file', certfile)]
response = self.post_with_files('%s/%s' % (self.API_PREFIX, 'certificate_install'),
data,
upload_files=files,
headers=self.API_HEADERS,
expect_errors=False)
self.assertEqual(response.status_code, http_client.OK)
resp = json.loads(response.body)
self.assertIn('certificates', resp)
ret_certs = resp.get('certificates')
self.assertEqual(len(in_certs), len(ret_certs))
for ret_cert in ret_certs:
self.assertIn('certtype', ret_cert)
self.assertEqual(ret_cert.get('certtype'), mode)
self.assertIn('signature', ret_cert)
self.assertIn('start_date', ret_cert)
self.assertIn('expiry_date', ret_cert)
found_match = False
for in_cert in in_certs:
ret_cert_start_date = str(ret_cert.get('start_date'))
ret_cert_start_date = ret_cert_start_date.replace('+00:00', '')
ret_cert_expiry_date = str(ret_cert.get('expiry_date'))
ret_cert_expiry_date = \
ret_cert_expiry_date.replace('+00:00', '')
if ret_cert.get('signature') == \
self.get_cert_signature(mode, in_cert) and \
ret_cert_start_date == \
str(in_cert.not_valid_before) and \
ret_cert_expiry_date == \
str(in_cert.not_valid_after):
found_match = True
self.assertTrue(found_match)
# Test successful POST operation to install 2 CA certificate
def test_install_two_CA_certificate(self):
mode = 'ssl_ca'
certfile = os.path.join(os.path.dirname(__file__), "data",
'ca-cert-two-certs.pem')
in_certs = self.extract_certs_from_pem_file(certfile)
fake_config_certificate_return = []
for in_cert in in_certs:
fake_config_certificate_return.append(
{'signature': self.get_cert_signature(mode, in_cert),
'not_valid_before': in_cert.not_valid_before,
'not_valid_after': in_cert.not_valid_after})
self.fake_conductor_api.\
setup_config_certificate(fake_config_certificate_return)
data = {'mode': mode}
files = [('file', certfile)]
response = self.post_with_files('%s/%s' % (self.API_PREFIX,
'certificate_install'),
data,
upload_files=files,
headers=self.API_HEADERS,
expect_errors=False)
self.assertEqual(response.status_code, http_client.OK)
resp = json.loads(response.body)
self.assertIn('certificates', resp)
ret_certs = resp.get('certificates')
self.assertEqual(len(in_certs), len(ret_certs))
for ret_cert in ret_certs:
self.assertIn('certtype', ret_cert)
self.assertEqual(ret_cert.get('certtype'), mode)
self.assertIn('signature', ret_cert)
self.assertIn('start_date', ret_cert)
self.assertIn('expiry_date', ret_cert)
found_match = False
for in_cert in in_certs:
ret_cert_start_date = str(ret_cert.get('start_date'))
ret_cert_start_date = ret_cert_start_date.replace('+00:00', '')
ret_cert_expiry_date = str(ret_cert.get('expiry_date'))
ret_cert_expiry_date = \
ret_cert_expiry_date.replace('+00:00', '')
if ret_cert.get('signature') == \
self.get_cert_signature(mode, in_cert) and \
ret_cert_start_date == \
str(in_cert.not_valid_before) and \
ret_cert_expiry_date == \
str(in_cert.not_valid_after):
found_match = True
self.assertTrue(found_match)
class ApiCertificateDeleteTestSuite(ApiCertificateTestCaseMixin,
base.FunctionalTest):
""" Certificate delete operations
"""
def setUp(self):
super(ApiCertificateDeleteTestSuite, self).setUp()
self.delete_object = self._create_db_object()
# Test successful CA certficate DELETE operation
def test_delete_ca_certificate(self):
uuid = self.delete_object.uuid
certtype = self.delete_object.certtype
signature = self.delete_object.signature
response = self.delete(self.get_single_url(uuid),
headers=self.API_HEADERS,
expect_errors=False)
self.assertEqual(response.status_code, http_client.OK)
self.assertTrue(response.body)
resp = json.loads(response.body)
self.assertIn('uuid', resp)
self.assertEqual(uuid, resp.get('uuid'))
self.assertIn('certtype', resp)
self.assertEqual(certtype, resp.get('certtype'))
self.assertIn('signature', resp)
self.assertEqual(signature, resp.get('signature'))
# Test CA certficate DELETE operation, no certificate found
def test_delete_ca_certificate_not_found(self):
uuid = UUID.uuid4()
response = self.delete(self.get_single_url(uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
self.assertTrue(response.body)
resp = json.loads(response.body)
self.assertTrue(resp.get('error_message'))
fault_string_expected = 'No certificate found for %s' % uuid
self.assertIn(fault_string_expected, str(resp.get('error_message')))

View File

@ -1373,3 +1373,23 @@ def create_test_service_parameter(**kw):
def create_test_oam(**kw):
dbapi = db_api.get_instance()
return dbapi.iextoam_get_one()
# Create test certficate object
def get_test_certificate(**kw):
certificate = {
'id': kw.get('id'),
'uuid': kw.get('uuid'),
'certtype': kw.get('certtype'),
'signature': kw.get('signature')
}
return certificate
def create_test_certificate(**kw):
certificate = get_test_certificate(**kw)
# Let DB generate ID if it isn't specified explicitly
if 'id' not in kw:
del certificate['id']
dbapi = db_api.get_instance()
return dbapi.certificate_create(certificate)