Use tls-certificates-interface library

Use tls-certificates-interface library from
https://charmhub.io/tls-certificates-interface/libraries/tls_certificates

Change-Id: I1d76469f46324d58c603047871acc4a848e52d4f
This commit is contained in:
Hemanth Nakkina
2022-11-23 16:09:54 +05:30
parent 2bd107debe
commit db81085058
4 changed files with 183 additions and 76 deletions

View File

@@ -142,7 +142,7 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
)
handlers.append(self.peers)
if self.can_add_handler("certificates", handlers):
self.certs = sunbeam_rhandlers.CertificatesHandler(
self.certs = sunbeam_rhandlers.TlsCertificatesHandler(
self,
"certificates",
self.configure_charm,

View File

@@ -26,7 +26,6 @@ from urllib.parse import (
urlparse,
)
import cryptography.hazmat.primitives.serialization as serialization
import ops.charm
import ops.framework
from ops.model import (
@@ -726,7 +725,7 @@ class CephClientHandler(RelationHandler):
return ctxt
class CertificatesHandler(RelationHandler):
class TlsCertificatesHandler(RelationHandler):
"""Handler for certificates interface."""
def __init__(
@@ -738,76 +737,173 @@ class CertificatesHandler(RelationHandler):
mandatory: bool = False,
) -> None:
"""Run constructor."""
# Lazy import to ensure this lib is only required if the charm
# has this relation.
import interface_tls_certificates.ca_client as ca_client
self.ca_client = ca_client
self.sans = sans
super().__init__(charm, relation_name, callback_f, mandatory)
def setup_event_handler(self) -> None:
"""Configure event handlers for peer relation."""
logger.debug("Setting up certificates event handler")
certs = self.ca_client.CAClient(
self.charm,
self.relation_name,
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
TLSCertificatesRequiresV1,
)
self.framework.observe(certs.on.ca_available, self._request_certs)
self.certificates = TLSCertificatesRequiresV1(
self.charm, "certificates"
)
self.framework.observe(self.charm.on.install, self._on_install)
self.framework.observe(
certs.on.tls_server_config_ready, self._certs_ready
self.charm.on.certificates_relation_joined,
self._on_certificates_relation_joined,
)
return certs
self.framework.observe(
self.charm.on.certificates_relation_broken,
self._on_certificates_relation_broken,
)
self.framework.observe(
self.certificates.on.certificate_available,
self._on_certificate_available,
)
self.framework.observe(
self.certificates.on.certificate_expiring,
self._on_certificate_expiring,
)
self.framework.observe(
self.certificates.on.certificate_expired,
self._on_certificate_expired,
)
return self.certificates
def _request_certs(self, event: ops.framework.EventBase) -> None:
"""Request Certificates."""
logger.debug(f"Requesting cert for {self.sans}")
self.interface.request_server_certificate(
self.model.unit.name.replace("/", "-"), self.sans
def _on_install(self, event: ops.framework.EventBase) -> None:
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
generate_private_key,
)
peer_relation = self.model.get_relation("peers")
if not peer_relation:
event.defer()
return
private_key = generate_private_key()
peer_relation.data[self.charm.model.unit].update(
{
"private_key": private_key.decode(),
}
)
def _on_certificates_relation_joined(
self, event: ops.framework.EventBase
) -> None:
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
generate_csr,
)
peer_relation = self.model.get_relation("peers")
if not peer_relation:
event.defer()
return
private_key = peer_relation.data[self.charm.model.unit].get(
"private_key"
)
csr = generate_csr(
private_key=private_key.encode(),
subject=self.charm.model.unit.name.replace("/", "-"),
sans=self.sans,
)
self.certificates.request_certificate_creation(
certificate_signing_request=csr
)
def _on_certificates_relation_broken(
self, event: ops.framework.EventBase
) -> None:
if self.mandatory:
self.status.set(BlockedStatus("integration missing"))
def _on_certificate_available(
self, event: ops.framework.EventBase
) -> None:
self.callback_f(event)
def _certs_ready(self, event: ops.framework.EventBase) -> None:
"""Request Certificates."""
self.callback_f(event)
def _on_certificate_expiring(self, event: ops.framework.EventBase) -> None:
logger.warning("Certificate getting expired")
self.status.set(ActiveStatus("Certificates are getting expired soon"))
def _on_certificate_expired(self, event: ops.framework.EventBase) -> None:
logger.warning("Certificate expired")
self.status.set(BlockedStatus("Certificates expired"))
def _get_csr_from_relation_unit_data(self) -> Optional[str]:
certificate_relations = list(self.model.relations[self.relation_name])
if not certificate_relations:
return None
# unit_data format:
# {"certificate_signing_requests": "['certificate_signing_request': 'CSRTEXT']"}
unit_data = certificate_relations[0].data[self.charm.model.unit]
csr = json.loads(unit_data.get("certificate_signing_requests", "[]"))
if not csr:
return None
csr = csr[0].get("certificate_signing_request", None)
return csr
def _get_cert_from_relation_data(self, csr: str) -> dict:
certificate_relations = list(self.model.relations[self.relation_name])
if not certificate_relations:
return {}
# app data format:
# {"certificates": "['certificate_signing_request': 'CSR',
# 'certificate': 'CERT', 'ca': 'CA', 'chain': 'CHAIN']"}
certs = certificate_relations[0].data[certificate_relations[0].app]
certs = json.loads(certs.get("certificates", "[]"))
for certificate in certs:
csr_from_app = certificate.get("certificate_signing_request", "")
if csr.strip() == csr_from_app.strip():
return {
"cert": certificate.get("certificate", None),
"ca": certificate.get("ca", None),
"chain": certificate.get("chain", []),
}
return {}
@property
def ready(self) -> bool:
"""Whether handler ready for use."""
return self.interface.is_server_cert_ready
csr_from_unit = self._get_csr_from_relation_unit_data()
if not csr_from_unit:
return False
certs = self._get_cert_from_relation_data(csr_from_unit)
return True if certs else False
def context(self) -> dict:
"""Certificates context."""
key = self.interface.server_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
cert = self.interface.server_certificate.public_bytes(
encoding=serialization.Encoding.PEM
)
try:
root_ca_chain = self.interface.root_ca_chain.public_bytes(
encoding=serialization.Encoding.PEM
)
except self.ca_client.CAClientError:
# A root ca chain is not always available. If configured to just
# use vault with self-signed certificates, you will not get a ca
# chain. Instead, you will get a CAClientError being raised. For
# now, use a bytes() object for the root_ca_chain as it shouldn't
# cause problems and if a ca_cert_chain comes later, then it will
# get updated.
root_ca_chain = bytes()
ca_cert = (
self.interface.ca_certificate.public_bytes(
encoding=serialization.Encoding.PEM
)
+ root_ca_chain
csr_from_unit = self._get_csr_from_relation_unit_data()
if not csr_from_unit:
return {}
certs = self._get_cert_from_relation_data(csr_from_unit)
cert = certs["cert"]
ca_cert = certs["ca"] + "\n" + "\n".join(certs["chain"])
peer_relation = self.model.get_relation("peers")
key = peer_relation.data[self.charm.model.unit].get(
"private_key", None
)
ctxt = {
"key": key.decode(),
"cert": cert.decode(),
"ca_cert": ca_cert.decode(),
"key": key,
"cert": cert,
"ca_cert": ca_cert,
}
return ctxt

View File

@@ -142,6 +142,24 @@ ULTaKTN2gp7E2BuxENtAyplrvLiXXYH3CqT528JgMdMm0al6X3MXo9WqbOg/KNpa
4JSyyuZ42yGmYlhMCimlk3kVnDxb8PJLWOFnx6f9/i0RWUqnY0nU
-----END RSA PRIVATE KEY-----"""
TEST_CSR = """-----BEGIN CERTIFICATE REQUEST-----
MIICxTCCAa0CAQAwRzEWMBQGA1UEAwwNb3ZuLWNlbnRyYWwtMDEtMCsGA1UELQwk
ZTFhZjIxMmEtNTUxOC00MjkyLWIxZTktZGM3NThlZTZkMzEwMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzrJfdNfHKyzu9dAnoEi2DE9nTQnkUjGVIMXg
8oCy5NgIm2ORoeXrdrr2ODIUepxX7M40cWmvoFiABK6DGgpP3wh6XosWftIc8hrX
/KaHtL4iru+dKJF9d5TNe+vWoFY1jh3k/+c+F59UhdoRbw4QcSTgLBvsb/XC8pdD
gc96GWgzyA5exZN9xXg8dvHCMKLLzCkAgHkMlkSPB6Ghi9bUeeXIYRvU8D+EMh+R
hKaSsvOxRyISgkvE0cGQ86NIuXkNvvvr8bFYNLMxBNkjZrqlZyqhEsq0eZAAm5iu
Fi33z3uBaA5d7V7wbYAxWuFlckdGTHql3vO/W2X3PHbT/TEBxQIDAQABoDkwNwYJ
KoZIhvcNAQkOMSowKDAmBgNVHREEHzAdggwxMC4xLjEwNy4yMTSCDTEwLjE1Mi4x
ODMuMTkwDQYJKoZIhvcNAQELBQADggEBADJSto8T6XiMdjMKekhS6SsQKNyijVJ0
cJr7x1u8FLEbCWlLRO9kMroz4i4iSu5xYcewNsRioiN4A56FuoOE8qCjzAHczR/8
Anah4rYJFt7wCu+RxfHEvBmSYgV0Rbq/KwjYnclCpTu/m5yrUmsI092+2AaOB/nA
c0Npr5oZZPeWL4S3+c02IxCeH1EwxIQtfprA/VgCWpEU25ImQb/c14KF5EQEHhv6
A5qVqdCCg4LlNrqiFYyVtHqDco+voq4W95KkkUYe20o16qOTwpR72qn75DagO/8I
R3iMBPwkhi4+igbliU/EMLltTj8pMilUhc1Ewuji4QZhsM2qxgZkcBk=
-----END CERTIFICATE REQUEST-----"""
class ContainerCalls:
"""Object to log container calls."""
@@ -518,16 +536,14 @@ def add_complete_ceph_relation(harness: Harness) -> None:
def add_certificates_relation_certs(harness: Harness, rel_id: str) -> None:
"""Add cert data to certificates relation."""
client_unit = harness.charm.unit.name.replace("/", "_")
cert = {
"certificate": TEST_SERVER_CERT,
"certificate_signing_request": TEST_CSR,
"ca": TEST_CA,
"chain": TEST_CHAIN,
}
harness.update_relation_data(
rel_id,
"vault/0",
{
f"{client_unit}.server.cert": TEST_SERVER_CERT,
f"{client_unit}.server.key": TEST_SERVER_KEY,
"chain": TEST_CHAIN,
"ca": TEST_CA,
},
rel_id, "vault", {"certificates": json.dumps([cert])}
)
@@ -535,8 +551,14 @@ def add_base_certificates_relation(harness: Harness) -> str:
"""Add certificates relation."""
rel_id = harness.add_relation("certificates", "vault")
harness.add_relation_unit(rel_id, "vault/0")
csr = {"certificate_signing_request": TEST_CSR}
harness.update_relation_data(
rel_id, "vault/0", {"ingress-address": "10.0.0.34"}
rel_id,
harness.charm.unit.name,
{
"ingress-address": "10.0.0.34",
"certificate_signing_requests": json.dumps([csr]),
},
)
return rel_id

View File

@@ -1,15 +1,4 @@
flake8-annotations
flake8-docstrings
coverage>=3.6
mock>=1.2
flake8
pyflakes
stestr>=2.2.0
requests>=2.18.4
psutil
# oslo.i18n dropped py35 support
oslo.i18n<4.0.0
git+https://github.com/openstack-charmers/zaza.git#egg=zaza
git+https://github.com/openstack-charmers/zaza-openstack-tests.git#egg=zaza.openstack
pytz # workaround for 14.04 pip/tox
pyudev # for ceph-* charm unit tests (not mocked?)
coverage
mock
stestr
requests