[ops-sunbeam] Implement mypy linter

Implement mypy in the most non-breaking way possible. There's still some
changes of behavior that crept in, merely due to incorrect edge case
handling.

Charm libraries are generally well typed, include py.typed marker for
all of the libraries, to allow mypy analyzing their usage.

Change-Id: I7bda1913fa08dd4954a606526272ac80b45197cc
Signed-off-by: Guillaume Boutry <guillaume.boutry@canonical.com>
This commit is contained in:
Guillaume Boutry 2024-08-07 17:25:48 +02:00
parent ba7f86f174
commit 8c674de50e
No known key found for this signature in database
GPG Key ID: E95E3326872E55DE
48 changed files with 593 additions and 408 deletions

View File

@ -83,8 +83,12 @@ class TestCinderCephOperatorCharm(test_utils.CharmTestCase):
"""Setup fixtures ready for testing."""
super().setUp(charm, self.PATCHES)
self.mock_event = MagicMock()
with open("config.yaml", "r") as f:
config_data = f.read()
self.harness = test_utils.get_harness(
_CinderCephOperatorCharm, container_calls=self.container_calls
_CinderCephOperatorCharm,
container_calls=self.container_calls,
charm_config=config_data,
)
mock_get_platform = patch(
"charmhelpers.osplatform.get_platform", return_value="ubuntu"

View File

@ -79,11 +79,11 @@ import json
import logging
from ops.framework import (
StoredState,
EventBase,
ObjectEvents,
EventSource,
Object,
ObjectEvents,
StoredState,
)
from ops.model import (
Relation,
@ -100,7 +100,7 @@ LIBAPI = 1
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 3
LIBPATCH = 4
logger = logging.getLogger(__name__)
@ -140,8 +140,13 @@ class IdentityServiceRequires(Object):
on = IdentityServiceServerEvents()
_stored = StoredState()
def __init__(self, charm, relation_name: str, service_endpoints: dict,
region: str):
def __init__(
self,
charm,
relation_name: str,
service_endpoints: list[dict],
region: str,
):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
@ -168,9 +173,7 @@ class IdentityServiceRequires(Object):
"""IdentityService relation joined."""
logging.debug("IdentityService on_joined")
self.on.connected.emit()
self.register_services(
self.service_endpoints,
self.region)
self.register_services(self.service_endpoints, self.region)
def _on_identity_service_relation_changed(self, event):
"""IdentityService relation changed."""
@ -199,92 +202,92 @@ class IdentityServiceRequires(Object):
@property
def api_version(self) -> str:
"""Return the api_version."""
return self.get_remote_app_data('api-version')
return self.get_remote_app_data("api-version")
@property
def auth_host(self) -> str:
"""Return the auth_host."""
return self.get_remote_app_data('auth-host')
return self.get_remote_app_data("auth-host")
@property
def auth_port(self) -> str:
"""Return the auth_port."""
return self.get_remote_app_data('auth-port')
return self.get_remote_app_data("auth-port")
@property
def auth_protocol(self) -> str:
"""Return the auth_protocol."""
return self.get_remote_app_data('auth-protocol')
return self.get_remote_app_data("auth-protocol")
@property
def internal_host(self) -> str:
"""Return the internal_host."""
return self.get_remote_app_data('internal-host')
return self.get_remote_app_data("internal-host")
@property
def internal_port(self) -> str:
"""Return the internal_port."""
return self.get_remote_app_data('internal-port')
return self.get_remote_app_data("internal-port")
@property
def internal_protocol(self) -> str:
"""Return the internal_protocol."""
return self.get_remote_app_data('internal-protocol')
return self.get_remote_app_data("internal-protocol")
@property
def admin_domain_name(self) -> str:
"""Return the admin_domain_name."""
return self.get_remote_app_data('admin-domain-name')
return self.get_remote_app_data("admin-domain-name")
@property
def admin_domain_id(self) -> str:
"""Return the admin_domain_id."""
return self.get_remote_app_data('admin-domain-id')
return self.get_remote_app_data("admin-domain-id")
@property
def admin_project_name(self) -> str:
"""Return the admin_project_name."""
return self.get_remote_app_data('admin-project-name')
return self.get_remote_app_data("admin-project-name")
@property
def admin_project_id(self) -> str:
"""Return the admin_project_id."""
return self.get_remote_app_data('admin-project-id')
return self.get_remote_app_data("admin-project-id")
@property
def admin_user_name(self) -> str:
"""Return the admin_user_name."""
return self.get_remote_app_data('admin-user-name')
return self.get_remote_app_data("admin-user-name")
@property
def admin_user_id(self) -> str:
"""Return the admin_user_id."""
return self.get_remote_app_data('admin-user-id')
return self.get_remote_app_data("admin-user-id")
@property
def service_domain_name(self) -> str:
"""Return the service_domain_name."""
return self.get_remote_app_data('service-domain-name')
return self.get_remote_app_data("service-domain-name")
@property
def service_domain_id(self) -> str:
"""Return the service_domain_id."""
return self.get_remote_app_data('service-domain-id')
return self.get_remote_app_data("service-domain-id")
@property
def service_host(self) -> str:
"""Return the service_host."""
return self.get_remote_app_data('service-host')
return self.get_remote_app_data("service-host")
@property
def service_credentials(self) -> str:
"""Return the service_credentials secret."""
return self.get_remote_app_data('service-credentials')
return self.get_remote_app_data("service-credentials")
@property
def service_password(self) -> str:
"""Return the service_password."""
credentials_id = self.get_remote_app_data('service-credentials')
credentials_id = self.get_remote_app_data("service-credentials")
if not credentials_id:
return None
@ -298,27 +301,27 @@ class IdentityServiceRequires(Object):
@property
def service_port(self) -> str:
"""Return the service_port."""
return self.get_remote_app_data('service-port')
return self.get_remote_app_data("service-port")
@property
def service_protocol(self) -> str:
"""Return the service_protocol."""
return self.get_remote_app_data('service-protocol')
return self.get_remote_app_data("service-protocol")
@property
def service_project_name(self) -> str:
"""Return the service_project_name."""
return self.get_remote_app_data('service-project-name')
return self.get_remote_app_data("service-project-name")
@property
def service_project_id(self) -> str:
"""Return the service_project_id."""
return self.get_remote_app_data('service-project-id')
return self.get_remote_app_data("service-project-id")
@property
def service_user_name(self) -> str:
"""Return the service_user_name."""
credentials_id = self.get_remote_app_data('service-credentials')
credentials_id = self.get_remote_app_data("service-credentials")
if not credentials_id:
return None
@ -332,30 +335,31 @@ class IdentityServiceRequires(Object):
@property
def service_user_id(self) -> str:
"""Return the service_user_id."""
return self.get_remote_app_data('service-user-id')
return self.get_remote_app_data("service-user-id")
@property
def internal_auth_url(self) -> str:
"""Return the internal_auth_url."""
return self.get_remote_app_data('internal-auth-url')
return self.get_remote_app_data("internal-auth-url")
@property
def admin_auth_url(self) -> str:
"""Return the admin_auth_url."""
return self.get_remote_app_data('admin-auth-url')
return self.get_remote_app_data("admin-auth-url")
@property
def public_auth_url(self) -> str:
"""Return the public_auth_url."""
return self.get_remote_app_data('public-auth-url')
return self.get_remote_app_data("public-auth-url")
@property
def admin_role(self) -> str:
"""Return the admin_role."""
return self.get_remote_app_data('admin-role')
return self.get_remote_app_data("admin-role")
def register_services(self, service_endpoints: dict,
region: str) -> None:
def register_services(
self, service_endpoints: list[dict], region: str
) -> None:
"""Request access to the IdentityService server."""
if self.model.unit.is_leader():
logging.debug("Requesting service registration")
@ -375,8 +379,15 @@ class HasIdentityServiceClientsEvent(EventBase):
class ReadyIdentityServiceClientsEvent(EventBase):
"""IdentityServiceClients Ready Event."""
def __init__(self, handle, relation_id, relation_name, service_endpoints,
region, client_app_name):
def __init__(
self,
handle,
relation_id,
relation_name,
service_endpoints,
region,
client_app_name,
):
super().__init__(handle)
self.relation_id = relation_id
self.relation_name = relation_name
@ -390,7 +401,8 @@ class ReadyIdentityServiceClientsEvent(EventBase):
"relation_name": self.relation_name,
"service_endpoints": self.service_endpoints,
"client_app_name": self.client_app_name,
"region": self.region}
"region": self.region,
}
def restore(self, snapshot):
super().restore(snapshot)
@ -405,7 +417,9 @@ class IdentityServiceClientEvents(ObjectEvents):
"""Events class for `on`"""
has_identity_service_clients = EventSource(HasIdentityServiceClientsEvent)
ready_identity_service_clients = EventSource(ReadyIdentityServiceClientsEvent)
ready_identity_service_clients = EventSource(
ReadyIdentityServiceClientsEvent
)
class IdentityServiceProvides(Object):
@ -441,9 +455,7 @@ class IdentityServiceProvides(Object):
def _on_identity_service_relation_changed(self, event):
"""Handle IdentityService changed."""
logging.debug("IdentityService on_changed")
REQUIRED_KEYS = [
'service-endpoints',
'region']
REQUIRED_KEYS = ["service-endpoints", "region"]
values = [
event.relation.data[event.relation.app].get(k)
@ -452,42 +464,47 @@ class IdentityServiceProvides(Object):
# Validate data on the relation
if all(values):
service_eps = json.loads(
event.relation.data[event.relation.app]['service-endpoints'])
event.relation.data[event.relation.app]["service-endpoints"]
)
self.on.ready_identity_service_clients.emit(
event.relation.id,
event.relation.name,
service_eps,
event.relation.data[event.relation.app]['region'],
event.relation.app.name)
event.relation.data[event.relation.app]["region"],
event.relation.app.name,
)
def _on_identity_service_relation_broken(self, event):
"""Handle IdentityService broken."""
logging.debug("IdentityServiceProvides on_departed")
# TODO clear data on the relation
def set_identity_service_credentials(self, relation_name: int,
relation_id: str,
api_version: str,
auth_host: str,
auth_port: str,
auth_protocol: str,
internal_host: str,
internal_port: str,
internal_protocol: str,
service_host: str,
service_port: str,
service_protocol: str,
admin_domain: dict,
admin_project: dict,
admin_user: dict,
service_domain: dict,
service_project: dict,
service_user: dict,
internal_auth_url: str,
admin_auth_url: str,
public_auth_url: str,
service_credentials: str,
admin_role: str):
def set_identity_service_credentials(
self,
relation_name: int,
relation_id: str,
api_version: str,
auth_host: str,
auth_port: str,
auth_protocol: str,
internal_host: str,
internal_port: str,
internal_protocol: str,
service_host: str,
service_port: str,
service_protocol: str,
admin_domain: dict,
admin_project: dict,
admin_user: dict,
service_domain: dict,
service_project: dict,
service_user: dict,
internal_auth_url: str,
admin_auth_url: str,
public_auth_url: str,
service_credentials: str,
admin_role: str,
):
logging.debug("Setting identity_service connection information.")
_identity_service_rel = None
for relation in self.framework.model.relations[relation_name]:

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

@ -32,10 +32,12 @@ containers and managing the service running in the container.
import ipaddress
import logging
import urllib
import urllib.parse
from typing import (
List,
Mapping,
Optional,
Sequence,
Set,
)
@ -52,7 +54,7 @@ import ops_sunbeam.guard as sunbeam_guard
import ops_sunbeam.job_ctrl as sunbeam_job_ctrl
import ops_sunbeam.relation_handlers as sunbeam_rhandlers
import tenacity
from lightkube import (
from lightkube.core.client import (
Client,
)
from lightkube.resources.core_v1 import (
@ -77,7 +79,8 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
_state = ops.framework.StoredState()
# Holds set of mandatory relations
mandatory_relations = set()
mandatory_relations: set[str] = set()
service_name: str
def __init__(self, framework: ops.framework.Framework) -> None:
"""Run constructor."""
@ -134,8 +137,8 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
return True
def get_relation_handlers(
self, handlers: List[sunbeam_rhandlers.RelationHandler] = None
) -> List[sunbeam_rhandlers.RelationHandler]:
self, handlers: list[sunbeam_rhandlers.RelationHandler] | None = None
) -> list[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service."""
handlers = handlers or []
if self.can_add_handler("tracing", handlers):
@ -147,8 +150,8 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
self,
"amqp",
self.configure_charm,
self.config.get("rabbit-user") or self.service_name,
self.config.get("rabbit-vhost") or "openstack",
str(self.config.get("rabbit-user") or self.service_name),
str(self.config.get("rabbit-vhost") or "openstack"),
"amqp" in self.mandatory_relations,
)
handlers.append(self.amqp)
@ -220,26 +223,35 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
"""Return Subject Alternate Names to use in cert for service."""
return list(set(self.get_domain_name_sans()))
def _ip_sans(self) -> List[ipaddress.IPv4Address]:
"""Get IPv4 addresses for service."""
ip_sans = []
def _get_all_relation_addresses(self) -> list[ipaddress.IPv4Address]:
"""Return all bind/ingress addresses from all relations."""
addresses = []
for relation_name in self.meta.relations.keys():
for relation in self.framework.model.relations.get(
relation_name, []
):
binding = self.model.get_binding(relation)
if binding is None or binding.network is None:
continue
if isinstance(
binding.network.ingress_address, ipaddress.IPv4Address
):
ip_sans.append(binding.network.ingress_address)
addresses.append(binding.network.ingress_address)
if isinstance(
binding.network.bind_address, ipaddress.IPv4Address
):
ip_sans.append(binding.network.bind_address)
addresses.append(binding.network.bind_address)
return addresses
def _ip_sans(self) -> list[ipaddress.IPv4Address]:
"""Get IPv4 addresses for service."""
ip_sans = self._get_all_relation_addresses()
for binding_name in ["public"]:
try:
binding = self.model.get_binding(binding_name)
if binding is None or binding.network is None:
continue
if isinstance(
binding.network.ingress_address, ipaddress.IPv4Address
):
@ -337,7 +349,7 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
@property
def config_contexts(
self,
) -> List[sunbeam_config_contexts.CharmConfigContext]:
) -> list[sunbeam_config_contexts.ConfigContext]:
"""Return the configuration adapters for the operator."""
return [sunbeam_config_contexts.CharmConfigContext(self, "options")]
@ -537,15 +549,22 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
def bootstrapped(self) -> bool:
"""Determine whether the service has been bootstrapped."""
return self._state.unit_bootstrapped and self.is_leader_ready()
return (
self._state.unit_bootstrapped # type: ignore[truthy-function] # unit_bootstrapped is not a function
and self.is_leader_ready()
)
def leader_set(self, settings: dict = None, **kwargs) -> None:
def leader_set(
self,
settings: sunbeam_core.RelationDataMapping | None = None,
**kwargs,
) -> None:
"""Juju set data in peer data bag."""
settings = settings or {}
settings.update(kwargs)
self.peers.set_app_data(settings=settings)
def leader_get(self, key: str) -> str:
def leader_get(self, key: str) -> str | None:
"""Retrieve data from the peer relation."""
return self.peers.get_app_data(key)
@ -593,24 +612,24 @@ class OSBaseOperatorCharmK8S(OSBaseOperatorCharm):
def get_named_pebble_handler(
self, container_name: str
) -> sunbeam_chandlers.PebbleHandler:
) -> sunbeam_chandlers.PebbleHandler | None:
"""Get pebble handler matching container_name."""
pebble_handlers = [
h
for h in self.pebble_handlers
if h.container_name == container_name
]
assert len(pebble_handlers) < 2, (
"Multiple pebble handlers with the " "same name found."
)
assert (
len(pebble_handlers) < 2
), "Multiple pebble handlers with the same name found."
if pebble_handlers:
return pebble_handlers[0]
else:
return None
def get_named_pebble_handlers(
self, container_names: List[str]
) -> List[sunbeam_chandlers.PebbleHandler]:
self, container_names: Sequence[str]
) -> list[sunbeam_chandlers.PebbleHandler]:
"""Get pebble handlers matching container_names."""
return [
h
@ -644,7 +663,7 @@ class OSBaseOperatorCharmK8S(OSBaseOperatorCharm):
"Container service not ready"
)
def stop_services(self, relation: Optional[Set[str]] = None) -> None:
def stop_services(self, relation: set[str] | None = None) -> None:
"""Stop all running services."""
for ph in self.pebble_handlers:
if ph.pebble_ready:
@ -653,7 +672,7 @@ class OSBaseOperatorCharmK8S(OSBaseOperatorCharm):
)
ph.stop_all()
def configure_unit(self, event: ops.framework.EventBase) -> None:
def configure_unit(self, event: ops.EventBase) -> None:
"""Run configuration on this unit."""
self.check_leader_ready()
self.check_relation_handlers_ready(event)
@ -689,12 +708,12 @@ class OSBaseOperatorCharmK8S(OSBaseOperatorCharm):
self.status.set(ActiveStatus(""))
@property
def container_configs(self) -> List[sunbeam_core.ContainerConfigFile]:
def container_configs(self) -> list[sunbeam_core.ContainerConfigFile]:
"""Container configuration files for the operator."""
return []
@property
def container_names(self) -> List[str]:
def container_names(self) -> list[str]:
"""Names of Containers that form part of this service."""
return [self.service_name]
@ -746,17 +765,18 @@ class OSBaseOperatorCharmK8S(OSBaseOperatorCharm):
if not self.unit.is_leader():
logging.info("Not lead unit, skipping DB syncs")
return
try:
if self.db_sync_cmds:
if db_sync_cmds := getattr(self, "db_sync_cmds", None):
if db_sync_cmds:
logger.info("Syncing database...")
for cmd in self.db_sync_cmds:
for cmd in db_sync_cmds:
try:
self._retry_db_sync(cmd)
except tenacity.RetryError:
raise sunbeam_guard.BlockedExceptionError(
"DB sync failed"
)
except AttributeError:
else:
logger.warning(
"Not DB sync ran. Charm does not specify self.db_sync_cmds"
)
@ -770,19 +790,21 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
"""Base class for OpenStack API operators."""
mandatory_relations = {"database", "identity-service", "ingress-public"}
wsgi_admin_script: str
wsgi_public_script: str
def __init__(self, framework: ops.framework.Framework) -> None:
"""Run constructor."""
super().__init__(framework)
@property
def service_endpoints(self) -> List[dict]:
def service_endpoints(self) -> list[dict]:
"""List of endpoints for this service."""
return []
def get_relation_handlers(
self, handlers: List[sunbeam_rhandlers.RelationHandler] = None
) -> List[sunbeam_rhandlers.RelationHandler]:
self, handlers: list[sunbeam_rhandlers.RelationHandler] | None = None
) -> list[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service."""
handlers = handlers or []
# Note: intentionally including the ingress handler here in order to
@ -813,7 +835,7 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
"identity-service",
self.configure_charm,
self.service_endpoints,
self.model.config["region"],
str(self.model.config["region"]),
"identity-service" in self.mandatory_relations,
)
handlers.append(self.id_svc)
@ -827,15 +849,11 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
call the configure_charm.
"""
logger.debug("Received an ingress_changed event")
try:
if self.id_svc.update_service_endpoints:
logger.debug(
"Updating service endpoints after ingress "
"relation changed."
)
self.id_svc.update_service_endpoints(self.service_endpoints)
except (AttributeError, KeyError):
pass
if hasattr(self, "id_svc"):
logger.debug(
"Updating service endpoints after ingress " "relation changed."
)
self.id_svc.update_service_endpoints(self.service_endpoints)
self.configure_charm(event)
@ -850,7 +868,7 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
charm_service = client.get(
Service, name=self.app.name, namespace=self.model.name
)
public_address = None
status = charm_service.status
if status:
load_balancer_status = status.loadBalancer
@ -867,12 +885,21 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
"Using ingress address from loadbalancer "
f"as {addr}"
)
return ingress_address.hostname or ingress_address.ip
public_address = (
ingress_address.hostname or ingress_address.ip
)
hostname = self.model.get_binding(
"identity-service"
).network.ingress_address
return hostname
if not public_address:
binding = self.model.get_binding("identity-service")
if binding and binding.network and binding.network.ingress_address:
public_address = str(binding.network.ingress_address)
if not public_address:
raise sunbeam_guard.WaitingExceptionError(
"No public address found for service"
)
return public_address
@property
def public_url(self) -> str:
@ -895,15 +922,19 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
@property
def admin_url(self) -> str:
"""Url for accessing the admin endpoint for this service."""
hostname = self.model.get_binding(
"identity-service"
).network.ingress_address
return self.add_explicit_port(self.service_url(hostname))
binding = self.model.get_binding("identity-service")
if binding and binding.network and binding.network.ingress_address:
return self.add_explicit_port(
self.service_url(str(binding.network.ingress_address))
)
raise sunbeam_guard.WaitingExceptionError(
"No admin address found for service"
)
@property
def internal_url(self) -> str:
"""Url for accessing the internal endpoint for this service."""
try:
if hasattr(self, "ingress_internal"):
if self.ingress_internal.url:
logger.debug(
"Ingress-internal relation found, returning "
@ -911,15 +942,17 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
self.ingress_internal.url,
)
return self.add_explicit_port(self.ingress_internal.url)
except (AttributeError, KeyError):
pass
hostname = self.model.get_binding(
"identity-service"
).network.ingress_address
return self.add_explicit_port(self.service_url(hostname))
binding = self.model.get_binding("identity-service")
if binding and binding.network and binding.network.ingress_address:
return self.add_explicit_port(
self.service_url(str(binding.network.ingress_address))
)
raise sunbeam_guard.WaitingExceptionError(
"No internal address found for service"
)
def get_pebble_handlers(self) -> List[sunbeam_chandlers.PebbleHandler]:
def get_pebble_handlers(self) -> list[sunbeam_chandlers.PebbleHandler]:
"""Pebble handlers for the service."""
return [
sunbeam_chandlers.WSGIPebbleHandler(
@ -934,7 +967,7 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
]
@property
def container_configs(self) -> List[sunbeam_core.ContainerConfigFile]:
def container_configs(self) -> list[sunbeam_core.ContainerConfigFile]:
"""Container configuration files for the service."""
_cconfigs = super().container_configs
_cconfigs.extend(
@ -964,7 +997,7 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharmK8S):
return f"/etc/{self.service_name}/{self.service_name}.conf"
@property
def config_contexts(self) -> List[sunbeam_config_contexts.ConfigContext]:
def config_contexts(self) -> list[sunbeam_config_contexts.ConfigContext]:
"""Generate list of configuration adapters for the charm."""
_cadapters = super().config_contexts
_cadapters.extend(

View File

@ -24,6 +24,7 @@ aspects of the application without clobbering other parts.
"""
import json
import logging
import typing
from typing import (
Callable,
Dict,
@ -157,12 +158,14 @@ class StatusPool(Object):
)
try:
self._state = charm.framework.load_snapshot(stored_handle)
status_state = json.loads(self._state["statuses"])
self._state = typing.cast(
StoredStateData, charm.framework.load_snapshot(stored_handle)
)
status_state: dict = json.loads(self._state["statuses"])
except NoSnapshotError:
self._state = StoredStateData(self, "_status_pool")
status_state = []
self._status_state = status_state
status_state = {}
self._status_state: dict = status_state
# 'commit' is an ops framework event
# that tells the object to save a snapshot of its state for later.

View File

@ -19,16 +19,15 @@ create reusable contexts which translate charm config, deployment state etc.
These are not specific to a relation.
"""
from __future__ import (
annotations,
)
import logging
from typing import (
TYPE_CHECKING,
)
import ops_sunbeam.tracing as sunbeam_tracing
from ops_sunbeam.core import (
ContextMapping,
)
if TYPE_CHECKING:
import ops_sunbeam.charm
@ -61,7 +60,7 @@ class ConfigContext:
"""Whether the context has all the data is needs."""
return True
def context(self) -> dict:
def context(self) -> ContextMapping:
"""Context used when rendering templates."""
raise NotImplementedError
@ -70,7 +69,7 @@ class ConfigContext:
class CharmConfigContext(ConfigContext):
"""A context containing all of the charms config options."""
def context(self) -> dict:
def context(self) -> ContextMapping:
"""Charms config options."""
return self.charm.config
@ -79,7 +78,9 @@ class CharmConfigContext(ConfigContext):
class WSGIWorkerConfigContext(ConfigContext):
"""Configuration context for WSGI configuration."""
def context(self) -> dict:
charm: "ops_sunbeam.charm.OSBaseOperatorAPICharm"
def context(self) -> ContextMapping:
"""WSGI configuration options."""
return {
"name": self.charm.service_name,
@ -97,7 +98,7 @@ class WSGIWorkerConfigContext(ConfigContext):
class CephConfigurationContext(ConfigContext):
"""Ceph configuration context."""
def context(self) -> None:
def context(self) -> ContextMapping:
"""Ceph configuration context."""
config = self.charm.model.config.get
ctxt = {}
@ -113,7 +114,7 @@ class CephConfigurationContext(ConfigContext):
class CinderCephConfigurationContext(ConfigContext):
"""Cinder Ceph configuration context."""
def context(self) -> None:
def context(self) -> ContextMapping:
"""Cinder Ceph configuration context."""
config = self.charm.model.config.get
data_pool_name = config("rbd-pool-name") or self.charm.app.name

View File

@ -21,13 +21,10 @@ in the container.
import collections
import logging
import typing
from collections.abc import (
Callable,
)
from typing import (
List,
TypedDict,
)
import ops.charm
import ops.pebble
@ -41,6 +38,12 @@ from ops.model import (
WaitingStatus,
)
if typing.TYPE_CHECKING:
from ops_sunbeam.charm import (
OSBaseOperatorAPICharm,
OSBaseOperatorCharm,
)
logger = logging.getLogger(__name__)
ContainerDir = collections.namedtuple(
@ -54,10 +57,10 @@ class PebbleHandler(ops.framework.Object):
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
container_name: str,
service_name: str,
container_configs: List[sunbeam_core.ContainerConfigFile],
container_configs: list[sunbeam_core.ContainerConfigFile],
template_dir: str,
callback_f: Callable,
) -> None:
@ -129,16 +132,16 @@ class PebbleHandler(ops.framework.Object):
logger.debug("No file changes detected")
return files_updated
def get_layer(self) -> dict:
def get_layer(self) -> ops.pebble.LayerDict:
"""Pebble configuration layer for the container."""
return {}
def get_healthcheck_layer(self) -> dict:
def get_healthcheck_layer(self) -> ops.pebble.LayerDict:
"""Pebble configuration for health check layer for the container."""
return {}
@property
def directories(self) -> List[ContainerDir]:
def directories(self) -> list[ContainerDir]:
"""List of directories to create in container."""
return []
@ -165,7 +168,7 @@ class PebbleHandler(ops.framework.Object):
def default_container_configs(
self,
) -> List[sunbeam_core.ContainerConfigFile]:
) -> list[sunbeam_core.ContainerConfigFile]:
"""Generate default container configurations.
These should be used by all inheriting classes and are
@ -189,7 +192,7 @@ class PebbleHandler(ops.framework.Object):
return all([s.is_running() for s in services.values()])
def execute(
self, cmd: List, exception_on_error: bool = False, **kwargs: TypedDict
self, cmd: list[str], exception_on_error: bool = False, **kwargs
) -> str:
"""Execute given command in container managed by this handler.
@ -214,10 +217,12 @@ class PebbleHandler(ops.framework.Object):
return stdout
except ops.pebble.ExecError as e:
logger.error("Exited with code %d. Stderr:", e.exit_code)
for line in e.stderr.splitlines():
logger.error(" %s", line)
if e.stderr:
for line in e.stderr.splitlines():
logger.error(" %s", line)
if exception_on_error:
raise
return ""
def add_healthchecks(self) -> None:
"""Add healthcheck layer to the plan."""
@ -363,12 +368,14 @@ class ServicePebbleHandler(PebbleHandler):
class WSGIPebbleHandler(PebbleHandler):
"""WSGI oriented handler for a Pebble managed container."""
charm: "OSBaseOperatorAPICharm"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorAPICharm",
container_name: str,
service_name: str,
container_configs: List[sunbeam_core.ContainerConfigFile],
container_configs: list[sunbeam_core.ContainerConfigFile],
template_dir: str,
callback_f: Callable,
wsgi_service_name: str,
@ -406,7 +413,7 @@ class WSGIPebbleHandler(PebbleHandler):
"""Start the service."""
self.start_wsgi()
def get_layer(self) -> dict:
def get_layer(self) -> ops.pebble.LayerDict:
"""Apache WSGI service pebble layer.
:returns: pebble layer configuration for wsgi service
@ -424,7 +431,7 @@ class WSGIPebbleHandler(PebbleHandler):
},
}
def get_healthcheck_layer(self) -> dict:
def get_healthcheck_layer(self) -> ops.pebble.LayerDict:
"""Apache WSGI health check pebble layer.
:returns: pebble health check layer configuration for wsgi service
@ -483,7 +490,7 @@ class WSGIPebbleHandler(PebbleHandler):
def default_container_configs(
self,
) -> List[sunbeam_core.ContainerConfigFile]:
) -> list[sunbeam_core.ContainerConfigFile]:
"""Container configs for WSGI service."""
return [
sunbeam_core.ContainerConfigFile(self.wsgi_conf, "root", "root")

View File

@ -18,8 +18,9 @@ import collections
from typing import (
TYPE_CHECKING,
Generator,
List,
Tuple,
Mapping,
MutableMapping,
Sequence,
Union,
)
@ -42,6 +43,10 @@ ContainerConfigFile = collections.namedtuple(
defaults=(None,),
)
RelationDataMapping = MutableMapping[str, str]
ConfigMapping = Mapping[str, bool | int | float | str | None]
ContextMapping = RelationDataMapping | ConfigMapping
@sunbeam_tracing.trace_type
class OPSCharmContexts:
@ -50,7 +55,7 @@ class OPSCharmContexts:
def __init__(self, charm: "OSBaseOperatorCharm") -> None:
"""Run constructor."""
self.charm = charm
self.namespaces = []
self.namespaces: list[str] = []
def add_relation_handler(self, handler: "RelationHandler") -> None:
"""Add relation handler."""
@ -67,7 +72,7 @@ class OPSCharmContexts:
setattr(self, "leader_db", obj)
def add_config_contexts(
self, config_adapters: List["ConfigContext"]
self, config_adapters: Sequence["ConfigContext"]
) -> None:
"""Add multiple config contexts."""
for config_adapter in config_adapters:
@ -83,7 +88,7 @@ class OPSCharmContexts:
def __iter__(
self,
) -> Generator[
Tuple[str, Union["ConfigContext", "RelationHandler"]], None, None
tuple[str, Union["ConfigContext", "RelationHandler"]], None, None
]:
"""Iterate over the relations presented to the charm."""
for namespace in self.namespaces:

View File

@ -15,19 +15,22 @@
"""Module to handle errors and bailing out of an event/hook."""
import logging
import typing
from contextlib import (
contextmanager,
)
from ops.charm import (
CharmBase,
)
from ops.model import (
BlockedStatus,
MaintenanceStatus,
WaitingStatus,
)
if typing.TYPE_CHECKING:
from ops_sunbeam.charm import (
OSBaseOperatorCharm,
)
logger = logging.getLogger(__name__)
@ -65,12 +68,12 @@ class WaitingExceptionError(BaseStatusExceptionError):
@contextmanager
def guard(
charm: "CharmBase",
charm: "OSBaseOperatorCharm",
section: str,
handle_exception: bool = True,
log_traceback: bool = True,
**__,
) -> None:
) -> typing.Generator:
"""Context manager to handle errors and bailing out of an event/hook.
The nature of Juju is that all the information may not be available to run

View File

@ -15,11 +15,7 @@
"""Common interfaces not charm specific."""
import logging
from typing import (
Dict,
List,
Optional,
)
import typing
import ops.model
from ops.framework import (
@ -29,6 +25,9 @@ from ops.framework import (
ObjectEvents,
StoredState,
)
from ops_sunbeam.core import (
RelationDataMapping,
)
class PeersRelationCreatedEvent(EventBase):
@ -66,7 +65,7 @@ class PeersEvents(ObjectEvents):
class OperatorPeers(Object):
"""Interface for the peers relation."""
on = PeersEvents()
on = PeersEvents() # type: ignore
state = StoredState()
def __init__(self, charm: ops.charm.CharmBase, relation_name: str) -> None:
@ -89,50 +88,50 @@ class OperatorPeers(Object):
return self.framework.model.get_relation(self.relation_name)
@property
def _app_data_bag(self) -> Dict[str, str]:
def _app_data_bag(self) -> typing.MutableMapping[str, str]:
"""Return all app data on peer relation."""
if not self.peers_rel:
return {}
return self.peers_rel.data[self.peers_rel.app]
def on_joined(self, event: ops.framework.EventBase) -> None:
def on_joined(self, event: ops.EventBase) -> None:
"""Handle relation joined event."""
logging.info("Peer joined")
self.on.peers_relation_joined.emit()
def on_created(self, event: ops.framework.EventBase) -> None:
def on_created(self, event: ops.EventBase) -> None:
"""Handle relation created event."""
logging.info("Peers on_created")
self.on.peers_relation_created.emit()
def on_changed(self, event: ops.framework.EventBase) -> None:
def on_changed(self, event: ops.EventBase) -> None:
"""Handle relation changed event."""
logging.info("Peers on_changed")
self.on.peers_data_changed.emit()
def set_app_data(self, settings: Dict[str, str]) -> None:
def set_app_data(self, settings: RelationDataMapping) -> None:
"""Publish settings on the peer app data bag."""
for k, v in settings.items():
self._app_data_bag[k] = v
def get_app_data(self, key: str) -> Optional[str]:
def get_app_data(self, key: str) -> str | None:
"""Get the value corresponding to key from the app data bag."""
if not self.peers_rel:
return None
return self._app_data_bag.get(key)
def get_all_app_data(self) -> Dict[str, str]:
def get_all_app_data(self) -> typing.MutableMapping[str, str]:
"""Return all the app data from the relation."""
return self._app_data_bag
def get_all_unit_values(
self, key: str, include_local_unit: bool = False
) -> List[str]:
) -> list[str]:
"""Retrieve value for key from all related units.
:param include_local_unit: Include value set by local unit
"""
values = []
values: list[str] = []
if not self.peers_rel:
return values
for unit in self.peers_rel.units:
@ -144,17 +143,17 @@ class OperatorPeers(Object):
values.append(local_unit_value)
return values
def set_unit_data(self, settings: Dict[str, str]) -> None:
def set_unit_data(self, settings: typing.Mapping[str, str]) -> None:
"""Publish settings on the peer unit data bag."""
if not self.peers_rel:
return
for k, v in settings.items():
self.peers_rel.data[self.model.unit][k] = v
def all_joined_units(self) -> List[ops.model.Unit]:
def all_joined_units(self) -> set[ops.model.Unit]:
"""All remote units joined to the peer relation."""
if not self.peers_rel:
return []
return set()
return set(self.peers_rel.units)
def expected_peer_units(self) -> int:

View File

@ -20,16 +20,14 @@ case these helpers can limit how frequently they are run.
import logging
import time
import typing
from functools import (
wraps,
)
from typing import (
TYPE_CHECKING,
)
import ops.framework
if TYPE_CHECKING:
if typing.TYPE_CHECKING:
import ops_sunbeam.charm
logger = logging.getLogger(__name__)

View File

@ -14,10 +14,6 @@
"""Base classes for defining an OVN charm using the Operator framework."""
from typing import (
List,
)
from .. import charm as sunbeam_charm
from .. import relation_handlers as sunbeam_rhandlers
from . import relation_handlers as ovn_relation_handlers
@ -27,8 +23,8 @@ class OSBaseOVNOperatorCharm(sunbeam_charm.OSBaseOperatorCharmK8S):
"""Base charms for OpenStack operators."""
def get_relation_handlers(
self, handlers: List[sunbeam_rhandlers.RelationHandler] = None
) -> List[sunbeam_rhandlers.RelationHandler]:
self, handlers: list[sunbeam_rhandlers.RelationHandler] | None = None
) -> list[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service."""
handlers = handlers or []
if self.can_add_handler("ovsdb-cms", handlers):

View File

@ -14,17 +14,10 @@
"""Base classes for defining OVN Pebble handlers."""
from typing import (
List,
)
from ops.model import (
ActiveStatus,
)
from .. import container_handlers as sunbeam_chandlers
from .. import core as sunbeam_core
from .. import tracing as sunbeam_tracing
import ops
import ops_sunbeam.container_handlers as sunbeam_chandlers
import ops_sunbeam.core as sunbeam_core
import ops_sunbeam.tracing as sunbeam_tracing
@sunbeam_tracing.trace_type
@ -52,14 +45,14 @@ class OVNPebbleHandler(sunbeam_chandlers.ServicePebbleHandler):
self.setup_dirs()
changes = self.write_config(context)
self.files_changed(changes)
self.status.set(ActiveStatus(""))
self.status.set(ops.ActiveStatus(""))
@property
def service_description(self) -> str:
"""Return a short description of service e.g. OVN Southbound DB."""
raise NotImplementedError
def get_layer(self) -> dict:
def get_layer(self) -> ops.pebble.LayerDict:
"""Pebble configuration layer for OVN service.
:returns: pebble layer configuration for service
@ -80,7 +73,7 @@ class OVNPebbleHandler(sunbeam_chandlers.ServicePebbleHandler):
},
}
def get_healthcheck_layer(self) -> dict:
def get_healthcheck_layer(self) -> ops.pebble.LayerDict:
"""Health check pebble layer.
:returns: pebble health check layer configuration for OVN service
@ -97,7 +90,7 @@ class OVNPebbleHandler(sunbeam_chandlers.ServicePebbleHandler):
}
@property
def directories(self) -> List[sunbeam_chandlers.ContainerDir]:
def directories(self) -> list[sunbeam_chandlers.ContainerDir]:
"""Directories to creete in container."""
return [
sunbeam_chandlers.ContainerDir("/etc/ovn", "root", "root"),
@ -108,7 +101,7 @@ class OVNPebbleHandler(sunbeam_chandlers.ServicePebbleHandler):
def default_container_configs(
self,
) -> List[sunbeam_core.ContainerConfigFile]:
) -> list[sunbeam_core.ContainerConfigFile]:
"""Files to render into containers."""
return [
sunbeam_core.ContainerConfigFile(

View File

@ -18,29 +18,44 @@ import ipaddress
import itertools
import logging
import socket
import typing
from typing import (
Callable,
Dict,
Iterator,
List,
)
import ops.charm
import ops.framework
import ops_sunbeam.interfaces as sunbeam_interfaces
import ops_sunbeam.relation_handlers as sunbeam_rhandlers
import ops_sunbeam.tracing as sunbeam_tracing
from ops.model import (
BlockedStatus,
)
from ops_sunbeam.charm import (
OSBaseOperatorCharm,
)
from .. import relation_handlers as sunbeam_rhandlers
from .. import tracing as sunbeam_tracing
if typing.TYPE_CHECKING:
import charms.ovn_central_k8s.v0.ovsdb as ovsdb
logger = logging.getLogger(__name__)
IPAddress = ipaddress.IPv4Address | ipaddress.IPv6Address | str
@sunbeam_tracing.trace_type
class OVNRelationUtils:
"""Common utilities for processing OVN relations."""
charm: OSBaseOperatorCharm
relation_name: str
interface: typing.Union[
"ovsdb.OVSDBCMSRequires",
"ovsdb.OVSDBCMSProvides",
sunbeam_interfaces.OperatorPeers,
]
DB_NB_PORT = 6641
DB_SB_PORT = 6642
DB_SB_ADMIN_PORT = 16642
@ -71,7 +86,7 @@ class OVNRelationUtils:
:returns: addresses published by remote units.
:rtype: Iterator[str]
"""
for addr in self.interface.get_all_unit_values(key):
for addr in self.interface.get_all_unit_values(key): # type: ignore
try:
addr = self._format_addr(addr)
yield addr
@ -86,7 +101,7 @@ class OVNRelationUtils:
:returns: hostnames published by remote units.
:rtype: Iterator[str]
"""
for hostname in self.interface.get_all_unit_values(key):
for hostname in self.interface.get_all_unit_values(key): # type: ignore
yield hostname
@property
@ -117,7 +132,7 @@ class OVNRelationUtils:
return self._remote_addrs("ingress-bound-address")
def db_connection_strs(
self, hostnames: List[str], port: int, proto: str = "ssl"
self, hostnames: typing.Iterable[str], port: int, proto: str = "ssl"
) -> Iterator[str]:
"""Provide connection strings.
@ -249,7 +264,7 @@ class OVNRelationUtils:
)
@property
def cluster_local_addr(self) -> ipaddress.IPv4Address:
def cluster_local_addr(self) -> IPAddress | None:
"""Retrieve local address bound to endpoint.
:returns: IPv4 or IPv6 address bound to endpoint
@ -258,7 +273,7 @@ class OVNRelationUtils:
return self._endpoint_local_bound_addr()
@property
def cluster_ingress_addr(self) -> ipaddress.IPv4Address:
def cluster_ingress_addr(self) -> IPAddress | None:
"""Retrieve local address bound to endpoint.
:returns: IPv4 or IPv6 address bound to endpoint
@ -284,7 +299,7 @@ class OVNRelationUtils:
"""
return socket.getfqdn()
def _endpoint_local_bound_addr(self) -> ipaddress.IPv4Address:
def _endpoint_local_bound_addr(self) -> IPAddress | None:
"""Retrieve local address bound to endpoint.
:returns: IPv4 or IPv6 address bound to endpoint
@ -292,19 +307,25 @@ class OVNRelationUtils:
addr = None
for relation in self.charm.model.relations.get(self.relation_name, []):
binding = self.charm.model.get_binding(relation)
addr = binding.network.bind_address
if binding and binding.network and binding.network.bind_address:
addr = binding.network.bind_address
break
return addr
def _endpoint_ingress_bound_addresses(self) -> ipaddress.IPv4Address:
def _endpoint_ingress_bound_addresses(self) -> list[IPAddress]:
"""Retrieve local address bound to endpoint.
:returns: IPv4 or IPv6 address bound to endpoint
"""
addresses = []
addresses: list[IPAddress] = []
for relation in self.charm.model.relations.get(self.relation_name, []):
binding = self.charm.model.get_binding(relation)
addresses.extend(binding.network.ingress_addresses)
if (
binding
and binding.network
and binding.network.ingress_addresses
):
addresses.extend(binding.network.ingress_addresses)
return list(set(addresses))
@ -314,7 +335,9 @@ class OVNDBClusterPeerHandler(
):
"""Handle OVN peer relation."""
def publish_cluster_local_hostname(self, hostname: str = None) -> Dict:
interface: sunbeam_interfaces.OperatorPeers
def publish_cluster_local_hostname(self, hostname: str | None = None):
"""Announce hostname on relation.
This will be used by our peers and clients to build a connection
@ -467,9 +490,11 @@ class OVSDBCMSProvidesHandler(
):
"""Handle provides side of ovsdb-cms."""
interface: "ovsdb.OVSDBCMSProvides"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -520,9 +545,11 @@ class OVSDBCMSRequiresHandler(
):
"""Handle provides side of ovsdb-cms."""
interface: "ovsdb.OVSDBCMSRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,

View File

View File

@ -23,12 +23,6 @@ import string
import typing
from typing import (
Callable,
Dict,
FrozenSet,
List,
Optional,
Tuple,
Union,
)
from urllib.parse import (
urlparse,
@ -47,6 +41,30 @@ from ops.model import (
UnknownStatus,
WaitingStatus,
)
from ops_sunbeam.core import (
RelationDataMapping,
)
if typing.TYPE_CHECKING:
import charms.ceilometer_k8s.v0.ceilometer_service as ceilometer_service
import charms.certificate_transfer_interface.v0.certificate_transfer as certificate_transfer
import charms.cinder_ceph_k8s.v0.ceph_access as ceph_access
import charms.data_platform_libs.v0.data_interfaces as data_interfaces
import charms.keystone_k8s.v0.identity_credentials as identity_credentials
import charms.keystone_k8s.v0.identity_resource as identity_resource
import charms.keystone_k8s.v1.identity_service as identity_service
import charms.loki_k8s.v1.loki_push_api as loki_push_api
import charms.nova_k8s.v0.nova_service as nova_service
import charms.rabbitmq_k8s.v0.rabbitmq as rabbitmq
import charms.tempo_k8s.v2.tracing as tracing
import charms.tls_certificates_interface.v3.tls_certificates as tls_certificates
import charms.traefik_k8s.v2.ingress as ingress
import charms.traefik_route_k8s.v0.traefik_route as traefik_route
import interface_ceph_client.ceph_client as ceph_client # type: ignore [import-untyped]
from ops_sunbeam.charm import (
OSBaseOperatorCharm,
)
logger = logging.getLogger(__name__)
@ -71,7 +89,7 @@ class RelationHandler(ops.framework.Object):
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -109,7 +127,7 @@ class RelationHandler(ops.framework.Object):
else:
status.set(WaitingStatus("integration incomplete"))
def setup_event_handler(self) -> ops.framework.Object:
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for the relation.
This method must be overridden in concrete class
@ -117,7 +135,7 @@ class RelationHandler(ops.framework.Object):
"""
raise NotImplementedError
def get_interface(self) -> Tuple[ops.framework.Object, str]:
def get_interface(self) -> tuple[ops.Object, str]:
"""Return the interface that this handler encapsulates.
This is a combination of the interface object and the
@ -157,9 +175,11 @@ class RelationHandler(ops.framework.Object):
class IngressHandler(RelationHandler):
"""Base class to handle Ingress relations."""
interface: "ingress.IngressPerAppRequirer"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
service_name: str,
default_ingress_port: int,
@ -235,7 +255,7 @@ class IngressHandler(RelationHandler):
return False
@property
def url(self) -> Optional[str]:
def url(self) -> str | None:
"""Return the URL used by the remote ingress service."""
if not self.ready:
return None
@ -264,9 +284,11 @@ class IngressPublicHandler(IngressHandler):
class DBHandler(RelationHandler):
"""Handler for DB relations."""
interface: "data_interfaces.DatabaseRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
database: str,
@ -323,7 +345,14 @@ class DBHandler(RelationHandler):
# this will be set to self.interface in parent class
return db
def _on_database_updated(self, event: ops.framework.EventBase) -> None:
def _on_database_updated(
self,
event: typing.Union[
"data_interfaces.DatabaseCreatedEvent",
"data_interfaces.DatabaseEndpointsChangedEvent",
"data_interfaces.DatabaseReadOnlyEndpointsChangedEvent",
],
) -> None:
"""Handle database change events."""
if not (event.username or event.password or event.endpoints):
return
@ -342,7 +371,7 @@ class DBHandler(RelationHandler):
if self.mandatory:
self.status.set(BlockedStatus("integration missing"))
def get_relation_data(self) -> dict:
def get_relation_data(self) -> RelationDataMapping:
"""Load the data from the relation for consumption in the handler."""
# there is at most one relation for a database
for relation in self.model.relations[self.relation_name]:
@ -400,15 +429,16 @@ class DBHandler(RelationHandler):
class RabbitMQHandler(RelationHandler):
"""Handler for managing a rabbitmq relation."""
interface: "rabbitmq.RabbitMQRequires"
DEFAULT_PORT = "5672"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
username: str,
vhost: int,
vhost: str,
mandatory: bool = False,
) -> None:
"""Run constructor."""
@ -495,12 +525,14 @@ class AMQPHandler(RabbitMQHandler):
class IdentityServiceRequiresHandler(RelationHandler):
"""Handler for managing a identity-service relation."""
interface: "identity_service.IdentityServiceRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
service_endpoints: dict,
service_endpoints: list[dict],
region: str,
mandatory: bool = False,
) -> None:
@ -543,7 +575,7 @@ class IdentityServiceRequiresHandler(RelationHandler):
if self.mandatory:
self.status.set(BlockedStatus("integration missing"))
def update_service_endpoints(self, service_endpoints: dict) -> None:
def update_service_endpoints(self, service_endpoints: list[dict]) -> None:
"""Update service endpoints on the relation."""
self.service_endpoints = service_endpoints
self.interface.register_services(service_endpoints, self.region)
@ -561,9 +593,10 @@ class IdentityServiceRequiresHandler(RelationHandler):
class BasePeerHandler(RelationHandler):
"""Base handler for managing a peers relation."""
interface: sunbeam_interfaces.OperatorPeers
LEADER_READY_KEY = "leader_ready"
def setup_event_handler(self) -> None:
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for peer relation."""
logger.debug("Setting up peer event handler")
# Lazy import to ensure this lib is only required if the charm
@ -609,19 +642,21 @@ class BasePeerHandler(RelationHandler):
except (AttributeError, KeyError):
return {}
def set_app_data(self, settings: dict) -> None:
def set_app_data(self, settings: RelationDataMapping) -> None:
"""Store data in peer app db."""
self.interface.set_app_data(settings)
def get_app_data(self, key: str) -> Optional[str]:
def get_app_data(self, key: str) -> str | None:
"""Retrieve data from the peer relation."""
return self.interface.get_app_data(key)
def leader_get(self, key: str) -> str:
def leader_get(self, key: str) -> str | None:
"""Retrieve data from the peer relation."""
return self.peers.get_app_data(key)
return self.interface.get_app_data(key)
def leader_set(self, settings: dict, **kwargs) -> None:
def leader_set(
self, settings: RelationDataMapping | None, **kwargs
) -> None:
"""Store data in peer app db."""
settings = settings or {}
settings.update(kwargs)
@ -639,13 +674,13 @@ class BasePeerHandler(RelationHandler):
else:
return json.loads(ready)
def set_unit_data(self, settings: Dict[str, str]) -> None:
def set_unit_data(self, settings: dict[str, str]) -> None:
"""Publish settings on the peer unit data bag."""
self.interface.set_unit_data(settings)
def get_all_unit_values(
self, key: str, include_local_unit: bool = False
) -> List[str]:
) -> list[str]:
"""Retrieve value for key from all related units.
:param include_local_unit: Include value set by local unit
@ -659,13 +694,15 @@ class BasePeerHandler(RelationHandler):
class CephClientHandler(RelationHandler):
"""Handler for ceph-client interface."""
interface: "ceph_client.CephClientRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
allow_ec_overwrites: bool = True,
app_name: str = None,
app_name: str | None = None,
mandatory: bool = False,
) -> None:
"""Run constructor."""
@ -713,15 +750,22 @@ class CephClientHandler(RelationHandler):
or config("rbd-pool")
or self.charm.app.name
)
metadata_pool_name = (
config("ec-rbd-metadata-pool") or f"{self.charm.app.name}-metadata"
# schema defined as str
metadata_pool_name: str = typing.cast(
str,
config("ec-rbd-metadata-pool")
or f"{self.charm.app.name}-metadata",
)
weight = config("ceph-pool-weight")
replicas = config("ceph-osd-replication-count")
# schema defined as int and with a default
# weight is then managed as a float.
weight = float(typing.cast(int, config("ceph-pool-weight")))
# schema defined as int and with a default
replicas = typing.cast(int, config("ceph-osd-replication-count"))
# TODO: add bluestore compression options
if config("pool-type") == ERASURE_CODED:
# General EC plugin config
plugin = config("ec-profile-plugin")
# schema defined as str and with a default
plugin = typing.cast(str, config("ec-profile-plugin"))
technique = config("ec-profile-technique")
device_class = config("ec-profile-device-class")
bdm_k = config("ec-profile-k")
@ -788,7 +832,7 @@ class CephClientHandler(RelationHandler):
return self.interface.pools_available
@property
def key(self) -> str:
def key(self) -> str | None:
"""Retrieve the cephx key provided for the application."""
return self.interface.get_relation_data().get("key")
@ -796,7 +840,11 @@ class CephClientHandler(RelationHandler):
"""Context containing Ceph connection data."""
ctxt = super().context()
data = self.interface.get_relation_data()
ctxt["mon_hosts"] = ",".join(sorted(data.get("mon_hosts")))
# mon_hosts is a list of sorted host strings
mon_hosts = typing.cast(list[str] | None, data.get("mon_hosts"))
if not mon_hosts:
return {}
ctxt["mon_hosts"] = ",".join(mon_hosts)
ctxt["auth"] = data.get("auth")
ctxt["key"] = data.get("key")
ctxt["rbd_features"] = None
@ -878,12 +926,8 @@ class _Store(abc.ABC):
class TlsCertificatesHandler(RelationHandler):
"""Handler for certificates interface."""
if typing.TYPE_CHECKING:
from charms.tls_certificates_interface.v3.tls_certificates import (
TLSCertificatesRequiresV3,
)
interface: TLSCertificatesRequiresV3
interface: "tls_certificates.TLSCertificatesRequiresV3"
store: _Store
class PeerStore(_Store):
"""Store private key secret id in peer storage relation."""
@ -943,7 +987,7 @@ class TlsCertificatesHandler(RelationHandler):
def __init__(
self,
charm: ops.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
sans_dns: list[str] | None = None,
@ -956,9 +1000,9 @@ class TlsCertificatesHandler(RelationHandler):
self.sans_ips = sans_ips
super().__init__(charm, relation_name, callback_f, mandatory)
try:
self.store = self.PeerStore(
self.model.get_relation("peers"), self.get_entity()
)
peer_relation = self.model.get_relation("peers")
# TODO(gboutry): fix type ignore
self.store = self.PeerStore(peer_relation, self.get_entity()) # type: ignore[arg-type]
except KeyError:
if self.app_managed_certificates():
raise RuntimeError(
@ -1269,9 +1313,11 @@ class TlsCertificatesHandler(RelationHandler):
class IdentityCredentialsRequiresHandler(RelationHandler):
"""Handles the identity credentials relation on the requires side."""
interface: "identity_credentials.IdentityCredentialsRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -1333,9 +1379,11 @@ class IdentityCredentialsRequiresHandler(RelationHandler):
class IdentityResourceRequiresHandler(RelationHandler):
"""Handles the identity resource relation on the requires side."""
interface: "identity_resource.IdentityResourceRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -1410,9 +1458,11 @@ class IdentityResourceRequiresHandler(RelationHandler):
class CeilometerServiceRequiresHandler(RelationHandler):
"""Handle ceilometer service relation on the requires side."""
interface: "ceilometer_service.CeilometerServiceRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -1435,7 +1485,7 @@ class CeilometerServiceRequiresHandler(RelationHandler):
"""
super().__init__(charm, relation_name, callback_f, mandatory)
def setup_event_handler(self) -> None:
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for Ceilometer service relation."""
import charms.ceilometer_k8s.v0.ceilometer_service as ceilometer_svc
@ -1483,9 +1533,11 @@ class CeilometerServiceRequiresHandler(RelationHandler):
class CephAccessRequiresHandler(RelationHandler):
"""Handles the ceph access relation on the requires side."""
interface: "ceph_access.CephAccessRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -1510,17 +1562,19 @@ class CephAccessRequiresHandler(RelationHandler):
import charms.cinder_ceph_k8s.v0.ceph_access as ceph_access
logger.debug("Setting up the ceph-access event handler")
ceph_access = sunbeam_tracing.trace_type(
ceph_access_requires = sunbeam_tracing.trace_type(
ceph_access.CephAccessRequires
)(
self.charm,
self.relation_name,
)
self.framework.observe(ceph_access.on.ready, self._ceph_access_ready)
self.framework.observe(
ceph_access.on.goneaway, self._ceph_access_goneaway
ceph_access_requires.on.ready, self._ceph_access_ready
)
return ceph_access
self.framework.observe(
ceph_access_requires.on.goneaway, self._ceph_access_goneaway
)
return ceph_access_requires
def _ceph_access_ready(self, event: ops.framework.EventBase) -> None:
"""React to credential ready event."""
@ -1556,10 +1610,12 @@ ExtraOpsProcess = Callable[[ops.EventBase, dict], None]
class UserIdentityResourceRequiresHandler(RelationHandler):
"""Handle user management on IdentityResource relation."""
interface: "identity_resource.IdentityResourceRequires"
CREDENTIALS_SECRET_PREFIX = "user-identity-resource-"
CONFIGURE_SECRET_PREFIX = "configure-credential-"
resource_identifiers: FrozenSet[str] = frozenset(
resource_identifiers: frozenset[str] = frozenset(
{
"name",
"email",
@ -1574,23 +1630,23 @@ class UserIdentityResourceRequiresHandler(RelationHandler):
def __init__(
self,
charm: ops.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool,
name: str,
domain: str,
email: Optional[str] = None,
description: Optional[str] = None,
project: Optional[str] = None,
project_domain: Optional[str] = None,
email: str | None = None,
description: str | None = None,
project: str | None = None,
project_domain: str | None = None,
enable: bool = True,
may_exist: bool = True,
role: Optional[str] = None,
role: str | None = None,
add_suffix: bool = False,
rotate: ops.SecretRotate = ops.SecretRotate.NEVER,
extra_ops: Optional[List[Union[dict, Callable]]] = None,
extra_ops_process: Optional[ExtraOpsProcess] = None,
extra_ops: list[dict | Callable] | None = None,
extra_ops_process: ExtraOpsProcess | None = None,
):
self.username = name
super().__init__(charm, relation_name, callback_f, mandatory)
@ -1698,20 +1754,23 @@ class UserIdentityResourceRequiresHandler(RelationHandler):
label=self.label,
rotate=self.rotate,
)
if not secret.id:
# We just created the secret, therefore id is always set
raise RuntimeError("Secret id not set")
self.charm.leader_set({self.label: secret.id})
return secret.id # type: ignore[union-attr]
return secret.id
def _grant_ops_secret(self, relation: ops.Relation):
secret = self.model.get_secret(id=self._ensure_credentials())
secret.grant(relation)
def _get_credentials(self) -> Tuple[str, str]:
def _get_credentials(self) -> tuple[str, str]:
credentials_id = self._ensure_credentials()
secret = self.model.get_secret(id=credentials_id)
content = secret.get_content(refresh=True)
return content["username"], content["password"]
def get_config_credentials(self) -> Optional[Tuple[str, str]]:
def get_config_credentials(self) -> tuple[str, str] | None:
"""Get credential from config secret."""
credentials_id = self.charm.leader_get(self.config_label)
if not credentials_id:
@ -1732,6 +1791,9 @@ class UserIdentityResourceRequiresHandler(RelationHandler):
secret = self.model.app.add_secret(
content, label=self.config_label
)
if not secret.id:
# We just created the secret, therefore id is always set
raise RuntimeError("Secret id not set")
self.charm.leader_set({self.config_label: secret.id})
return True
@ -1787,8 +1849,8 @@ class UserIdentityResourceRequiresHandler(RelationHandler):
return request
def _create_role_requests(
self, username, domain: Optional[str]
) -> List[dict]:
self, username, domain: str | None
) -> list[dict]:
requests = []
if self.role:
params = {
@ -1832,7 +1894,7 @@ class UserIdentityResourceRequiresHandler(RelationHandler):
)
return requests
def _delete_user_request(self, users: List[str]) -> dict:
def _delete_user_request(self, users: list[str]) -> dict:
requests = []
for user in users:
params = {"name": user}
@ -1991,9 +2053,11 @@ class UserIdentityResourceRequiresHandler(RelationHandler):
class CertificateTransferRequiresHandler(RelationHandler):
"""Handle certificate transfer relation on the requires side."""
interface: "certificate_transfer.CertificateTransferRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -2016,7 +2080,7 @@ class CertificateTransferRequiresHandler(RelationHandler):
"""
super().__init__(charm, relation_name, callback_f, mandatory)
def setup_event_handler(self) -> None:
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for tls relation."""
logger.debug("Setting up certificate transfer event handler")
@ -2073,9 +2137,11 @@ class CertificateTransferRequiresHandler(RelationHandler):
class TraefikRouteHandler(RelationHandler):
"""Base class to handle traefik route relations."""
interface: "traefik_route.TraefikRouteRequirer"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -2094,7 +2160,7 @@ class TraefikRouteHandler(RelationHandler):
interface = sunbeam_tracing.trace_type(TraefikRouteRequirer)(
self.charm,
self.model.get_relation(self.relation_name),
self.model.get_relation(self.relation_name), # type: ignore # TraefikRouteRequirer has safeguards against None
self.relation_name,
)
@ -2147,9 +2213,11 @@ class TraefikRouteHandler(RelationHandler):
class NovaServiceRequiresHandler(RelationHandler):
"""Handle nova service relation on the requires side."""
interface: "nova_service.NovaServiceRequires"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
callback_f: Callable,
mandatory: bool = False,
@ -2172,7 +2240,7 @@ class NovaServiceRequiresHandler(RelationHandler):
"""
super().__init__(charm, relation_name, callback_f, mandatory)
def setup_event_handler(self) -> None:
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for Nova service relation."""
import charms.nova_k8s.v0.nova_service as nova_svc
@ -2216,9 +2284,11 @@ class NovaServiceRequiresHandler(RelationHandler):
class LogForwardHandler(RelationHandler):
"""Handle log forward relation on the requires side."""
interface: "loki_push_api.LogForwarder"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
mandatory: bool = False,
):
@ -2259,9 +2329,11 @@ class LogForwardHandler(RelationHandler):
class TracingRequireHandler(RelationHandler):
"""Handle tracing relation on the requires side."""
interface: "tracing.TracingEndpointRequirer"
def __init__(
self,
charm: ops.charm.CharmBase,
charm: "OSBaseOperatorCharm",
relation_name: str,
mandatory: bool = False,
protocols: list[str] | None = None,
@ -2297,10 +2369,11 @@ class TracingRequireHandler(RelationHandler):
def tracing_endpoint(self) -> str | None:
"""Otlp endpoint for charm tracing."""
if self.ready():
if self.ready:
return self.interface.get_endpoint("otlp_http")
return None
@property
def ready(self) -> bool:
"""Whether handler is ready for use."""
return self.interface.is_ready()

View File

@ -21,7 +21,6 @@ from pathlib import (
)
from typing import (
TYPE_CHECKING,
List,
)
import ops.pebble
@ -35,15 +34,15 @@ import jinja2
log = logging.getLogger(__name__)
def get_container(
containers: List["ops.model.Container"], name: str
) -> "ops.model.Container":
"""Search for container with given name inlist of containers."""
container = None
for c in containers:
if c.name == name:
container = c
return container
# def get_container(
# containers: list[ops.model.Container], name: str
# ) -> ops.model.Container | None:
# """Search for container with given name inlist of containers."""
# container = None
# for c in containers:
# if c.name == name:
# container = c
# return container
def sidecar_config_render(

View File

@ -25,8 +25,12 @@ import sys
import typing
import unittest
from typing import (
BinaryIO,
Dict,
List,
Optional,
TextIO,
Union,
)
from unittest.mock import (
MagicMock,
@ -35,6 +39,10 @@ from unittest.mock import (
)
import ops
import ops.storage
from ops_sunbeam.charm import (
OSBaseOperatorCharm,
)
sys.path.append("lib") # noqa
sys.path.append("src") # noqa
@ -166,20 +174,22 @@ class ContainerCalls:
def __init__(self) -> None:
"""Init container calls."""
self.start = collections.defaultdict(list)
self.stop = collections.defaultdict(list)
self.push = collections.defaultdict(list)
self.pull = collections.defaultdict(list)
self.execute = collections.defaultdict(list)
self.remove_path = collections.defaultdict(list)
self.start: dict[str, list[list[str]]] = collections.defaultdict(list)
self.stop: dict[str, list[list[str]]] = collections.defaultdict(list)
self.push: dict[str, list[dict]] = collections.defaultdict(list)
self.pull: dict[str, list[str]] = collections.defaultdict(list)
self.execute: dict[str, list[list[str]]] = collections.defaultdict(
list
)
self.remove_path: dict[str, list[str]] = collections.defaultdict(list)
def add_start(self, container_name: str, call: typing.Dict) -> None:
def add_start(self, container_name: str, call: list[str]) -> None:
"""Log a start call."""
self.start[container_name].append(call)
def add_stop(self, container_name: str, call: typing.Dict) -> None:
"""Log a start call."""
self.start[container_name].append(call)
def add_stop(self, container_name: str, call: list[str]) -> None:
"""Log a stop call."""
self.stop[container_name].append(call)
def started_services(self, container_name: str) -> List:
"""Distinct unordered list of services that were started."""
@ -205,15 +215,15 @@ class ContainerCalls:
)
)
def add_push(self, container_name: str, call: typing.Dict) -> None:
def add_push(self, container_name: str, call: dict) -> None:
"""Log a push call."""
self.push[container_name].append(call)
def add_pull(self, container_name: str, call: typing.Dict) -> None:
def add_pull(self, container_name: str, call: str) -> None:
"""Log a pull call."""
self.pull[container_name].append(call)
def add_execute(self, container_name: str, call: typing.List) -> None:
def add_execute(self, container_name: str, call: list[str]) -> None:
"""Log a execute call."""
self.execute[container_name].append(call)
@ -221,13 +231,11 @@ class ContainerCalls:
"""Log a remove path call."""
self.remove_path[container_name].append(call)
def updated_files(self, container_name: str) -> typing.List:
def updated_files(self, container_name: str) -> list:
"""Return a list of files that have been updated in a container."""
return [c["path"] for c in self.push.get(container_name, [])]
def file_update_calls(
self, container_name: str, file_name: str
) -> typing.List:
def file_update_calls(self, container_name: str, file_name: str) -> list:
"""Return the update call for File_name in container_name."""
return [
c
@ -241,21 +249,21 @@ class CharmTestCase(unittest.TestCase):
container_calls = ContainerCalls()
def setUp(self, obj: "typing.ANY", patches: "typing.List") -> None:
def setUp(self, obj: typing.Any, patches: list) -> None: # type: ignore
"""Run constructor."""
super().setUp()
self.patches = patches
self.obj = obj
self.patch_all()
def patch(self, method: "typing.ANY") -> Mock:
def patch(self, method: typing.Any) -> Mock:
"""Patch the named method on self.obj."""
_m = patch.object(self.obj, method)
mock = _m.start()
self.addCleanup(_m.stop)
return mock
def patch_obj(self, obj: "typing.ANY", method: "typing.ANY") -> Mock:
def patch_obj(self, obj: "typing.Any", method: "typing.Any") -> Mock:
"""Patch the named method on obj."""
_m = patch.object(obj, method)
mock = _m.start()
@ -271,13 +279,13 @@ class CharmTestCase(unittest.TestCase):
self,
container: str,
path: str,
contents: typing.List = None,
user: str = None,
group: str = None,
permissions: str = None,
contents: list | None = None,
user: str | None = None,
group: str | None = None,
permissions: str | None = None,
) -> None:
"""Check the attributes of a file."""
client = self.harness.charm.unit.get_container(container)._pebble
client = self.harness.charm.unit.get_container(container)._pebble # type: ignore
files = client.list_files(path, itself=True)
self.assertEqual(len(files), 1)
test_file = files[0]
@ -294,7 +302,7 @@ class CharmTestCase(unittest.TestCase):
self.assertEqual(test_file.permissions, permissions)
def add_ingress_relation(harness: Harness, endpoint_type: str) -> str:
def add_ingress_relation(harness: Harness, endpoint_type: str) -> int:
"""Add ingress relation."""
app_name = "traefik-" + endpoint_type
unit_name = app_name + "/0"
@ -305,7 +313,7 @@ def add_ingress_relation(harness: Harness, endpoint_type: str) -> str:
def add_ingress_relation_data(
harness: Harness, rel_id: str, endpoint_type: str
harness: Harness, rel_id: int, endpoint_type: str
) -> None:
"""Add ingress data to ingress relation."""
app_name = "traefik-" + endpoint_type
@ -324,7 +332,7 @@ def add_complete_ingress_relation(harness: Harness) -> None:
add_ingress_relation_data(harness, rel_id, endpoint_type)
def add_base_amqp_relation(harness: Harness) -> str:
def add_base_amqp_relation(harness: Harness) -> int:
"""Add amqp relation."""
rel_id = harness.add_relation("amqp", "rabbitmq")
harness.add_relation_unit(rel_id, "rabbitmq/0")
@ -335,7 +343,7 @@ def add_base_amqp_relation(harness: Harness) -> str:
return rel_id
def add_amqp_relation_credentials(harness: Harness, rel_id: str) -> None:
def add_amqp_relation_credentials(harness: Harness, rel_id: int) -> None:
"""Add amqp data to amqp relation."""
harness.update_relation_data(
rel_id,
@ -344,7 +352,7 @@ def add_amqp_relation_credentials(harness: Harness, rel_id: str) -> None:
)
def add_base_ceph_access_relation(harness: Harness) -> str:
def add_base_ceph_access_relation(harness: Harness) -> int:
"""Add ceph-access relation."""
rel_id = harness.add_relation(
"ceph-access", "cinder-ceph", app_data={"a": "b"}
@ -352,7 +360,7 @@ def add_base_ceph_access_relation(harness: Harness) -> str:
return rel_id
def add_ceph_access_relation_response(harness: Harness, rel_id: str) -> None:
def add_ceph_access_relation_response(harness: Harness, rel_id: int) -> None:
"""Add secret data to cinder-access relation."""
credentials_content = {"uuid": "svcuser1", "key": "svcpass1"}
credentials_id = harness.add_model_secret(
@ -364,7 +372,7 @@ def add_ceph_access_relation_response(harness: Harness, rel_id: str) -> None:
)
def add_base_identity_service_relation(harness: Harness) -> str:
def add_base_identity_service_relation(harness: Harness) -> int:
"""Add identity-service relation."""
rel_id = harness.add_relation("identity-service", "keystone")
harness.add_relation_unit(rel_id, "keystone/0")
@ -376,7 +384,7 @@ def add_base_identity_service_relation(harness: Harness) -> str:
def add_identity_service_relation_response(
harness: Harness, rel_id: str
harness: Harness, rel_id: int
) -> None:
"""Add id service data to identity-service relation."""
credentials_content = {"username": "svcuser1", "password": "svcpass1"}
@ -408,7 +416,7 @@ def add_identity_service_relation_response(
)
def add_base_identity_credentials_relation(harness: Harness) -> str:
def add_base_identity_credentials_relation(harness: Harness) -> int:
"""Add identity-service relation."""
rel_id = harness.add_relation("identity-credentials", "keystone")
harness.add_relation_unit(rel_id, "keystone/0")
@ -420,7 +428,7 @@ def add_base_identity_credentials_relation(harness: Harness) -> str:
def add_identity_credentials_relation_response(
harness: Harness, rel_id: str
harness: Harness, rel_id: int
) -> None:
"""Add id service data to identity-service relation."""
credentials_content = {"username": "username", "password": "user-password"}
@ -451,7 +459,7 @@ def add_identity_credentials_relation_response(
)
def add_base_db_relation(harness: Harness) -> str:
def add_base_db_relation(harness: Harness) -> int:
"""Add db relation."""
rel_id = harness.add_relation("database", "mysql")
harness.add_relation_unit(rel_id, "mysql/0")
@ -462,7 +470,7 @@ def add_base_db_relation(harness: Harness) -> str:
return rel_id
def add_db_relation_credentials(harness: Harness, rel_id: str) -> None:
def add_db_relation_credentials(harness: Harness, rel_id: int) -> None:
"""Add db credentials data to db relation."""
secret_id = harness.add_model_secret(
"mysql", {"username": "foo", "password": "hardpassword"}
@ -487,35 +495,35 @@ def add_api_relations(harness: Harness) -> None:
)
def add_complete_db_relation(harness: Harness) -> None:
def add_complete_db_relation(harness: Harness) -> int:
"""Add complete DB relation."""
rel_id = add_base_db_relation(harness)
add_db_relation_credentials(harness, rel_id)
return rel_id
def add_complete_identity_relation(harness: Harness) -> None:
def add_complete_identity_relation(harness: Harness) -> int:
"""Add complete Identity relation."""
rel_id = add_base_identity_service_relation(harness)
add_identity_service_relation_response(harness, rel_id)
return rel_id
def add_complete_identity_credentials_relation(harness: Harness) -> None:
def add_complete_identity_credentials_relation(harness: Harness) -> int:
"""Add complete identity-credentials relation."""
rel_id = add_base_identity_credentials_relation(harness)
add_identity_credentials_relation_response(harness, rel_id)
return rel_id
def add_complete_amqp_relation(harness: Harness) -> None:
def add_complete_amqp_relation(harness: Harness) -> int:
"""Add complete AMQP relation."""
rel_id = add_base_amqp_relation(harness)
add_amqp_relation_credentials(harness, rel_id)
return rel_id
def add_ceph_relation_credentials(harness: Harness, rel_id: str) -> None:
def add_ceph_relation_credentials(harness: Harness, rel_id: int) -> None:
"""Add amqp data to amqp relation."""
# During tests the charm class is never destroyed and recreated as it
# would be between hook executions. This means request is never marked
@ -546,7 +554,7 @@ def add_ceph_relation_credentials(harness: Harness, rel_id: str) -> None:
harness.add_relation_unit(rel_id, "ceph-mon/1")
def add_base_ceph_relation(harness: Harness) -> str:
def add_base_ceph_relation(harness: Harness) -> int:
"""Add identity-service relation."""
rel_id = harness.add_relation("ceph", "ceph-mon")
harness.add_relation_unit(rel_id, "ceph-mon/0")
@ -556,14 +564,14 @@ def add_base_ceph_relation(harness: Harness) -> str:
return rel_id
def add_complete_ceph_relation(harness: Harness) -> None:
def add_complete_ceph_relation(harness: Harness) -> int:
"""Add complete ceph relation."""
rel_id = add_base_ceph_relation(harness)
add_ceph_relation_credentials(harness, rel_id)
return rel_id
def add_certificates_relation_certs(harness: Harness, rel_id: str) -> None:
def add_certificates_relation_certs(harness: Harness, rel_id: int) -> None:
"""Add cert data to certificates relation."""
cert = {
"certificate": TEST_SERVER_CERT,
@ -576,7 +584,7 @@ def add_certificates_relation_certs(harness: Harness, rel_id: str) -> None:
)
def add_base_certificates_relation(harness: Harness) -> str:
def add_base_certificates_relation(harness: Harness) -> int:
"""Add certificates relation."""
rel_id = harness.add_relation("certificates", "vault")
harness.add_relation_unit(rel_id, "vault/0")
@ -593,14 +601,14 @@ def add_base_certificates_relation(harness: Harness) -> str:
return rel_id
def add_complete_certificates_relation(harness: Harness) -> None:
def add_complete_certificates_relation(harness: Harness) -> int:
"""Add complete certificates relation."""
rel_id = add_base_certificates_relation(harness)
add_certificates_relation_certs(harness, rel_id)
return rel_id
def add_complete_peer_relation(harness: Harness) -> None:
def add_complete_peer_relation(harness: Harness) -> int:
"""Add complete peer relation."""
rel_id = harness.add_relation("peers", harness.charm.app.name)
new_unit = f"{harness.charm.app.name}/1"
@ -643,12 +651,12 @@ test_relations = {
}
def add_all_relations(harness: Harness) -> None:
def add_all_relations(harness: Harness) -> dict[str, int]:
"""Add all the relations there are test relations for."""
rel_ids = {}
for key in harness._meta.relations.keys():
if test_relations.get(key):
rel_id = test_relations[key](harness)
if add_complete_relation := test_relations.get(key):
rel_id = add_complete_relation(harness)
rel_ids[key] = rel_id
return rel_ids
@ -670,50 +678,52 @@ def set_remote_leader_ready(
def get_harness(
charm_class: ops.charm.CharmBase,
charm_metadata: str = None,
container_calls: dict = None,
charm_config: str = None,
charm_actions: str = None,
initial_charm_config: dict = None,
charm_class: type[OSBaseOperatorCharm],
charm_metadata: str | None = None,
container_calls: ContainerCalls | None = None,
charm_config: str | None = None,
charm_actions: str | None = None,
initial_charm_config: dict | None = None,
) -> Harness:
"""Return a testing harness."""
if container_calls is None:
container_calls = ContainerCalls()
class _OSTestingPebbleClient(_TestingPebbleClient):
container_name: str
def exec(
self,
command: typing.List[str],
command: list[str],
*,
service_context: Optional[str] = None,
environment: typing.Dict[str, str] = None,
working_dir: str = None,
timeout: float = None,
user_id: int = None,
user: str = None,
group_id: int = None,
group: str = None,
stdin: typing.Union[
str, bytes, typing.TextIO, typing.BinaryIO
] = None,
stdout: typing.Union[typing.TextIO, typing.BinaryIO] = None,
stderr: typing.Union[typing.TextIO, typing.BinaryIO] = None,
encoding: str = "utf-8",
environment: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
timeout: Optional[float] = None,
user_id: Optional[int] = None,
user: Optional[str] = None,
group_id: Optional[int] = None,
group: Optional[str] = None,
stdin: Optional[Union[str, bytes, TextIO, BinaryIO]] = None,
stdout: Optional[Union[TextIO, BinaryIO]] = None,
stderr: Optional[Union[TextIO, BinaryIO]] = None,
encoding: Optional[str] = "utf-8",
combine_stderr: bool = False,
) -> None:
container_calls.add_execute(self.container_name, command)
) -> ops.pebble.ExecProcess[typing.Any]:
container_calls.add_execute(self.container_name, command) # type: ignore
process_mock = MagicMock()
process_mock.wait_output.return_value = ("", None)
return process_mock
def start_services(
self,
services: List[str],
services: list[str],
timeout: float = 30.0,
delay: float = 0.1,
) -> None:
"""Record start service events."""
super().start_services(services, timeout, delay)
container_calls.add_start(self.container_name, services)
container_calls.add_start(self.container_name, services) # type: ignore
def stop_services(
self,
@ -723,7 +733,7 @@ def get_harness(
) -> None:
"""Record stop service events."""
super().stop_services(services, timeout, delay)
container_calls.add_stop(self.container_name, services)
container_calls.add_stop(self.container_name, services) # type: ignore
class _OSTestingModelBackend(_TestingModelBackend):
def get_pebble(self, socket_path: str) -> _OSTestingPebbleClient:
@ -749,10 +759,10 @@ def get_harness(
self._pebble_clients[container] = client
self._pebble_clients_can_connect[client] = False
return client
return client # type: ignore
def network_get(
self, endpoint_name: str, relation_id: str = None
def network_get( # type: ignore
self, endpoint_name: str, relation_id: int | None = None
) -> dict:
"""Return a fake set of network data."""
network_data = {
@ -769,7 +779,7 @@ def get_harness(
}
return network_data
filename = inspect.getfile(charm_class)
filename = inspect.getfile(charm_class) # type: ignore
# Use pathlib.Path(filename).parents[1] if tests structure is
# <charm>/unit_tests
# Use pathlib.Path(filename).parents[2] if tests structure is
@ -792,9 +802,12 @@ def get_harness(
harness._backend = _OSTestingModelBackend(
harness._unit_name, harness._meta, harness._get_config(charm_config)
)
harness._model = model.Model(harness._meta, harness._backend)
harness._model = model.Model(harness._meta, harness._backend) # type: ignore
harness._framework = framework.Framework(
":memory:", harness._charm_dir, harness._meta, harness._model
ops.storage.SQLiteStorage(":memory:"),
harness._charm_dir,
harness._meta,
harness._model,
)
harness.set_model_name("test-model")

View File

@ -24,7 +24,7 @@ from typing import (
overload,
)
_T = TypeVar("_T")
_T = TypeVar("_T", bound=type)
try:
from charms.tempo_k8s.v1.charm_tracing import (

View File

@ -62,6 +62,11 @@ then
pflake8 --config pyproject.toml ${src_path} ${tst_path} ${ops_sunbeam_src_path} ${ops_sunbeam_tst_path} || exit 1
isort --check-only --diff ${src_path} ${tst_path} ${ops_sunbeam_src_path} ${ops_sunbeam_tst_path} || exit 1
black --config pyproject.toml --check --diff ${src_path} ${tst_path} ${ops_sunbeam_src_path} ${ops_sunbeam_tst_path} || exit 1
elif [[ $1 == "linters" ]]
then
ops_sunbeam_src_path="ops-sunbeam/ops_sunbeam"
PYTHONPATH=$(python3 ./repository.py pythonpath) mypy ${ops_sunbeam_src_path}
elif [[ $1 =~ ^(py3|py310|py311)$ ]];
then

View File

@ -70,6 +70,13 @@ deps = {[testenv:py3]deps}
commands =
{toxinidir}/run_tox.sh cover {posargs}
[testenv:linters]
deps =
{[testenv:py3]deps}
mypy
commands =
{toxinidir}/run_tox.sh linters
[testenv:build]
basepython = python3
deps = pyyaml

View File

@ -8,6 +8,7 @@
version 3 releases designated for testing the latest release.
check:
jobs:
- openstack-tox-linters
- openstack-tox-pep8
- openstack-tox-py310:
branches:
@ -19,6 +20,7 @@
- main
gate:
jobs:
- openstack-tox-linters
- openstack-tox-pep8
- openstack-tox-py310:
branches: