Add new mode openldap_ca to certificate-install API
This change added a new mode "openldap_ca" to certificate-install sysinv API. With this mode, an openldap CA certificate can be installed as an k8s secret. Note that using this new mode to install openldap CA cert is only supported in the API, it is not supported by the certificate-install CLI, (will return invalid mode), since openldap certs are managed internally. This commit also include an unit test for the new mode. Test Plan: PASS: Call the API to install openldap CA certificate as k8s secret. PASS: While the secret exists, call the API to install the CA cert again, verify the secret is updated. PASS: Run "system certificate-install -m openldap_ca <cert file>", verify "Warning: Invalid mode: openldap_ca" is returned. PASS: DC multi-nodes subcloud upgrade. Story: 2009834 Task: 46893 Signed-off-by: Andy Ning <andy.ning@windriver.com> Change-Id: I47eba6353c8cb64b65b291acca9b9c96a7c0e466
This commit is contained in:
parent
1cacd6010a
commit
e6d29d951b
@ -83,8 +83,8 @@ def do_certificate_install(cc, args):
|
|||||||
data = {'passphrase': args.passphrase,
|
data = {'passphrase': args.passphrase,
|
||||||
'mode': args.mode}
|
'mode': args.mode}
|
||||||
|
|
||||||
if data['mode'] == 'openldap':
|
if data['mode'] in ['openldap', 'openldap_ca']:
|
||||||
raise exc.CommandError('Warning: Invalid mode: openldap')
|
raise exc.CommandError('Warning: Invalid mode: %s' % data['mode'])
|
||||||
|
|
||||||
has_private_key = False
|
has_private_key = False
|
||||||
try:
|
try:
|
||||||
|
@ -1553,12 +1553,14 @@ CERT_MODE_DOCKER_REGISTRY = 'docker_registry'
|
|||||||
CERT_MODE_OPENSTACK = 'openstack'
|
CERT_MODE_OPENSTACK = 'openstack'
|
||||||
CERT_MODE_OPENSTACK_CA = 'openstack_ca'
|
CERT_MODE_OPENSTACK_CA = 'openstack_ca'
|
||||||
CERT_MODE_OPENLDAP = 'openldap'
|
CERT_MODE_OPENLDAP = 'openldap'
|
||||||
|
CERT_MODE_OPENLDAP_CA = 'openldap_ca'
|
||||||
CERT_MODES_SUPPORTED = [CERT_MODE_SSL,
|
CERT_MODES_SUPPORTED = [CERT_MODE_SSL,
|
||||||
CERT_MODE_SSL_CA,
|
CERT_MODE_SSL_CA,
|
||||||
CERT_MODE_DOCKER_REGISTRY,
|
CERT_MODE_DOCKER_REGISTRY,
|
||||||
CERT_MODE_OPENSTACK,
|
CERT_MODE_OPENSTACK,
|
||||||
CERT_MODE_OPENSTACK_CA,
|
CERT_MODE_OPENSTACK_CA,
|
||||||
CERT_MODE_OPENLDAP,
|
CERT_MODE_OPENLDAP,
|
||||||
|
CERT_MODE_OPENLDAP_CA,
|
||||||
]
|
]
|
||||||
CERT_MODES_SUPPORTED_CERT_MANAGER = [CERT_MODE_SSL,
|
CERT_MODES_SUPPORTED_CERT_MANAGER = [CERT_MODE_SSL,
|
||||||
CERT_MODE_DOCKER_REGISTRY,
|
CERT_MODE_DOCKER_REGISTRY,
|
||||||
|
@ -13213,6 +13213,41 @@ class ConductorManager(service.PeriodicService):
|
|||||||
config_uuid,
|
config_uuid,
|
||||||
config_dict,
|
config_dict,
|
||||||
force=True)
|
force=True)
|
||||||
|
# Special mode for openldap CA certificate.
|
||||||
|
# This CA certificate will be stored in k8s as an opaque secret
|
||||||
|
elif mode == constants.CERT_MODE_OPENLDAP_CA:
|
||||||
|
kube_operator = kubernetes.KubeOperator()
|
||||||
|
public_bytes = self._get_public_bytes(cert_list)
|
||||||
|
cert_secret = base64.encode_as_text(public_bytes)
|
||||||
|
|
||||||
|
body = {
|
||||||
|
'apiVersion': 'v1',
|
||||||
|
'type': 'Opaque',
|
||||||
|
'kind': 'Secret',
|
||||||
|
'metadata': {
|
||||||
|
'name': constants.OPENLDAP_CA_CERT_SECRET_NAME,
|
||||||
|
'namespace': constants.CERT_NAMESPACE_PLATFORM_CA_CERTS
|
||||||
|
},
|
||||||
|
'data': {
|
||||||
|
'ca.crt': cert_secret,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
secret = kube_operator.kube_get_secret(
|
||||||
|
constants.OPENLDAP_CA_CERT_SECRET_NAME,
|
||||||
|
constants.CERT_NAMESPACE_PLATFORM_CA_CERTS)
|
||||||
|
if secret is not None:
|
||||||
|
kube_operator.kube_delete_secret(
|
||||||
|
constants.OPENLDAP_CA_CERT_SECRET_NAME,
|
||||||
|
constants.CERT_NAMESPACE_PLATFORM_CA_CERTS)
|
||||||
|
kube_operator.kube_create_secret(
|
||||||
|
constants.CERT_NAMESPACE_PLATFORM_CA_CERTS, body)
|
||||||
|
except Exception as e:
|
||||||
|
msg = "Failed to store openldap CA in k8s secret: %s" % str(e)
|
||||||
|
LOG.error(msg)
|
||||||
|
return msg
|
||||||
|
|
||||||
elif mode == constants.CERT_MODE_DOCKER_REGISTRY:
|
elif mode == constants.CERT_MODE_DOCKER_REGISTRY:
|
||||||
LOG.info("Docker registry certificate install")
|
LOG.info("Docker registry certificate install")
|
||||||
# docker registry requires a PKCS1 key for the token server
|
# docker registry requires a PKCS1 key for the token server
|
||||||
|
@ -625,6 +625,59 @@ class ApiCertificatePostTestSuite(ApiCertificateTestCaseMixin,
|
|||||||
resp = json.loads(response.body)
|
resp = json.loads(response.body)
|
||||||
self.assertIn('certificates', resp)
|
self.assertIn('certificates', resp)
|
||||||
|
|
||||||
|
# Test install an openldap_ca certificate
|
||||||
|
def test_install_openldap_ca_certificate(self):
|
||||||
|
mode = 'openldap_ca'
|
||||||
|
certfile = os.path.join(os.path.dirname(__file__), "data",
|
||||||
|
'ca-cert-one-cert.pem')
|
||||||
|
|
||||||
|
in_certs = self.extract_certs_from_pem_file(certfile)
|
||||||
|
fake_config_certificate_return = []
|
||||||
|
for index, in_cert in enumerate(in_certs):
|
||||||
|
fake_config_certificate_return.append(
|
||||||
|
{'signature': self.get_cert_signature(mode, in_cert),
|
||||||
|
'not_valid_before': in_cert.not_valid_before,
|
||||||
|
'not_valid_after': in_cert.not_valid_after})
|
||||||
|
self.fake_conductor_api.\
|
||||||
|
setup_config_certificate(fake_config_certificate_return)
|
||||||
|
|
||||||
|
data = {'mode': mode}
|
||||||
|
files = [('file', certfile)]
|
||||||
|
response = self.post_with_files('%s/%s' % (self.API_PREFIX, 'certificate_install'),
|
||||||
|
data,
|
||||||
|
upload_files=files,
|
||||||
|
headers=self.API_HEADERS,
|
||||||
|
expect_errors=False)
|
||||||
|
|
||||||
|
self.assertEqual(response.status_code, http_client.OK)
|
||||||
|
resp = json.loads(response.body)
|
||||||
|
self.assertIn('certificates', resp)
|
||||||
|
ret_certs = resp.get('certificates')
|
||||||
|
|
||||||
|
self.assertEqual(len(ret_certs), 1)
|
||||||
|
ret_cert = ret_certs[0]
|
||||||
|
in_cert = in_certs[0]
|
||||||
|
|
||||||
|
self.assertIn('certtype', ret_cert)
|
||||||
|
self.assertEqual(ret_cert.get('certtype'), mode)
|
||||||
|
self.assertIn('signature', ret_cert)
|
||||||
|
self.assertIn('start_date', ret_cert)
|
||||||
|
self.assertIn('expiry_date', ret_cert)
|
||||||
|
|
||||||
|
ret_cert_start_date = str(ret_cert.get('start_date'))
|
||||||
|
ret_cert_start_date = ret_cert_start_date.replace('+00:00', '')
|
||||||
|
ret_cert_expiry_date = str(ret_cert.get('expiry_date'))
|
||||||
|
ret_cert_expiry_date = ret_cert_expiry_date.replace('+00:00', '')
|
||||||
|
found_match = False
|
||||||
|
if ret_cert.get('signature') == \
|
||||||
|
self.get_cert_signature(mode, in_cert) and \
|
||||||
|
ret_cert_start_date == \
|
||||||
|
str(in_cert.not_valid_before) and \
|
||||||
|
ret_cert_expiry_date == \
|
||||||
|
str(in_cert.not_valid_after):
|
||||||
|
found_match = True
|
||||||
|
self.assertTrue(found_match)
|
||||||
|
|
||||||
|
|
||||||
class ApiCertificateDeleteTestSuite(ApiCertificateTestCaseMixin,
|
class ApiCertificateDeleteTestSuite(ApiCertificateTestCaseMixin,
|
||||||
base.FunctionalTest):
|
base.FunctionalTest):
|
||||||
|
Loading…
Reference in New Issue
Block a user