# Copyright 2021 Canonical Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Base classes for defining a charm using the Operator framework.""" import hashlib import json import logging import secrets import string from typing import ( Callable, Dict, FrozenSet, List, Optional, Tuple, Union, ) from urllib.parse import ( urlparse, ) import ops.charm import ops.framework from ops.model import ( ActiveStatus, BlockedStatus, SecretNotFoundError, UnknownStatus, WaitingStatus, ) import ops_sunbeam.compound_status as compound_status import ops_sunbeam.interfaces as sunbeam_interfaces logger = logging.getLogger(__name__) ERASURE_CODED = "erasure-coded" REPLICATED = "replicated" class RelationHandler(ops.framework.Object): """Base handler class for relations. A relation handler is used to manage a charms interaction with a relation interface. This includes: 1) Registering handlers to process events from the interface. The last step of these handlers is to make a callback to a specified method within the charm `callback_f` 2) Expose a `ready` property so the charm can check a relations readiness 3) A `context` method which returns a dict which pulls together data received and sent on an interface. """ def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, mandatory: bool = False, ) -> None: """Run constructor.""" super().__init__( charm, # Ensure we can have multiple instances of a relation handler, # but only one per relation. key=type(self).__name__ + "_" + relation_name, ) self.charm = charm self.relation_name = relation_name self.callback_f = callback_f self.interface = self.setup_event_handler() self.mandatory = mandatory self.status = compound_status.Status(self.relation_name) self.charm.status_pool.add(self.status) self.set_status(self.status) def set_status(self, status: compound_status.Status) -> None: """Set the status based on current state. Will be called once, during construction, after everything else is initialised. Override this in a child class if custom logic should be used. """ if not self.model.relations.get(self.relation_name): if self.mandatory: status.set(BlockedStatus("integration missing")) else: status.set(UnknownStatus()) elif self.ready: status.set(ActiveStatus("")) else: status.set(WaitingStatus("integration incomplete")) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for the relation. This method must be overridden in concrete class implementations. """ raise NotImplementedError def get_interface(self) -> Tuple[ops.framework.Object, str]: """Return the interface that this handler encapsulates. This is a combination of the interface object and the name of the relation its wired into. """ return self.interface, self.relation_name def interface_properties(self) -> dict: """Extract properties of the interface.""" property_names = [ p for p in dir(self.interface) if isinstance(getattr(type(self.interface), p, None), property) ] properties = { p: getattr(self.interface, p) for p in property_names if not p.startswith("_") and p not in ["model"] } return properties @property def ready(self) -> bool: """Determine with the relation is ready for use.""" raise NotImplementedError def context(self) -> dict: """Pull together context for rendering templates.""" return self.interface_properties() def update_relation_data(self): """Update relation outside of relation context.""" raise NotImplementedError class IngressHandler(RelationHandler): """Base class to handle Ingress relations.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, service_name: str, default_ingress_port: int, callback_f: Callable, mandatory: bool = False, ) -> None: """Run constructor.""" self.default_ingress_port = default_ingress_port self.service_name = service_name super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for an Ingress relation.""" logger.debug("Setting up ingress event handler") from charms.traefik_k8s.v2.ingress import ( IngressPerAppRequirer, ) interface = IngressPerAppRequirer( self.charm, self.relation_name, port=self.default_ingress_port, ) self.framework.observe(interface.on.ready, self._on_ingress_ready) self.framework.observe(interface.on.revoked, self._on_ingress_revoked) return interface def _on_ingress_ready(self, event) -> None: # noqa: ANN001 """Handle ingress relation changed events. `event` is an instance of `charms.traefik_k8s.v2.ingress.IngressPerAppReadyEvent`. """ url = self.url logger.debug(f"Received url: {url}") if not url: return self.callback_f(event) def _on_ingress_revoked(self, event) -> None: # noqa: ANN001 """Handle ingress relation revoked event. `event` is an instance of `charms.traefik_k8s.v2.ingress.IngressPerAppRevokedEvent` """ # Callback call to update keystone endpoints self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) @property def ready(self) -> bool: """Whether the handler is ready for use.""" from charms.traefik_k8s.v2.ingress import ( DataValidationError, ) try: url = self.interface.url except DataValidationError: logger.debug( "Failed to fetch relation's url," " the root cause might a change to V2 Ingress, " "in this case, this error should go away.", exc_info=True, ) return False if url: return True return False @property def url(self) -> Optional[str]: """Return the URL used by the remote ingress service.""" if not self.ready: return None return self.interface.url def context(self) -> dict: """Context containing ingress data.""" parse_result = urlparse(self.url) return { "ingress_path": parse_result.path, } class IngressInternalHandler(IngressHandler): """Handler for Ingress relations on internal interface.""" class IngressPublicHandler(IngressHandler): """Handler for Ingress relations on public interface.""" class DBHandler(RelationHandler): """Handler for DB relations.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, database: str, mandatory: bool = False, ) -> None: """Run constructor.""" # a database name as requested by the charm. self.database_name = database super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for a MySQL relation.""" logger.debug("Setting up DB event handler") # Import here to avoid import errors if ops_sunbeam is being used # with a charm that doesn't want a DBHandler # and doesn't install this database_requires library. from charms.data_platform_libs.v0.database_requires import ( DatabaseRequires, ) # Alias is required to events for this db # from trigger handlers for other dbs. # It also must be a valid python identifier. alias = self.relation_name.replace("-", "_") db = DatabaseRequires( self.charm, self.relation_name, self.database_name, relations_aliases=[alias], ) self.framework.observe( # db.on[f"{alias}_database_created"], # this doesn't work because: # RuntimeError: Framework.observe requires a BoundEvent as # second parameter, got None: """Handle database change events.""" if not (event.username or event.password or event.endpoints): return data = event.relation.data[event.relation.app] display_data = {k: v for k, v in data.items()} if "password" in display_data: display_data["password"] = "REDACTED" logger.info(f"Received data: {display_data}") self.callback_f(event) def _on_database_relation_broken( self, event: ops.framework.EventBase ) -> None: """Handle database gone away event.""" if self.mandatory: self.status.set(BlockedStatus("integration missing")) def get_relation_data(self) -> dict: """Load the data from the relation for consumption in the handler.""" if len(self.interface.relations) > 0: return self.interface.relations[0].data[ self.interface.relations[0].app ] return {} @property def ready(self) -> bool: """Whether the handler is ready for use.""" data = self.get_relation_data() return bool( data.get("endpoints") and data.get("username") and data.get("password") ) def context(self) -> dict: """Context containing database connection data.""" if not self.ready: return {} data = self.get_relation_data() database_name = self.database_name database_host = data["endpoints"] database_user = data["username"] database_password = data["password"] database_type = "mysql+pymysql" has_tls = data.get("tls") tls_ca = data.get("tls-ca") connection = ( f"{database_type}://{database_user}:{database_password}" f"@{database_host}/{database_name}" ) if has_tls: connection = connection + f"?ssl_ca={tls_ca}" # This context ends up namespaced under the relation name # (normalised to fit a python identifier - s/-/_/), # and added to the context for jinja templates. # eg. if this DBHandler is added with relation name api-database, # the database connection string can be obtained in templates with # `api_database.connection`. return { "database": database_name, "database_host": database_host, "database_password": database_password, "database_user": database_user, "database_type": database_type, "connection": connection, } class RabbitMQHandler(RelationHandler): """Handler for managing a rabbitmq relation.""" DEFAULT_PORT = "5672" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, username: str, vhost: int, mandatory: bool = False, ) -> None: """Run constructor.""" self.username = username self.vhost = vhost super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for an AMQP relation.""" logger.debug("Setting up AMQP event handler") # Lazy import to ensure this lib is only required if the charm # has this relation. import charms.rabbitmq_k8s.v0.rabbitmq as sunbeam_rabbitmq amqp = sunbeam_rabbitmq.RabbitMQRequires( self.charm, self.relation_name, self.username, self.vhost ) self.framework.observe(amqp.on.ready, self._on_amqp_ready) self.framework.observe(amqp.on.goneaway, self._on_amqp_goneaway) return amqp def _on_amqp_ready(self, event: ops.framework.EventBase) -> None: """Handle AMQP change events.""" # Ready is only emitted when the interface considers # that the relation is complete (indicated by a password) self.callback_f(event) def _on_amqp_goneaway(self, event: ops.framework.EventBase) -> None: """Handle AMQP change events.""" # Goneaway is only emitted when the interface considers # that the relation is broken self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) @property def ready(self) -> bool: """Whether handler is ready for use.""" try: return bool(self.interface.password) and bool( self.interface.hostnames ) except (AttributeError, KeyError): return False def context(self) -> dict: """Context containing AMQP connection data.""" try: hosts = self.interface.hostnames except (AttributeError, KeyError): return {} if not hosts: return {} ctxt = super().context() ctxt["hostnames"] = list(set(ctxt["hostnames"])) ctxt["hosts"] = ",".join(ctxt["hostnames"]) ctxt["port"] = ctxt.get("ssl_port") or self.DEFAULT_PORT transport_url_hosts = ",".join( [ "{}:{}@{}:{}".format( self.username, ctxt["password"], host_, # TODO deal with IPv6 ctxt["port"], ) for host_ in ctxt["hostnames"] ] ) transport_url = "rabbit://{}/{}".format( transport_url_hosts, self.vhost ) ctxt["transport_url"] = transport_url return ctxt class AMQPHandler(RabbitMQHandler): """Backwards compatibility class for older library consumers.""" pass class IdentityServiceRequiresHandler(RelationHandler): """Handler for managing a identity-service relation.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, service_endpoints: dict, region: str, mandatory: bool = False, ) -> None: """Run constructor.""" self.service_endpoints = service_endpoints self.region = region super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for an Identity service relation.""" logger.debug("Setting up Identity Service event handler") import charms.keystone_k8s.v1.identity_service as sun_id id_svc = sun_id.IdentityServiceRequires( self.charm, self.relation_name, self.service_endpoints, self.region ) self.framework.observe( id_svc.on.ready, self._on_identity_service_ready ) self.framework.observe( id_svc.on.goneaway, self._on_identity_service_goneaway ) return id_svc def _on_identity_service_ready( self, event: ops.framework.EventBase ) -> None: """Handle AMQP change events.""" # Ready is only emitted when the interface considers # that the relation is complete (indicated by a password) self.callback_f(event) def _on_identity_service_goneaway( self, event: ops.framework.EventBase ) -> None: """Handle identity service gone away event.""" # Goneaway is only emitted when the interface considers # that the relation is broken or departed. self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) def update_service_endpoints(self, service_endpoints: dict) -> None: """Update service endpoints on the relation.""" self.service_endpoints = service_endpoints self.interface.register_services(service_endpoints, self.region) @property def ready(self) -> bool: """Whether handler is ready for use.""" try: return bool(self.interface.service_password) except (AttributeError, KeyError): return False class BasePeerHandler(RelationHandler): """Base handler for managing a peers relation.""" LEADER_READY_KEY = "leader_ready" def setup_event_handler(self) -> None: """Configure event handlers for peer relation.""" logger.debug("Setting up peer event handler") # Lazy import to ensure this lib is only required if the charm # has this relation. peer_int = sunbeam_interfaces.OperatorPeers( self.charm, self.relation_name, ) self.framework.observe( peer_int.on.peers_relation_joined, self._on_peers_relation_joined ) self.framework.observe( peer_int.on.peers_data_changed, self._on_peers_data_changed ) return peer_int def _on_peers_relation_joined( self, event: ops.framework.EventBase ) -> None: """Process peer joined event.""" self.callback_f(event) def _on_peers_data_changed(self, event: ops.framework.EventBase) -> None: """Process peer data changed event.""" self.callback_f(event) @property def ready(self) -> bool: """Whether the handler is complete.""" return bool(self.interface.peers_rel) def context(self) -> dict: """Return all app data set on the peer relation.""" try: _db = { k.replace("-", "_"): v for k, v in self.interface.get_all_app_data().items() } return _db except (AttributeError, KeyError): return {} def set_app_data(self, settings: dict) -> None: """Store data in peer app db.""" self.interface.set_app_data(settings) def get_app_data(self, key: str) -> Optional[str]: """Retrieve data from the peer relation.""" return self.interface.get_app_data(key) def leader_get(self, key: str) -> str: """Retrieve data from the peer relation.""" return self.peers.get_app_data(key) def leader_set(self, settings: dict, **kwargs) -> None: """Store data in peer app db.""" settings = settings or {} settings.update(kwargs) self.set_app_data(settings) def set_leader_ready(self) -> None: """Tell peers the leader is ready.""" self.set_app_data({self.LEADER_READY_KEY: json.dumps(True)}) def is_leader_ready(self) -> bool: """Whether the leader has announced it is ready.""" ready = self.get_app_data(self.LEADER_READY_KEY) if ready is None: return False else: return json.loads(ready) def set_unit_data(self, settings: Dict[str, str]) -> None: """Publish settings on the peer unit data bag.""" self.interface.set_unit_data(settings) def get_all_unit_values( self, key: str, include_local_unit: bool = False ) -> List[str]: """Retrieve value for key from all related units. :param include_local_unit: Include value set by local unit """ return self.interface.get_all_unit_values( key, include_local_unit=include_local_unit ) class CephClientHandler(RelationHandler): """Handler for ceph-client interface.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, allow_ec_overwrites: bool = True, app_name: str = None, mandatory: bool = False, ) -> None: """Run constructor.""" self.allow_ec_overwrites = allow_ec_overwrites self.app_name = app_name super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for an ceph-client interface.""" logger.debug("Setting up ceph-client event handler") # Lazy import to ensure this lib is only required if the charm # has this relation. import interface_ceph_client.ceph_client as ceph_client ceph = ceph_client.CephClientRequires( self.charm, self.relation_name, ) self.framework.observe( ceph.on.pools_available, self._on_pools_available ) self.framework.observe(ceph.on.broker_available, self.request_pools) return ceph def _on_pools_available(self, event: ops.framework.EventBase) -> None: """Handle pools available event.""" # Ready is only emitted when the interface considers # that the relation is complete self.callback_f(event) def request_pools(self, event: ops.framework.EventBase) -> None: """Request Ceph pool creation when interface broker is ready. The default handler will automatically request erasure-coded or replicated pools depending on the configuration of the charm from which the handler is being used. To provide charm specific behaviour, subclass the default handler and use the required broker methods on the underlying interface object. """ config = self.model.config.get data_pool_name = ( config("rbd-pool-name") or config("rbd-pool") or self.charm.app.name ) metadata_pool_name = ( config("ec-rbd-metadata-pool") or f"{self.charm.app.name}-metadata" ) weight = config("ceph-pool-weight") replicas = config("ceph-osd-replication-count") # TODO: add bluestore compression options if config("pool-type") == ERASURE_CODED: # General EC plugin config plugin = config("ec-profile-plugin") technique = config("ec-profile-technique") device_class = config("ec-profile-device-class") bdm_k = config("ec-profile-k") bdm_m = config("ec-profile-m") # LRC plugin config bdm_l = config("ec-profile-locality") crush_locality = config("ec-profile-crush-locality") # SHEC plugin config bdm_c = config("ec-profile-durability-estimator") # CLAY plugin config bdm_d = config("ec-profile-helper-chunks") scalar_mds = config("ec-profile-scalar-mds") # Profile name profile_name = ( config("ec-profile-name") or f"{self.charm.app.name}-profile" ) # Metadata sizing is approximately 1% of overall data weight # but is in effect driven by the number of rbd's rather than # their size - so it can be very lightweight. metadata_weight = weight * 0.01 # Resize data pool weight to accommodate metadata weight weight = weight - metadata_weight # Create erasure profile self.interface.create_erasure_profile( name=profile_name, k=bdm_k, m=bdm_m, lrc_locality=bdm_l, lrc_crush_locality=crush_locality, shec_durability_estimator=bdm_c, clay_helper_chunks=bdm_d, clay_scalar_mds=scalar_mds, device_class=device_class, erasure_type=plugin, erasure_technique=technique, ) # Create EC data pool self.interface.create_erasure_pool( name=data_pool_name, erasure_profile=profile_name, weight=weight, allow_ec_overwrites=self.allow_ec_overwrites, app_name=self.app_name, ) # Create EC metadata pool self.interface.create_replicated_pool( name=metadata_pool_name, replicas=replicas, weight=metadata_weight, app_name=self.app_name, ) else: self.interface.create_replicated_pool( name=data_pool_name, replicas=replicas, weight=weight, app_name=self.app_name, ) @property def ready(self) -> bool: """Whether handler ready for use.""" return self.interface.pools_available @property def key(self) -> str: """Retrieve the cephx key provided for the application.""" return self.interface.get_relation_data().get("key") def context(self) -> dict: """Context containing Ceph connection data.""" ctxt = super().context() data = self.interface.get_relation_data() ctxt["mon_hosts"] = ",".join(sorted(data.get("mon_hosts"))) ctxt["auth"] = data.get("auth") ctxt["key"] = data.get("key") ctxt["rbd_features"] = None return ctxt class TlsCertificatesHandler(RelationHandler): """Handler for certificates interface.""" class PeerKeyStore: """Store private key sercret id in peer storage relation.""" def __init__(self, relation, unit): self.relation = relation self.unit = unit def store_ready(self) -> bool: """Check if store is ready.""" return bool(self.relation) def get_private_key(self) -> str: """Return private key.""" try: key = self.relation.data[self.unit].get("private_key") except AttributeError: key = None return key def set_private_key(self, value: str): """Update private key.""" self.relation.data[self.unit]["private_key"] = value class LocalDBKeyStore: """Store private key sercret id in local unit db. This is a fallback for when the peer relation is not present. """ def __init__(self, state_db): self.state_db = state_db try: self.state_db.private_key except AttributeError: self.state_db.private_key = None def store_ready(self) -> bool: """Check if store is ready.""" return True def get_private_key(self) -> str: """Return private key.""" return self.state_db.private_key def set_private_key(self, value: str): """Update private key.""" self.state_db.private_key = value def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, sans_dns: List[str] = None, sans_ips: List[str] = None, mandatory: bool = False, ) -> None: """Run constructor.""" self._private_key = None self.sans_dns = sans_dns self.sans_ips = sans_ips super().__init__(charm, relation_name, callback_f, mandatory) try: self.store = self.PeerKeyStore( self.model.get_relation("peers"), self.charm.model.unit ) except KeyError: self.store = self.LocalDBKeyStore(charm._state) self.setup_private_key() def setup_event_handler(self) -> None: """Configure event handlers for tls relation.""" logger.debug("Setting up certificates event handler") # Lazy import to ensure this lib is only required if the charm # has this relation. from charms.tls_certificates_interface.v1.tls_certificates import ( TLSCertificatesRequiresV1, ) self.certificates = TLSCertificatesRequiresV1( self.charm, "certificates" ) self.framework.observe( self.charm.on.certificates_relation_joined, self._on_certificates_relation_joined, ) self.framework.observe( self.charm.on.certificates_relation_broken, self._on_certificates_relation_broken, ) self.framework.observe( self.certificates.on.certificate_available, self._on_certificate_available, ) self.framework.observe( self.certificates.on.certificate_expiring, self._on_certificate_expiring, ) self.framework.observe( self.certificates.on.certificate_expired, self._on_certificate_expired, ) return self.certificates def setup_private_key(self) -> None: """Create and store private key if needed.""" # Lazy import to ensure this lib is only required if the charm # has this relation. from charms.tls_certificates_interface.v1.tls_certificates import ( generate_private_key, ) if not self.store.store_ready(): logger.debug("Store not ready, cannot generate key") return if self.store.get_private_key(): logger.debug("Private key already present") private_key_secret_id = self.store.get_private_key() try: private_key_secret = self.model.get_secret( id=private_key_secret_id ) except SecretNotFoundError: # When a unit is departing its secrets are removed by Juju. # So trying to access the secret will result in # SecretNotFoundError. Given this secret is set by this # unit and only consumed by this unit it is unlikely there # is any other reason for the secret to be missing. logger.debug( "SecretNotFoundError not found, likely due to departing " "unit." ) return private_key_secret = self.model.get_secret( id=private_key_secret_id ) self._private_key = ( private_key_secret.get_content().get("private-key").encode() ) return self._private_key = generate_private_key() private_key_secret = self.model.unit.add_secret( {"private-key": self._private_key.decode()}, label=f"{self.charm.model.unit}-private-key", ) self.store.set_private_key(private_key_secret.id) @property def private_key(self): """Private key for certificates.""" logger.debug("Returning private key: {}".format(self._private_key)) if self._private_key: return self._private_key.decode() else: # Private key has not been set yet return None def update_relation_data(self): """Request certificates outside of relation context.""" if list(self.model.relations[self.relation_name]): self._request_certificates() else: logger.debug( "Not updating certificate request data, no relation found" ) def _on_certificates_relation_joined( self, event: ops.framework.EventBase ) -> None: """Request certificates in response to relation join event.""" self._request_certificates() def _request_certificates(self): """Request certificates from remote provider.""" # Lazy import to ensure this lib is only required if the charm # has this relation. from charms.tls_certificates_interface.v1.tls_certificates import ( generate_csr, ) if self.ready: logger.debug("Certificate request already complete.") return if self.private_key: logger.debug("Private key found, requesting certificates") else: logger.debug("Cannot request certificates, private key not found") return csr = generate_csr( private_key=self.private_key.encode(), subject=self.charm.model.unit.name.replace("/", "-"), sans_dns=self.sans_dns, sans_ip=self.sans_ips, ) self.certificates.request_certificate_creation( certificate_signing_request=csr ) def _on_certificates_relation_broken( self, event: ops.framework.EventBase ) -> None: if self.mandatory: self.status.set(BlockedStatus("integration missing")) def _on_certificate_available( self, event: ops.framework.EventBase ) -> None: self.callback_f(event) def _on_certificate_expiring(self, event: ops.framework.EventBase) -> None: logger.warning("Certificate getting expired") self.status.set(ActiveStatus("Certificates are getting expired soon")) def _on_certificate_expired(self, event: ops.framework.EventBase) -> None: logger.warning("Certificate expired") self.status.set(BlockedStatus("Certificates expired")) def _get_csr_from_relation_unit_data(self) -> Optional[str]: certificate_relations = list(self.model.relations[self.relation_name]) if not certificate_relations: return None # unit_data format: # {"certificate_signing_requests": "['certificate_signing_request': 'CSRTEXT']"} unit_data = certificate_relations[0].data[self.charm.model.unit] csr = json.loads(unit_data.get("certificate_signing_requests", "[]")) if not csr: return None csr = csr[0].get("certificate_signing_request", None) return csr def _get_cert_from_relation_data(self, csr: str) -> dict: certificate_relations = list(self.model.relations[self.relation_name]) if not certificate_relations: return {} # app data format: # {"certificates": "['certificate_signing_request': 'CSR', # 'certificate': 'CERT', 'ca': 'CA', 'chain': 'CHAIN']"} certs = certificate_relations[0].data[certificate_relations[0].app] certs = json.loads(certs.get("certificates", "[]")) for certificate in certs: csr_from_app = certificate.get("certificate_signing_request", "") if csr.strip() == csr_from_app.strip(): return { "cert": certificate.get("certificate", None), "ca": certificate.get("ca", None), "chain": certificate.get("chain", []), } return {} @property def ready(self) -> bool: """Whether handler ready for use.""" csr_from_unit = self._get_csr_from_relation_unit_data() if not csr_from_unit: return False certs = self._get_cert_from_relation_data(csr_from_unit) return True if certs else False def context(self) -> dict: """Certificates context.""" csr_from_unit = self._get_csr_from_relation_unit_data() if not csr_from_unit: return {} certs = self._get_cert_from_relation_data(csr_from_unit) cert = certs["cert"] ca_cert = certs["ca"] + "\n" + "\n".join(certs["chain"]) ctxt = { "key": self.private_key, "cert": cert, "ca_cert": ca_cert, } return ctxt class IdentityCredentialsRequiresHandler(RelationHandler): """Handles the identity credentials relation on the requires side.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, mandatory: bool = False, ) -> None: """Create a new identity-credentials handler. Create a new IdentityCredentialsRequiresHandler that handles initial events from the relation and invokes the provided callbacks based on the event raised. :param charm: the Charm class the handler is for :type charm: ops.charm.CharmBase :param relation_name: the relation the handler is bound to :type relation_name: str :param callback_f: the function to call when the nodes are connected :type callback_f: Callable """ super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for identity-credentials relation.""" import charms.keystone_k8s.v0.identity_credentials as identity_credentials logger.debug("Setting up the identity-credentials event handler") credentials_service = identity_credentials.IdentityCredentialsRequires( self.charm, self.relation_name, ) self.framework.observe( credentials_service.on.ready, self._credentials_ready ) self.framework.observe( credentials_service.on.goneaway, self._credentials_goneaway ) return credentials_service def _credentials_ready(self, event: ops.framework.EventBase) -> None: """React to credential ready event.""" self.callback_f(event) def _credentials_goneaway(self, event: ops.framework.EventBase) -> None: """React to credential goneaway event.""" self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) @property def ready(self) -> bool: """Whether handler is ready for use.""" try: return bool(self.interface.password) except (AttributeError, KeyError): return False class IdentityResourceRequiresHandler(RelationHandler): """Handles the identity resource relation on the requires side.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, mandatory: bool = False, ): """Create a new identity-ops handler. Create a new IdentityResourceRequiresHandler that handles initial events from the relation and invokes the provided callbacks based on the event raised. :param charm: the Charm class the handler is for :type charm: ops.charm.CharmBase :param relation_name: the relation the handler is bound to :type relation_name: str :param callback_f: the function to call when the nodes are connected :type callback_f: Callable :param mandatory: If the relation is mandatory to proceed with configuring charm :type mandatory: bool """ super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self): """Configure event handlers for an Identity resource relation.""" import charms.keystone_k8s.v0.identity_resource as ops_svc logger.debug("Setting up Identity Resource event handler") ops_svc = ops_svc.IdentityResourceRequires( self.charm, self.relation_name, ) self.framework.observe( ops_svc.on.provider_ready, self._on_provider_ready, ) self.framework.observe( ops_svc.on.provider_goneaway, self._on_provider_goneaway, ) self.framework.observe( ops_svc.on.response_available, self._on_response_available, ) return ops_svc def _on_provider_ready(self, event) -> None: """Handles provider_ready event.""" logger.debug( "Identity ops provider available and ready to process any requests" ) self.callback_f(event) def _on_provider_goneaway(self, event) -> None: """Handles provider_goneaway event.""" logger.info("Keystone provider not available process any requests") self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) def _on_response_available(self, event) -> None: """Handles response available events.""" logger.info("Handle response from identity ops") self.callback_f(event) @property def ready(self) -> bool: """Whether handler is ready for use.""" return self.interface.ready() class CeilometerServiceRequiresHandler(RelationHandler): """Handle ceilometer service relation on the requires side.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, mandatory: bool = False, ): """Create a new ceilometer-service handler. Create a new CeilometerServiceRequiresHandler that handles initial events from the relation and invokes the provided callbacks based on the event raised. :param charm: the Charm class the handler is for :type charm: ops.charm.CharmBase :param relation_name: the relation the handler is bound to :type relation_name: str :param callback_f: the function to call when the nodes are connected :type callback_f: Callable :param mandatory: If the relation is mandatory to proceed with configuring charm :type mandatory: bool """ super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> None: """Configure event handlers for Ceilometer service relation.""" import charms.ceilometer_k8s.v0.ceilometer_service as ceilometer_svc logger.debug("Setting up Ceilometer service event handler") svc = ceilometer_svc.CeilometerServiceRequires( self.charm, self.relation_name, ) self.framework.observe( svc.on.config_changed, self._on_config_changed, ) self.framework.observe( svc.on.goneaway, self._on_goneaway, ) return svc def _on_config_changed(self, event: ops.framework.EventBase) -> None: """Handle config_changed event.""" logger.debug( "Ceilometer service provider config changed event received" ) self.callback_f(event) def _on_goneaway(self, event: ops.framework.EventBase) -> None: """Handle gone_away event.""" logger.debug("Ceilometer service relation is departed/broken") self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) @property def ready(self) -> bool: """Whether handler is ready for use.""" try: return bool(self.interface.telemetry_secret) except (AttributeError, KeyError): return False class CephAccessRequiresHandler(RelationHandler): """Handles the ceph access relation on the requires side.""" def __init__( self, charm: ops.charm.CharmBase, relation_name: str, callback_f: Callable, mandatory: bool = False, ) -> None: """Create a new ceph-access handler. Create a new CephAccessRequiresHandler that handles initial events from the relation and invokes the provided callbacks based on the event raised. :param charm: the Charm class the handler is for :type charm: ops.charm.CharmBase :param relation_name: the relation the handler is bound to :type relation_name: str :param callback_f: the function to call when the nodes are connected :type callback_f: Callable """ super().__init__(charm, relation_name, callback_f, mandatory) def setup_event_handler(self) -> ops.framework.Object: """Configure event handlers for ceph-access relation.""" import charms.cinder_ceph_k8s.v0.ceph_access as ceph_access logger.debug("Setting up the ceph-access event handler") ceph_access = ceph_access.CephAccessRequires( self.charm, self.relation_name, ) self.framework.observe(ceph_access.on.ready, self._ceph_access_ready) self.framework.observe( ceph_access.on.goneaway, self._ceph_access_goneaway ) return ceph_access def _ceph_access_ready(self, event: ops.framework.EventBase) -> None: """React to credential ready event.""" self.callback_f(event) def _ceph_access_goneaway(self, event: ops.framework.EventBase) -> None: """React to credential goneaway event.""" self.callback_f(event) if self.mandatory: self.status.set(BlockedStatus("integration missing")) @property def ready(self) -> bool: """Whether handler is ready for use.""" try: return bool(self.interface.ready) except (AttributeError, KeyError): return False def context(self) -> dict: """Context containing Ceph access data.""" ctxt = super().context() data = self.interface.ceph_access_data ctxt["key"] = data.get("key") ctxt["uuid"] = data.get("uuid") return ctxt ExtraOpsProcess = Callable[[ops.EventBase, dict], None] class UserIdentityResourceRequiresHandler(RelationHandler): """Handle user management on IdentityResource relation.""" CREDENTIALS_SECRET_PREFIX = "user-identity-resource-" CONFIGURE_SECRET_PREFIX = "configure-credential-" resource_identifiers: FrozenSet[str] = frozenset( { "name", "email", "description", "domain", "project", "project_domain", "enable", "may_exist", } ) def __init__( self, charm: ops.CharmBase, relation_name: str, callback_f: Callable, mandatory: bool, name: str, domain: str, email: Optional[str] = None, description: Optional[str] = None, project: Optional[str] = None, project_domain: Optional[str] = None, enable: bool = True, may_exist: bool = True, role: Optional[str] = None, add_suffix: bool = False, rotate: ops.SecretRotate = ops.SecretRotate.NEVER, extra_ops: Optional[List[Union[dict, Callable]]] = None, extra_ops_process: Optional[ExtraOpsProcess] = None, ): self.username = name super().__init__(charm, relation_name, callback_f, mandatory) self.charm = charm self.add_suffix = add_suffix # add_suffix is used to add suffix to username to create unique user self.role = role self.rotate = rotate self.extra_ops = extra_ops self.extra_ops_process = extra_ops_process self._params = {} _locals = locals() for keys in self.resource_identifiers: value = _locals.get(keys) if value is not None: self._params[keys] = value def setup_event_handler(self) -> ops.Object: """Configure event handlers for the relation.""" import charms.keystone_k8s.v0.identity_resource as id_ops logger.debug("Setting up Identity Resource event handler") ops_svc = id_ops.IdentityResourceRequires( self.charm, self.relation_name, ) self.framework.observe( ops_svc.on.provider_ready, self._on_provider_ready, ) self.framework.observe( ops_svc.on.provider_goneaway, self._on_provider_goneaway, ) self.framework.observe( ops_svc.on.response_available, self._on_response_available, ) self.framework.observe( self.charm.on.secret_changed, self._on_secret_changed ) self.framework.observe( self.charm.on.secret_rotate, self._on_secret_rotate ) self.framework.observe( self.charm.on.secret_remove, self._on_secret_remove ) return ops_svc def _hash_ops(self, ops: list) -> str: """Hash ops request.""" return hashlib.sha256(json.dumps(ops).encode()).hexdigest() @property def label(self) -> str: """Secret label to share over keystone resource relation.""" return self.CREDENTIALS_SECRET_PREFIX + self.username @property def config_label(self) -> str: """Secret label to template configuration from.""" return self.CONFIGURE_SECRET_PREFIX + self.username @property def _create_user_tag(self) -> str: return "create_user_" + self.username @property def _delete_user_tag(self) -> str: return "delete_user_" + self.username def random_string(self, length: int) -> str: """Utility function to generate secure random string.""" alphabet = string.ascii_letters + string.digits return "".join(secrets.choice(alphabet) for i in range(length)) def _ensure_credentials(self, refresh_user: bool = False) -> str: credentials_id = self.charm.leader_get(self.label) suffix_length = 6 password_length = 18 if credentials_id: if refresh_user: username = self.username if self.add_suffix: suffix = self.random_string(suffix_length) username += "-" + suffix secret = self.model.get_secret(id=credentials_id) secret.set_content( { "username": username, "password": self.random_string(password_length), } ) return credentials_id username = self.username password = self.random_string(password_length) if self.add_suffix: suffix = self.random_string(suffix_length) username += "-" + suffix secret = self.model.app.add_secret( {"username": username, "password": password}, label=self.label, rotate=self.rotate, ) self.charm.leader_set({self.label: secret.id}) return secret.id # type: ignore[union-attr] def _grant_ops_secret(self, relation: ops.Relation): secret = self.model.get_secret(id=self._ensure_credentials()) secret.grant(relation) def _get_credentials(self) -> Tuple[str, str]: credentials_id = self._ensure_credentials() secret = self.model.get_secret(id=credentials_id) content = secret.get_content() return content["username"], content["password"] def get_config_credentials(self) -> Optional[Tuple[str, str]]: """Get credential from config secret.""" credentials_id = self.charm.leader_get(self.config_label) if not credentials_id: return None secret = self.model.get_secret(id=credentials_id) content = secret.get_content() return content["username"], content["password"] def _update_config_credentials(self) -> bool: """Update config credentials. Returns True if credentials are updated, False otherwise. """ credentials_id = self.charm.leader_get(self.config_label) username, password = self._get_credentials() content = {"username": username, "password": password} if credentials_id is None: secret = self.model.app.add_secret( content, label=self.config_label ) self.charm.leader_set({self.config_label: secret.id}) return True secret = self.model.get_secret(id=credentials_id) old_content = secret.get_content() if old_content != content: secret.set_content(content) return True return False def _create_user_request(self) -> dict: credentials_id = self._ensure_credentials() username, _ = self._get_credentials() requests = [] domain = self._params["domain"] create_domain = { "name": "create_domain", "params": {"name": domain, "enable": True}, } requests.append(create_domain) if self.role: create_role = { "name": "create_role", "params": {"name": self.role}, } requests.append(create_role) params = self._params.copy() params.pop("name", None) create_user = { "name": "create_user", "params": { "name": username, "password": credentials_id, **params, }, } requests.append(create_user) requests.extend(self._create_role_requests(username, domain)) if self.extra_ops: for extra_op in self.extra_ops: if isinstance(extra_op, dict): requests.append(extra_op) elif callable(extra_op): requests.append(extra_op()) else: logger.debug(f"Invalid type of extra_op: {extra_op!r}") request = { "id": self._hash_ops(requests), "tag": self._create_user_tag, "ops": requests, } return request def _create_role_requests( self, username, domain: Optional[str] ) -> List[dict]: requests = [] if self.role: params = { "role": self.role, } if domain: params["domain"] = domain params["user_domain"] = domain project_domain = self._params.get("project_domain") if project_domain: params["project_domain"] = project_domain params["user"] = username grant_role_domain = {"name": "grant_role", "params": params} requests.append(grant_role_domain) project = self._params.get("project") if project: requests.append( { "name": "show_project", "params": { "name": project, "domain": project_domain or domain, }, } ) params = { "project": "{{ show_project[0].id }}", "role": "{{ create_role[0].id }}", "user": "{{ create_user[0].id }}", "user_domain": "{{ create_domain[0].id }}", } if project_domain: params[ "project_domain" ] = "{{ show_project[0].domain_id }}" requests.append( { "name": "grant_role", "params": params, } ) return requests def _delete_user_request(self, users: List[str]) -> dict: requests = [] for user in users: params = {"name": user} domain = self._params.get("domain") if domain: params["domain"] = domain requests.append( { "name": "delete_user", "params": params, } ) return { "id": self._hash_ops(requests), "tag": self._delete_user_tag, "ops": requests, } def _process_create_user_response(self, response: dict) -> None: if {op.get("return-code") for op in response.get("ops", [])} == {0}: logger.debug("Create user completed.") config_credentials = self.get_config_credentials() credentials_updated = self._update_config_credentials() if config_credentials and credentials_updated: username = config_credentials[0] self.add_user_to_delete_user_list(username) else: logger.debug("Error in creation of user ops " f"{response}") def add_user_to_delete_user_list(self, user: str) -> None: """Update users list to delete.""" logger.debug(f"Adding user to delete list {user}") old_users = self.charm.leader_get("old_users") delete_users = json.loads(old_users) if old_users else [] if user not in delete_users: delete_users.append(user) self.charm.leader_set({"old_users": json.dumps(delete_users)}) def _process_delete_user_response(self, response: dict) -> None: deleted_users = [] for op in response.get("ops", []): if op.get("return-code") == 0: deleted_users.append(op.get("value").get("name")) else: logger.debug(f"Error in running delete user for op {op}") if deleted_users: logger.debug(f"Deleted users: {deleted_users}") old_users = self.charm.leader_get("old_users") users_to_delete = json.loads(old_users) if old_users else [] new_users_to_delete = [ x for x in users_to_delete if x not in deleted_users ] self.charm.leader_set({"old_users": json.dumps(new_users_to_delete)}) def _on_secret_changed(self, event: ops.SecretChangedEvent): logger.debug( f"secret-changed triggered for label {event.secret.label}" ) # Secret change on configured user secret if event.secret.label == self.config_label: logger.debug( "Calling configure charm to populate user info in " "configuration files" ) self.callback_f(event) else: logger.debug( "Ignoring the secret-changed event for label " f"{event.secret.label}" ) def _on_secret_rotate(self, event: ops.SecretRotateEvent): # All the juju secrets are created on leader unit, so return # if unit is not leader at this stage instead of checking at # each secret. logger.debug(f"secret-rotate triggered for label {event.secret.label}") if not self.model.unit.is_leader(): logger.debug("Not leader unit, no action required") return # Secret rotate on stack user secret sent to ops if event.secret.label == self.label: self._ensure_credentials(refresh_user=True) request = self._create_user_request() logger.debug(f"Sending ops request: {request}") self.interface.request_ops(request) else: logger.debug( "Ignoring the secret-rotate event for label " f"{event.secret.label}" ) def _on_secret_remove(self, event: ops.SecretRemoveEvent): logger.debug(f"secret-remove triggered for label {event.secret.label}") if not self.model.unit.is_leader(): logger.debug("Not leader unit, no action required") return # Secret remove on configured stack admin secret if event.secret.label == self.config_label: old_users = self.charm.leader_get("old_users") users_to_delete = json.loads(old_users) if old_users else [] if not users_to_delete: return request = self._delete_user_request(users_to_delete) logger.debug(f"Sending ops request: {request}") self.interface.request_ops(request) else: logger.debug( "Ignoring the secret-remove event for label " f"{event.secret.label}" ) def _on_provider_ready(self, event) -> None: """Handles response available events.""" logger.info("Handle response from identity ops") if not self.model.unit.is_leader(): return self.interface.request_ops(self._create_user_request()) self._grant_ops_secret(event.relation) self.callback_f(event) def _on_response_available(self, event) -> None: """Handles response available events.""" if not self.model.unit.is_leader(): return logger.info("Handle response from identity ops") response = self.interface.response tag = response.get("tag") if tag == self._create_user_tag: self._process_create_user_response(response) if self.extra_ops_process is not None: self.extra_ops_process(event, response) elif tag == self._delete_user_tag: self._process_delete_user_response(response) self.callback_f(event) def _on_provider_goneaway(self, event) -> None: """Handle gone_away event.""" self.callback_f(event) @property def ready(self) -> bool: """Whether the relation is ready.""" return self.get_config_credentials() is not None