Add docstring and type hint linting

This commit is contained in:
Liam Young 2021-10-21 15:12:01 +01:00
parent 9c76f7f9f2
commit be3b333bb0
16 changed files with 931 additions and 742 deletions

View File

@ -0,0 +1,15 @@
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for shared code for ops charms."""

View File

@ -51,59 +51,66 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
_state = ops.framework.StoredState()
def __init__(self, framework):
def __init__(self, framework: ops.framework.Framework) -> None:
"""Run constructor."""
super().__init__(framework)
self._state.set_default(bootstrapped=False)
self.relation_handlers = self.get_relation_handlers()
self.pebble_handlers = self.get_pebble_handlers()
self.framework.observe(self.on.config_changed,
self._on_config_changed)
self.framework.observe(self.on.config_changed, self._on_config_changed)
def can_add_handler(self, relation_name, handlers):
def can_add_handler(
self,
relation_name: str,
handlers: List[sunbeam_rhandlers.RelationHandler],
) -> bool:
"""Whether a handler for the given relation can be added."""
if relation_name not in self.meta.relations.keys():
logging.debug(
f"Cannot add handler for relation {relation_name}, relation "
"not present in charm metadata")
"not present in charm metadata"
)
return False
if relation_name in [h.relation_name for h in handlers]:
logging.debug(
f"Cannot add handler for relation {relation_name}, handler "
"already present")
"already present"
)
return False
return True
def get_relation_handlers(self, handlers=None) -> List[
sunbeam_rhandlers.RelationHandler]:
def get_relation_handlers(
self, handlers: List[sunbeam_rhandlers.RelationHandler] = None
) -> List[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service."""
handlers = handlers or []
if self.can_add_handler('amqp', handlers):
if self.can_add_handler("amqp", handlers):
self.amqp = sunbeam_rhandlers.AMQPHandler(
self,
'amqp',
"amqp",
self.configure_charm,
self.config.get('rabbit-user') or self.service_name,
self.config.get('rabbit-vhost') or 'openstack')
self.config.get("rabbit-user") or self.service_name,
self.config.get("rabbit-vhost") or "openstack",
)
handlers.append(self.amqp)
if self.can_add_handler('shared-db', handlers):
if self.can_add_handler("shared-db", handlers):
self.db = sunbeam_rhandlers.DBHandler(
self,
'shared-db',
self.configure_charm,
self.databases)
self, "shared-db", self.configure_charm, self.databases
)
handlers.append(self.db)
if self.can_add_handler('ingress', handlers):
if self.can_add_handler("ingress", handlers):
self.ingress = sunbeam_rhandlers.IngressHandler(
self,
'ingress',
"ingress",
self.service_name,
self.default_public_ingress_port,
self.configure_charm)
self.configure_charm,
)
handlers.append(self.ingress)
if self.can_add_handler('peers', handlers):
if self.can_add_handler("peers", handlers):
self.peers = sunbeam_rhandlers.BasePeerHandler(
self,
'peers',
self.configure_charm)
self, "peers", self.configure_charm
)
handlers.append(self.peers)
return handlers
@ -117,12 +124,15 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
self.container_configs,
self.template_dir,
self.openstack_release,
self.configure_charm)]
self.configure_charm,
)
]
def configure_charm(self, event) -> None:
def configure_charm(self, event: ops.framework.EventBase) -> None:
"""Catchall handler to cconfigure charm services."""
if self.supports_peer_relation and not (self.unit.is_leader() or
self.is_leader_ready()):
if self.supports_peer_relation and not (
self.unit.is_leader() or self.is_leader_ready()
):
logging.debug("Leader not ready")
return
@ -148,8 +158,9 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
self._state.bootstrapped = True
@property
def supports_peer_relation(self):
return 'peers' in self.meta.relations.keys()
def supports_peer_relation(self) -> bool:
"""Whether the charm support the peers relation."""
return "peers" in self.meta.relations.keys()
@property
def container_configs(self) -> List[sunbeam_core.ContainerConfigFile]:
@ -157,26 +168,26 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
return []
@property
def config_contexts(self) -> List[
sunbeam_config_contexts.CharmConfigContext]:
"""Configuration adapters for the operator."""
return [
sunbeam_config_contexts.CharmConfigContext(self, 'options')]
def config_contexts(
self,
) -> List[sunbeam_config_contexts.CharmConfigContext]:
"""Return the configuration adapters for the operator."""
return [sunbeam_config_contexts.CharmConfigContext(self, "options")]
@property
def handler_prefix(self) -> str:
"""Prefix for handlers??"""
return self.service_name.replace('-', '_')
def _unused_handler_prefix(self) -> str:
"""Prefix for handlers."""
return self.service_name.replace("-", "_")
@property
def container_names(self):
"""Containers that form part of this service."""
def container_names(self) -> List[str]:
"""Names of Containers that form part of this service."""
return [self.service_name]
@property
def template_dir(self) -> str:
"""Directory containing Jinja2 templates."""
return 'src/templates'
return "src/templates"
@property
def databases(self) -> List[str]:
@ -184,9 +195,9 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
Defaults to a single database matching the app name.
"""
return [self.service_name.replace('-', '_')]
return [self.service_name.replace("-", "_")]
def _on_config_changed(self, event):
def _on_config_changed(self, event: ops.framework.EventBase) -> None:
self.configure_charm(None)
def containers_ready(self) -> bool:
@ -212,7 +223,8 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
if handler.relation_name not in self.meta.relations.keys():
logger.info(
f"Dropping handler for relation {handler.relation_name}, "
"relation not present in charm metadata")
"relation not present in charm metadata"
)
continue
if handler.ready:
ra.add_relation_handler(handler)
@ -231,74 +243,85 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
"""Determine whether the service has been boostrapped."""
return self._state.bootstrapped
def leader_set(self, settings=None, **kwargs):
"""Juju set data in peer data bag"""
def leader_set(self, settings: dict = None, **kwargs) -> None:
"""Juju set data in peer data bag."""
settings = settings or {}
settings.update(kwargs)
self.peers.set_app_data(
settings=settings)
self.peers.set_app_data(settings=settings)
def leader_get(self, key: str) -> str:
"""Retrieve data from the peer relation."""
return self.peers.get_app_data(key)
def set_leader_ready(self):
def set_leader_ready(self) -> None:
"""Tell peers that the leader is ready."""
self.peers.set_leader_ready()
def is_leader_ready(self):
def is_leader_ready(self) -> bool:
"""Has the lead unit announced that it is ready."""
return self.peers.is_leader_ready()
class OSBaseOperatorAPICharm(OSBaseOperatorCharm):
"""Base class for OpenStack API operators"""
"""Base class for OpenStack API operators."""
def __init__(self, framework):
def __init__(self, framework: ops.framework.Framework) -> None:
"""Run constructor."""
super().__init__(framework)
self._state.set_default(db_ready=False)
@property
def service_endpoints(self):
def service_endpoints(self) -> List[dict]:
"""List of endpoints for this service."""
return []
def get_relation_handlers(self, handlers=None) -> List[
sunbeam_rhandlers.RelationHandler]:
def get_relation_handlers(
self, handlers: List[sunbeam_rhandlers.RelationHandler] = None
) -> List[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service."""
handlers = handlers or []
if self.can_add_handler('identity-service', handlers):
if self.can_add_handler("identity-service", handlers):
self.id_svc = sunbeam_rhandlers.IdentityServiceRequiresHandler(
self,
'identity-service',
"identity-service",
self.configure_charm,
self.service_endpoints,
self.model.config['region'])
self.model.config["region"],
)
handlers.append(self.id_svc)
handlers = super().get_relation_handlers(handlers)
return handlers
def service_url(self, hostname):
return f'http://{hostname}:{self.default_public_ingress_port}'
def service_url(self, hostname: str) -> str:
"""Service url for accessing this service via the given hostname."""
return f"http://{hostname}:{self.default_public_ingress_port}"
@property
def public_url(self):
def public_url(self) -> str:
"""Url for accessing the public endpoint for this service."""
svc_hostname = self.model.config.get(
'os-public-hostname',
self.service_name)
"os-public-hostname", self.service_name
)
return self.service_url(svc_hostname)
@property
def admin_url(self):
def admin_url(self) -> str:
"""Url for accessing the admin endpoint for this service."""
hostname = self.model.get_binding(
'identity-service').network.ingress_address
"identity-service"
).network.ingress_address
return self.service_url(hostname)
@property
def internal_url(self):
def internal_url(self) -> str:
"""Url for accessing the internal endpoint for this service."""
hostname = self.model.get_binding(
'identity-service').network.ingress_address
"identity-service"
).network.ingress_address
return self.service_url(hostname)
def get_pebble_handlers(self) -> List[sunbeam_chandlers.PebbleHandler]:
"""Pebble handlers for the service"""
"""Pebble handlers for the service."""
return [
sunbeam_chandlers.WSGIPebbleHandler(
self,
@ -308,18 +331,24 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharm):
self.template_dir,
self.openstack_release,
self.configure_charm,
f'wsgi-{self.service_name}')]
f"wsgi-{self.service_name}",
)
]
@property
def container_configs(self) -> List[sunbeam_core.ContainerConfigFile]:
"""Container configuration files for the service."""
_cconfigs = super().container_configs
_cconfigs.extend([
sunbeam_core.ContainerConfigFile(
[self.wsgi_container_name],
self.service_conf,
self.service_user,
self.service_group)])
_cconfigs.extend(
[
sunbeam_core.ContainerConfigFile(
[self.wsgi_container_name],
self.service_conf,
self.service_user,
self.service_group,
)
]
)
return _cconfigs
@property
@ -335,16 +364,19 @@ class OSBaseOperatorAPICharm(OSBaseOperatorCharm):
@property
def service_conf(self) -> str:
"""Service default configuration file."""
return f'/etc/{self.service_name}/{self.service_name}.conf'
return f"/etc/{self.service_name}/{self.service_name}.conf"
@property
def config_contexts(self) -> List[sunbeam_config_contexts.ConfigContext]:
"""Generate list of configuration adapters for the charm."""
_cadapters = super().config_contexts
_cadapters.extend([
sunbeam_config_contexts.WSGIWorkerConfigContext(
self,
'wsgi_config')])
_cadapters.extend(
[
sunbeam_config_contexts.WSGIWorkerConfigContext(
self, "wsgi_config"
)
]
)
return _cadapters
@property

View File

@ -19,40 +19,56 @@ create reusable contexts which translate charm config, deployment state etc.
These are not specific to a relation.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import advanced_sunbeam_openstack.charm
logger = logging.getLogger(__name__)
class ConfigContext():
class ConfigContext:
"""Base class used for creating a config context."""
def __init__(self, charm, namespace):
def __init__(
self,
charm: "advanced_sunbeam_openstack.charm.OSBaseOperatorCharm",
namespace: str,
) -> None:
"""Run constructor."""
self.charm = charm
self.namespace = namespace
for k, v in self.context().items():
k = k.replace('-', '_')
k = k.replace("-", "_")
setattr(self, k, v)
@property
def ready(self):
def ready(self) -> bool:
"""Whether the context has all the data is needs."""
return True
def context(self):
def context(self) -> dict:
"""Context used when rendering templates."""
raise NotImplementedError
class CharmConfigContext(ConfigContext):
"""A context containing all of the charms config options"""
"""A context containing all of the charms config options."""
def context(self) -> dict:
"""Charms config options."""
return self.charm.config
class WSGIWorkerConfigContext(ConfigContext):
"""Configuration context for WSGI configuration."""
def context(self) -> dict:
"""A context containing WSGI configuration options"""
"""WSGI configuration options."""
return {
'name': self.charm.service_name,
'wsgi_admin_script': self.charm.wsgi_admin_script,
'wsgi_public_script': self.charm.wsgi_public_script}
"name": self.charm.service_name,
"wsgi_admin_script": self.charm.wsgi_admin_script,
"wsgi_public_script": self.charm.wsgi_public_script,
}

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base classes for defining Pebble handlers
"""Base classes for defining Pebble handlers.
The PebbleHandler defines the pebble layers, manages pushing
configuration to the containers and managing the service running
@ -37,11 +37,17 @@ class PebbleHandler(ops.charm.Object):
_state = ops.framework.StoredState()
def __init__(self, charm: ops.charm.CharmBase,
container_name: str, service_name: str,
container_configs: List[sunbeam_core.ContainerConfigFile],
template_dir: str, openstack_release: str,
callback_f: Callable):
def __init__(
self,
charm: ops.charm.CharmBase,
container_name: str,
service_name: str,
container_configs: List[sunbeam_core.ContainerConfigFile],
template_dir: str,
openstack_release: str,
callback_f: Callable,
) -> None:
"""Run constructor."""
super().__init__(charm, None)
self._state.set_default(pebble_ready=False)
self._state.set_default(config_pushed=False)
@ -58,52 +64,48 @@ class PebbleHandler(ops.charm.Object):
def setup_pebble_handler(self) -> None:
"""Configure handler for pebble ready event."""
prefix = self.container_name.replace('-', '_')
pebble_ready_event = getattr(
self.charm.on,
f'{prefix}_pebble_ready')
self.framework.observe(pebble_ready_event,
self._on_service_pebble_ready)
prefix = self.container_name.replace("-", "_")
pebble_ready_event = getattr(self.charm.on, f"{prefix}_pebble_ready")
self.framework.observe(
pebble_ready_event, self._on_service_pebble_ready
)
def _on_service_pebble_ready(self,
event: ops.charm.PebbleReadyEvent) -> None:
def _on_service_pebble_ready(
self, event: ops.charm.PebbleReadyEvent
) -> None:
"""Handle pebble ready event."""
container = event.workload
container.add_layer(
self.service_name,
self.get_layer(),
combine=True)
logger.debug(f'Plan: {container.get_plan()}')
container.add_layer(self.service_name, self.get_layer(), combine=True)
logger.debug(f"Plan: {container.get_plan()}")
self.ready = True
self._state.pebble_ready = True
self.charm.configure_charm(event)
def write_config(self, context) -> None:
def write_config(self, context: sunbeam_core.OPSCharmContexts) -> None:
"""Write configuration files into the container.
On the pre-condition that all relation adapters are ready
for use, write all configuration files into the container
so that the underlying service may be started.
"""
container = self.charm.unit.get_container(
self.container_name)
container = self.charm.unit.get_container(self.container_name)
if container:
sunbeam_templating.sidecar_config_render(
[container],
self.container_configs,
self.template_dir,
self.openstack_release,
context)
context,
)
self._state.config_pushed = True
else:
logger.debug(
'Container not ready')
logger.debug("Container not ready")
def get_layer(self) -> dict:
"""Pebble configuration layer for the container"""
"""Pebble configuration layer for the container."""
return {}
def init_service(self, context) -> None:
def init_service(self, context: sunbeam_core.OPSCharmContexts) -> None:
"""Initialise service ready for use.
Write configuration files to the container and record
@ -112,8 +114,9 @@ class PebbleHandler(ops.charm.Object):
self.write_config(context)
self._state.service_ready = True
def default_container_configs(self) -> List[
sunbeam_core.ContainerConfigFile]:
def default_container_configs(
self,
) -> List[sunbeam_core.ContainerConfigFile]:
"""Generate default container configurations.
These should be used by all inheriting classes and are
@ -141,23 +144,37 @@ class PebbleHandler(ops.charm.Object):
class WSGIPebbleHandler(PebbleHandler):
"""WSGI oriented handler for a Pebble managed container."""
def __init__(self, charm: ops.charm.CharmBase,
container_name: str, service_name: str,
container_configs: List[sunbeam_core.ContainerConfigFile],
template_dir: str, openstack_release: str,
callback_f: Callable,
wsgi_service_name: str):
super().__init__(charm, container_name, service_name,
container_configs, template_dir, openstack_release,
callback_f)
def __init__(
self,
charm: ops.charm.CharmBase,
container_name: str,
service_name: str,
container_configs: List[sunbeam_core.ContainerConfigFile],
template_dir: str,
openstack_release: str,
callback_f: Callable,
wsgi_service_name: str,
) -> None:
"""Run constructor."""
super().__init__(
charm,
container_name,
service_name,
container_configs,
template_dir,
openstack_release,
callback_f,
)
self.wsgi_service_name = wsgi_service_name
def start_wsgi(self) -> None:
"""Start WSGI service"""
"""Start WSGI service."""
container = self.charm.unit.get_container(self.container_name)
if not container:
logger.debug(f'{self.container_name} container is not ready. '
'Cannot start wgi service.')
logger.debug(
f"{self.container_name} container is not ready. "
"Cannot start wgi service."
)
return
service = container.get_service(self.wsgi_service_name)
if service.is_running():
@ -166,34 +183,35 @@ class WSGIPebbleHandler(PebbleHandler):
container.start(self.wsgi_service_name)
def get_layer(self) -> dict:
"""Apache WSGI service pebble layer
"""Apache WSGI service pebble layer.
:returns: pebble layer configuration for wsgi service
"""
return {
'summary': f'{self.service_name} layer',
'description': 'pebble config layer for apache wsgi',
'services': {
f'{self.wsgi_service_name}': {
'override': 'replace',
'summary': f'{self.service_name} wsgi',
'command': '/usr/sbin/apache2ctl -DFOREGROUND',
'startup': 'disabled',
"summary": f"{self.service_name} layer",
"description": "pebble config layer for apache wsgi",
"services": {
f"{self.wsgi_service_name}": {
"override": "replace",
"summary": f"{self.service_name} wsgi",
"command": "/usr/sbin/apache2ctl -DFOREGROUND",
"startup": "disabled",
},
},
}
def init_service(self, context) -> None:
"""Enable and start WSGI service"""
def init_service(self, context: sunbeam_core.OPSCharmContexts) -> None:
"""Enable and start WSGI service."""
container = self.charm.unit.get_container(self.container_name)
self.write_config(context)
try:
sunbeam_cprocess.check_output(
container,
f'a2ensite {self.wsgi_service_name} && sleep 1')
container, f"a2ensite {self.wsgi_service_name} && sleep 1"
)
except sunbeam_cprocess.ContainerProcessError:
logger.exception(
f'Failed to enable {self.wsgi_service_name} site in apache')
f"Failed to enable {self.wsgi_service_name} site in apache"
)
# ignore for now - pebble is raising an exited too quickly, but it
# appears to work properly.
self.start_wsgi()
@ -201,13 +219,15 @@ class WSGIPebbleHandler(PebbleHandler):
@property
def wsgi_conf(self) -> str:
return f'/etc/apache2/sites-available/wsgi-{self.service_name}.conf'
"""Location of WSGI config file."""
return f"/etc/apache2/sites-available/wsgi-{self.service_name}.conf"
def default_container_configs(self) -> List[
sunbeam_core.ContainerConfigFile]:
def default_container_configs(
self,
) -> List[sunbeam_core.ContainerConfigFile]:
"""Container configs for WSGI service."""
return [
sunbeam_core.ContainerConfigFile(
[self.container_name],
self.wsgi_conf,
'root',
'root')]
[self.container_name], self.wsgi_conf, "root", "root"
)
]

View File

@ -1,38 +1,69 @@
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Collection of core components."""
import collections
from typing import Generator, List, TYPE_CHECKING, Tuple, Union
if TYPE_CHECKING:
from advanced_sunbeam_openstack.charm import OSBaseOperatorCharm
from advanced_sunbeam_openstack.config_contexts import ConfigContext
from advanced_sunbeam_openstack.relation_handlers import RelationHandler
ContainerConfigFile = collections.namedtuple(
'ContainerConfigFile',
['container_names', 'path', 'user', 'group'])
"ContainerConfigFile", ["container_names", "path", "user", "group"]
)
class OPSCharmContexts():
class OPSCharmContexts:
"""Set of config contexts and contexts from relation handlers."""
def __init__(self, charm):
def __init__(self, charm: "OSBaseOperatorCharm") -> None:
"""Run constructor."""
self.charm = charm
self.namespaces = []
def add_relation_handler(self, handler):
def add_relation_handler(self, handler: "RelationHandler") -> None:
"""Add relation handler."""
interface, relation_name = handler.get_interface()
_ns = relation_name.replace("-", "_")
self.namespaces.append(_ns)
ctxt = handler.context()
obj_name = ''.join([w.capitalize() for w in relation_name.split('-')])
obj_name = "".join([w.capitalize() for w in relation_name.split("-")])
obj = collections.namedtuple(obj_name, ctxt.keys())(*ctxt.values())
setattr(self, _ns, obj)
def add_config_contexts(self, config_adapters):
def add_config_contexts(
self, config_adapters: List["ConfigContext"]
) -> None:
"""Add multiple config contexts."""
for config_adapter in config_adapters:
self.add_config_context(
config_adapter,
config_adapter.namespace)
self.add_config_context(config_adapter, config_adapter.namespace)
def add_config_context(self, config_adapter, namespace):
def add_config_context(
self, config_adapter: "ConfigContext", namespace: str
) -> None:
"""Add add config adapater to context."""
self.namespaces.append(namespace)
setattr(self, namespace, config_adapter)
def __iter__(self):
"""
Iterate over the relations presented to the charm.
"""
def __iter__(
self,
) -> Generator[
Tuple[str, Union["ConfigContext", "RelationHandler"]], None, None
]:
"""Iterate over the relations presented to the charm."""
for namespace in self.namespaces:
yield namespace, getattr(self, namespace)

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for running commands in containers."""
import subprocess
import textwrap
import time
@ -44,49 +46,56 @@ class ContainerProcess(object):
:param tmp_dir: the dir containing the location of process files
:type tmp_dir: str
"""
def __init__(self, container: model.Container, process_name: str,
tmp_dir: str):
def __init__(
self, container: model.Container, process_name: str, tmp_dir: str
) -> None:
"""Run constructor."""
self.container = weakref.proxy(container)
self.process_name = process_name
self._returncode = RETURN_CODE_UNKNOWN
self.tmp_dir = tmp_dir
self.stdout_file = f'{tmp_dir}/{process_name}.stdout'
self.stderr_file = f'{tmp_dir}/{process_name}.stderr'
self.stdout_file = f"{tmp_dir}/{process_name}.stdout"
self.stderr_file = f"{tmp_dir}/{process_name}.stderr"
self._env = dict()
self.env_file = f'{tmp_dir}/{process_name}.env'
self.rc_file = f'{tmp_dir}/{process_name}.rc'
self.env_file = f"{tmp_dir}/{process_name}.env"
self.rc_file = f"{tmp_dir}/{process_name}.rc"
self._cleaned = False
@property
def stdout(self) -> typing.Union[typing.BinaryIO, typing.TextIO]:
return self.container.pull(f'{self.stdout_file}')
"""STDOUT of process."""
return self.container.pull(f"{self.stdout_file}")
@property
def stderr(self) -> typing.Union[typing.BinaryIO, typing.TextIO]:
return self.container.pull(f'{self.stderr_file}')
"""STDERR of process."""
return self.container.pull(f"{self.stderr_file}")
@property
def env(self) -> typing.Dict[str, str]:
"""Environment definition from container."""
if self._env:
return self._env
with self.container.pull(f'{self.env_file}') as f:
for env_vars in f.read().split(b'\n'):
key_values = env_vars.split(b'=', 1)
with self.container.pull(f"{self.env_file}") as f:
for env_vars in f.read().split(b"\n"):
key_values = env_vars.split(b"=", 1)
self._env[key_values[0]] = key_values[1]
return self._env
@property
def returncode(self) -> int:
"""Return code from the process."""
if self._returncode == RETURN_CODE_UNKNOWN:
self._returncode = self._get_returncode()
return self._returncode
def _get_returncode(self):
"""Reads the contents of the returncode file"""
def _get_returncode(self) -> int:
"""Read the contents of the returncode file."""
try:
with self.container.pull(f'{self.rc_file}') as text:
with self.container.pull(f"{self.rc_file}") as text:
return int(text.read())
except pebble.PathError:
# If the rc file doesn't exist within the container, then the
@ -95,9 +104,10 @@ class ContainerProcess(object):
@property
def completed(self) -> bool:
"""Whether process has completed."""
return self._returncode != RETURN_CODE_UNKNOWN
def check_returncode(self):
def check_returncode(self) -> None:
"""Raise CalledProcessError if the exit code is non-zero."""
if self.returncode:
stdout = None
@ -110,11 +120,12 @@ class ContainerProcess(object):
stderr = self.stderr.read()
except pebble.PathError:
pass
raise CalledProcessError(self.returncode, self.process_name,
stdout, stderr)
raise CalledProcessError(
self.returncode, self.process_name, stdout, stderr
)
def wait(self, timeout: int = 30) -> None:
"""Waits for the process to complete.
"""Wait for the process to complete.
Waits for the process to complete. If the process has not completed
within the timeout specified, this method will raise a TimeoutExpired
@ -150,11 +161,13 @@ class ContainerProcess(object):
if self._cleaned:
return
self.container.remove_path(f'{self.tmp_dir}', recursive=True)
self.container.remove_path(f"{self.tmp_dir}", recursive=True)
def __del__(self):
"""On destruction of this process, we'll attempt to clean up left over
process files.
def __del__(self) -> None:
"""On destruction of this process, cleanup.
On destruction of this process, attempt to clean up left over process
files.
"""
try:
self.cleanup()
@ -164,11 +177,14 @@ class ContainerProcess(object):
class ContainerProcessError(Exception):
"""Base class for exceptions raised within this module."""
pass
class CalledProcessError(ContainerProcessError):
"""Raised when an error occurs running a process in a container and
"""Exception when an error occurs running a process in a container.
Raised when an error occurs running a process in a container and
the check=True has been passed to raise an error on failure.
:param returncode: the exit code from the program
@ -180,8 +196,15 @@ class CalledProcessError(ContainerProcessError):
:param stderr: the output of the command on standard err
:type stderr: str
"""
def __init__(self, returncode: int, cmd: typing.Union[str, list],
stdout: str = None, stderr: str = None):
def __init__(
self,
returncode: int,
cmd: typing.Union[str, list],
stdout: str = None,
stderr: str = None,
) -> None:
"""Run constructor."""
self.returncode = returncode
self.cmd = cmd
self.stdout = stdout
@ -189,25 +212,32 @@ class CalledProcessError(ContainerProcessError):
class TimeoutExpired(ContainerProcessError):
"""This exception is raised when the timeout expires while waiting for a
container process.
"""Exception raised when timeout expires waiting for container process.
:param cmd: the command that was run
:type cmd: list
:param timeout: the configured timeout for the command
:type timeout: int
"""
def __init__(self, cmd: typing.Union[str, list], timeout: int):
def __init__(self, cmd: typing.Union[str, list], timeout: int) -> None:
"""Run constructor."""
self.cmd = cmd
self.timeout = timeout
def __str__(self):
def __str__(self) -> str:
"""Message to accompany exception."""
return f"Command '{self.cmd}' timed out after {self.timeout} seconds"
def run(container: model.Container, args: typing.List[str],
timeout: int = 30, check: bool = False,
env: dict = None, service_name: str = None) -> ContainerProcess:
def run(
container: model.Container,
args: typing.List[str],
timeout: int = 30,
check: bool = False,
env: dict = None,
service_name: str = None,
) -> ContainerProcess:
"""Run command with arguments in the specified container.
Run a command in the specified container and returns a
@ -233,23 +263,27 @@ def run(container: model.Container, args: typing.List[str],
:rtype: ContainerProcess
"""
if not container:
raise ValueError('container cannot be None')
raise ValueError("container cannot be None")
if not isinstance(container, model.Container):
raise ValueError('container must be of type ops.model.Container, '
f'not of type {type(container)}')
raise ValueError(
"container must be of type ops.model.Container, "
f"not of type {type(container)}"
)
if isinstance(args, str):
if service_name is None:
service_name = args.split(' ')[0]
service_name = service_name.split('/')[-1]
service_name = args.split(" ")[0]
service_name = service_name.split("/")[-1]
cmdline = args
elif isinstance(args, list):
if service_name is None:
service_name = args[0].split('/')[-1]
service_name = args[0].split("/")[-1]
cmdline = subprocess.list2cmdline(args)
else:
raise ValueError('args are expected to be a str or a list of str.'
f' Provided {type(args)}')
raise ValueError(
"args are expected to be a str or a list of str."
f" Provided {type(args)}"
)
tmp_dir = f'/tmp/{service_name}-{str(uuid.uuid4()).split("-")[0]}'
process = ContainerProcess(container, service_name, tmp_dir)
@ -265,50 +299,62 @@ def run(container: model.Container, args: typing.List[str],
"""
command = textwrap.dedent(command)
container.push(path=f'/tmp/{service_name}.sh', source=command,
encoding='utf-8', permissions=0o755)
container.push(
path=f"/tmp/{service_name}.sh",
source=command,
encoding="utf-8",
permissions=0o755,
)
logger.debug(f'Adding layer for {service_name} to run command '
f'{cmdline}')
container.add_layer('process_layer', {
'summary': 'container process runner',
'description': 'layer for running single-shot commands (kinda)',
'services': {
service_name: {
'override': 'replace',
'summary': cmdline,
'command': f'/tmp/{service_name}.sh',
'startup': 'disabled',
'environment': env or {},
}
}
}, combine=True)
logger.debug(
f"Adding layer for {service_name} to run command " f"{cmdline}"
)
container.add_layer(
"process_layer",
{
"summary": "container process runner",
"description": "layer for running single-shot commands (kinda)",
"services": {
service_name: {
"override": "replace",
"summary": cmdline,
"command": f"/tmp/{service_name}.sh",
"startup": "disabled",
"environment": env or {},
}
},
},
combine=True,
)
timeout_at = time.time() + timeout
try:
# Start the service which will run the command.
logger.debug(f'Starting {service_name} via pebble')
logger.debug(f"Starting {service_name} via pebble")
# TODO(wolsen) this is quite naughty, but the container object
# doesn't provide us access to the pebble layer to specify
# timeouts and such. Some commands may need a longer time to
# start, and as such I'm using the private internal reference
# in order to be able to specify the timeout itself.
container._pebble.start_services([service_name], # noqa
timeout=float(timeout))
container._pebble.start_services(
[service_name], timeout=float(timeout) # noqa
)
except pebble.ChangeError:
# Check to see if the command has timed out and if so, raise
# the TimeoutExpired.
if time.time() >= timeout_at:
logger.error(f'Command {cmdline} could not start out after '
f'{timeout} seconds in container '
f'{container.name}')
logger.error(
f"Command {cmdline} could not start out after "
f"{timeout} seconds in container "
f"{container.name}"
)
raise TimeoutExpired(args, timeout)
# Note, this could be expected.
logger.exception(f'Error running {service_name}')
logger.exception(f"Error running {service_name}")
logger.debug('Waiting for process completion...')
logger.debug("Waiting for process completion...")
process.wait(timeout)
# It appears that pebble services are still active after the command
@ -320,16 +366,20 @@ def run(container: model.Container, args: typing.List[str],
except pebble.ChangeError as e:
# Eat the change error that might occur. This was a best effort
# attempt to ensure the process is stopped
logger.exception(f'Failed to stop service {service_name}', e)
logger.exception(f"Failed to stop service {service_name}", e)
if check:
process.check_returncode()
return process
def call(container: model.Container, args: typing.Union[str, list],
env: dict = None, timeout: int = 30) -> int:
"""Runs a command in the container.
def call(
container: model.Container,
args: typing.Union[str, list],
env: dict = None,
timeout: int = 30,
) -> int:
"""Run a command in the container.
The command will run until the process completes (either normally or
abnormally) or until the timeout expires.
@ -349,17 +399,39 @@ def call(container: model.Container, args: typing.Union[str, list],
return run(container, args, env=env, timeout=timeout).returncode
def check_call(container: model.Container, args: typing.Union[str, list],
env: dict = None, timeout: int = 30,
service_name: str = None) -> None:
run(container, args, env=env, check=True, timeout=timeout,
service_name=service_name)
def check_call(
container: model.Container,
args: typing.Union[str, list],
env: dict = None,
timeout: int = 30,
service_name: str = None,
) -> None:
"""Run command and check it succeeds."""
run(
container,
args,
env=env,
check=True,
timeout=timeout,
service_name=service_name,
)
def check_output(container: model.Container, args: typing.Union[str, list],
env: dict = None, timeout: int = 30,
service_name: str = None) -> str:
process = run(container, args, env=env, check=True, timeout=timeout,
service_name=service_name)
def check_output(
container: model.Container,
args: typing.Union[str, list],
env: dict = None,
timeout: int = 30,
service_name: str = None,
) -> str:
"""Return output from command execution."""
process = run(
container,
args,
env=env,
check=True,
timeout=timeout,
service_name=service_name,
)
with process.stdout as stdout:
return stdout.read()

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to handle errors and bailing out of an event/hook."""
import logging
from contextlib import contextmanager
@ -22,20 +24,27 @@ logger = logging.getLogger(__name__)
class GuardException(Exception):
"""GuardException."""
pass
class BlockedException(Exception):
"""Charm is blocked."""
pass
@contextmanager
def guard(charm: 'CharmBase',
section: str,
handle_exception: bool = True,
log_traceback: bool = True,
**__):
def guard(
charm: "CharmBase",
section: str,
handle_exception: bool = True,
log_traceback: bool = True,
**__
) -> None:
"""Context manager to handle errors and bailing out of an event/hook.
The nature of Juju is that all the information may not be available to run
a set of actions. This context manager allows a section of code to be
'guarded' so that it can be bailed at any time.
@ -54,21 +63,28 @@ def guard(charm: 'CharmBase',
yield
logging.info("Completed guarded section fully: '%s'", section)
except GuardException as e:
logger.info("Guarded Section: Early exit from '%s' due to '%s'.",
section, str(e))
logger.info(
"Guarded Section: Early exit from '%s' due to '%s'.",
section,
str(e),
)
except BlockedException as e:
logger.warning(
"Charm is blocked in section '%s' due to '%s'", section, str(e))
"Charm is blocked in section '%s' due to '%s'", section, str(e)
)
charm.unit.status = BlockedStatus(e.msg)
except Exception as e:
# something else went wrong
if handle_exception:
logging.error("Exception raised in secion '%s': %s",
section, str(e))
logging.error(
"Exception raised in secion '%s': %s", section, str(e)
)
if log_traceback:
import traceback
logging.error(traceback.format_exc())
charm.unit.status = BlockedStatus(
"Error in charm (see logs): {}".format(str(e)))
"Error in charm (see logs): {}".format(str(e))
)
return
raise

View File

@ -1,8 +1,24 @@
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common interfaces not charm specific."""
import logging
import typing
import ops.model
from ops.framework import EventBase
from ops.framework import EventSource
from ops.framework import Object
@ -11,80 +27,76 @@ from ops.framework import StoredState
class PeersRelationCreatedEvent(EventBase):
"""
The PeersRelationCreatedEvent indicates that the peer relation now exists.
"""The PeersRelationCreatedEvent indicates that the peer relation now exists.
It does not indicate that any peers are available or have joined, simply
that the relation exists. This is useful to to indicate that the
application databag is available for storing information shared across
units.
"""
pass
class PeersDataChangedEvent(EventBase):
"""
The CharmPasswordChangedEvent indicates that the leader unit has changed
the password that the charm administrator uses.
"""
"""The PeersDataChangedEvent indicates peer data hjas changed."""
pass
class PeersEvents(ObjectEvents):
"""Peer Events."""
peers_relation_created = EventSource(PeersRelationCreatedEvent)
peers_data_changed = EventSource(PeersDataChangedEvent)
class OperatorPeers(Object):
"""Interface for the peers relation."""
on = PeersEvents()
state = StoredState()
def __init__(self, charm, relation_name):
def __init__(self, charm: ops.charm.CharmBase, relation_name: str) -> None:
"""Run constructor."""
super().__init__(charm, relation_name)
self.relation_name = relation_name
self.framework.observe(
charm.on[relation_name].relation_created,
self.on_created
charm.on[relation_name].relation_created, self.on_created
)
self.framework.observe(
charm.on[relation_name].relation_changed,
self.on_changed
charm.on[relation_name].relation_changed, self.on_changed
)
@property
def peers_rel(self):
def peers_rel(self) -> ops.model.Relation:
"""Peer relation."""
return self.framework.model.get_relation(self.relation_name)
@property
def _app_data_bag(self) -> typing.Dict[str, str]:
"""
"""
"""Return all app data on peer relation."""
return self.peers_rel.data[self.peers_rel.app]
def on_created(self, event):
logging.info('Peers on_created')
def on_created(self, event: ops.framework.EventBase) -> None:
"""Handle relation created event."""
logging.info("Peers on_created")
self.on.peers_relation_created.emit()
def on_changed(self, event):
logging.info('Peers on_changed')
def on_changed(self, event: ops.framework.EventBase) -> None:
"""Handle relation changed event."""
logging.info("Peers on_changed")
self.on.peers_data_changed.emit()
def set_app_data(self, settings) -> None:
"""
"""
def set_app_data(self, settings: typing.Dict[str, str]) -> None:
"""Publish settings on the peer app data bag."""
for k, v in settings.items():
self._app_data_bag[k] = v
def get_app_data(self, key) -> None:
"""
"""
def get_app_data(self, key: str) -> None:
"""Get the value corresponding to key from the app data bag."""
return self._app_data_bag.get(key)
def get_all_app_data(self) -> None:
"""
"""
"""Return all the app data from the relation."""
return self._app_data_bag

View File

@ -12,16 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base classes for defining a charm using the Operator framework.
"""
"""Base classes for defining a charm using the Operator framework."""
import json
import logging
from collections.abc import Callable
from typing import Tuple
from typing import Callable, List, Tuple
import ops.charm
import ops.framework
import charms.nginx_ingress_integrator.v0.ingress as ingress
import charms.sunbeam_mysql_k8s.v0.mysql as mysql
@ -33,7 +31,7 @@ logger = logging.getLogger(__name__)
class RelationHandler(ops.charm.Object):
"""Base handler class for relations
"""Base handler class for relations.
A relation handler is used to manage a charms interaction with a relation
interface. This includes:
@ -46,8 +44,13 @@ class RelationHandler(ops.charm.Object):
recieved and sent on an interface.
"""
def __init__(self, charm: ops.charm.CharmBase,
relation_name: str, callback_f: Callable):
def __init__(
self,
charm: ops.charm.CharmBase,
relation_name: str,
callback_f: Callable,
) -> None:
"""Run constructor."""
super().__init__(charm, None)
self.charm = charm
self.relation_name = relation_name
@ -63,21 +66,25 @@ class RelationHandler(ops.charm.Object):
raise NotImplementedError
def get_interface(self) -> Tuple[ops.charm.Object, str]:
"""Returns the interface that this handler encapsulates.
"""Return the interface that this handler encapsulates.
This is a combination of the interface object and the
name of the relation its wired into.
"""
return self.interface, self.relation_name
def interface_properties(self):
def interface_properties(self) -> dict:
"""Extract properties of the interface."""
property_names = [
p for p in dir(self.interface) if isinstance(
getattr(type(self.interface), p, None), property)]
p
for p in dir(self.interface)
if isinstance(getattr(type(self.interface), p, None), property)
]
properties = {
p: getattr(self.interface, p)
for p in property_names
if not p.startswith('_') and p not in ['model']}
if not p.startswith("_") and p not in ["model"]
}
return properties
@property
@ -91,23 +98,25 @@ class RelationHandler(ops.charm.Object):
class IngressHandler(RelationHandler):
"""Handler for Ingress relations"""
"""Handler for Ingress relations."""
def __init__(self, charm: ops.charm.CharmBase,
relation_name: str,
service_name: str,
default_public_ingress_port: int,
callback_f: Callable):
def __init__(
self,
charm: ops.charm.CharmBase,
relation_name: str,
service_name: str,
default_public_ingress_port: int,
callback_f: Callable,
) -> None:
"""Run constructor."""
self.default_public_ingress_port = default_public_ingress_port
self.service_name = service_name
super().__init__(charm, relation_name, callback_f)
def setup_event_handler(self) -> ops.charm.Object:
"""Configure event handlers for an Ingress relation."""
logger.debug('Setting up ingress event handler')
interface = ingress.IngressRequires(
self.charm,
self.ingress_config)
logger.debug("Setting up ingress event handler")
interface = ingress.IngressRequires(self.charm, self.ingress_config)
return interface
@property
@ -116,75 +125,78 @@ class IngressHandler(RelationHandler):
# Most charms probably won't (or shouldn't) expose service-port
# but use it if its there.
port = self.model.config.get(
'service-port',
self.default_public_ingress_port)
"service-port", self.default_public_ingress_port
)
svc_hostname = self.model.config.get(
'os-public-hostname',
self.service_name)
"os-public-hostname", self.service_name
)
return {
'service-hostname': svc_hostname,
'service-name': self.charm.app.name,
'service-port': port}
"service-hostname": svc_hostname,
"service-name": self.charm.app.name,
"service-port": port,
}
@property
def ready(self) -> bool:
"""Whether the handler is ready for use."""
# Nothing to wait for
return True
def context(self):
def context(self) -> dict:
"""Context containing ingress data."""
return {}
class DBHandler(RelationHandler):
"""Handler for DB relations"""
"""Handler for DB relations."""
def __init__(
self,
charm: ops.charm.CharmBase,
relation_name: str,
callback_f,
databases=None
):
callback_f: Callable,
databases: List[str] = None,
) -> None:
"""Run constructor."""
self.databases = databases
super().__init__(charm, relation_name, callback_f)
def setup_event_handler(self) -> ops.charm.Object:
"""Configure event handlers for a MySQL relation."""
logger.debug('Setting up DB event handler')
logger.debug("Setting up DB event handler")
db = mysql.MySQLConsumer(
self.charm,
self.relation_name,
databases=self.databases)
_rname = self.relation_name.replace('-', '_')
self.charm, self.relation_name, databases=self.databases
)
_rname = self.relation_name.replace("-", "_")
db_relation_event = getattr(
self.charm.on,
f'{_rname}_relation_changed')
self.framework.observe(db_relation_event,
self._on_database_changed)
self.charm.on, f"{_rname}_relation_changed"
)
self.framework.observe(db_relation_event, self._on_database_changed)
return db
def _on_database_changed(self, event) -> None:
"""Handles database change events."""
def _on_database_changed(self, event: ops.framework.EventBase) -> None:
"""Handle database change events."""
databases = self.interface.databases()
logger.info(f'Received databases: {databases}')
logger.info(f"Received databases: {databases}")
if not databases:
return
credentials = self.interface.credentials()
# XXX Lets not log the credentials
logger.info(f'Received credentials: {credentials}')
logger.info(f"Received credentials: {credentials}")
self.callback_f(event)
@property
def ready(self) -> bool:
"""Handler ready for use."""
"""Whether the handler is ready for use."""
try:
# Nothing to wait for
return bool(self.interface.databases())
except AttributeError:
return False
def context(self):
def context(self) -> dict:
"""Context containing database connection data."""
try:
databases = self.interface.databases()
except AttributeError:
@ -192,15 +204,17 @@ class DBHandler(RelationHandler):
if not databases:
return {}
ctxt = {
'database': self.interface.databases()[0],
'database_host': self.interface.credentials().get('address'),
'database_password': self.interface.credentials().get('password'),
'database_user': self.interface.credentials().get('username'),
'database_type': 'mysql+pymysql'}
"database": self.interface.databases()[0],
"database_host": self.interface.credentials().get("address"),
"database_password": self.interface.credentials().get("password"),
"database_user": self.interface.credentials().get("username"),
"database_type": "mysql+pymysql",
}
return ctxt
class AMQPHandler(RelationHandler):
"""Handler for managing a amqp relation."""
DEFAULT_PORT = "5672"
@ -208,15 +222,16 @@ class AMQPHandler(RelationHandler):
self,
charm: ops.charm.CharmBase,
relation_name: str,
callback_f,
callback_f: Callable,
username: str,
vhost: int,
):
) -> None:
"""Run constructor."""
self.username = username
self.vhost = vhost
super().__init__(charm, relation_name, callback_f)
def setup_event_handler(self):
def setup_event_handler(self) -> ops.charm.Object:
"""Configure event handlers for an AMQP relation."""
logger.debug("Setting up AMQP event handler")
amqp = sunbeam_amqp.AMQPRequires(
@ -225,21 +240,22 @@ class AMQPHandler(RelationHandler):
self.framework.observe(amqp.on.ready, self._on_amqp_ready)
return amqp
def _on_amqp_ready(self, event) -> None:
"""Handles AMQP change events."""
def _on_amqp_ready(self, event: ops.framework.EventBase) -> None:
"""Handle AMQP change events."""
# Ready is only emitted when the interface considers
# that the relation is complete (indicated by a password)
self.callback_f(event)
@property
def ready(self) -> bool:
"""Handler ready for use."""
"""Whether handler is ready for use."""
try:
return bool(self.interface.password)
except AttributeError:
return False
def context(self):
def context(self) -> dict:
"""Context containing AMQP connection data."""
try:
hosts = self.interface.hostnames
except AttributeError:
@ -247,60 +263,65 @@ class AMQPHandler(RelationHandler):
if not hosts:
return {}
ctxt = super().context()
ctxt['hostnames'] = list(set(ctxt['hostnames']))
ctxt['hosts'] = ','.join(ctxt['hostnames'])
ctxt['port'] = ctxt.get('ssl_port') or self.DEFAULT_PORT
transport_url_hosts = ','.join([
"{}:{}@{}:{}".format(self.username,
ctxt['password'],
host_, # TODO deal with IPv6
ctxt['port'])
for host_ in ctxt['hostnames']
])
ctxt["hostnames"] = list(set(ctxt["hostnames"]))
ctxt["hosts"] = ",".join(ctxt["hostnames"])
ctxt["port"] = ctxt.get("ssl_port") or self.DEFAULT_PORT
transport_url_hosts = ",".join(
[
"{}:{}@{}:{}".format(
self.username,
ctxt["password"],
host_, # TODO deal with IPv6
ctxt["port"],
)
for host_ in ctxt["hostnames"]
]
)
transport_url = "rabbit://{}/{}".format(
transport_url_hosts,
self.vhost)
ctxt['transport_url'] = transport_url
transport_url_hosts, self.vhost
)
ctxt["transport_url"] = transport_url
return ctxt
class IdentityServiceRequiresHandler(RelationHandler):
"""Handler for managing a identity-service relation."""
def __init__(
self,
charm: ops.charm.CharmBase,
relation_name: str,
callback_f,
callback_f: Callable,
service_endpoints: dict,
region: str,
):
) -> None:
"""Ron constructor."""
self.service_endpoints = service_endpoints
self.region = region
super().__init__(charm, relation_name, callback_f)
def setup_event_handler(self):
def setup_event_handler(self) -> ops.charm.Object:
"""Configure event handlers for an Identity service relation."""
logger.debug("Setting up Identity Service event handler")
id_svc = sunbeam_id_svc.IdentityServiceRequires(
self.charm,
self.relation_name,
self.service_endpoints,
self.region
self.charm, self.relation_name, self.service_endpoints, self.region
)
self.framework.observe(
id_svc.on.ready,
self._on_identity_service_ready)
id_svc.on.ready, self._on_identity_service_ready
)
return id_svc
def _on_identity_service_ready(self, event) -> None:
"""Handles AMQP change events."""
def _on_identity_service_ready(
self, event: ops.framework.EventBase
) -> None:
"""Handle AMQP change events."""
# Ready is only emitted when the interface considers
# that the relation is complete (indicated by a password)
self.callback_f(event)
@property
def ready(self) -> bool:
"""Handler ready for use."""
"""Whether handler is ready for use."""
try:
return bool(self.interface.service_password)
except AttributeError:
@ -308,10 +329,11 @@ class IdentityServiceRequiresHandler(RelationHandler):
class BasePeerHandler(RelationHandler):
"""Base handler for managing a peers relation."""
LEADER_READY_KEY = 'leader_ready'
LEADER_READY_KEY = "leader_ready"
def setup_event_handler(self):
def setup_event_handler(self) -> None:
"""Configure event handlers for peer relation."""
logger.debug("Setting up peer event handler")
peer_int = sunbeam_interfaces.OperatorPeers(
@ -319,39 +341,50 @@ class BasePeerHandler(RelationHandler):
self.relation_name,
)
self.framework.observe(
peer_int.on.peers_data_changed,
self._on_peers_data_changed)
peer_int.on.peers_data_changed, self._on_peers_data_changed
)
return peer_int
def _on_peers_data_changed(self, event) -> None:
def _on_peers_data_changed(self, event: ops.framework.EventBase) -> None:
"""Process peer data changed event."""
self.callback_f(event)
@property
def ready(self) -> bool:
"""Whether the handler is complete."""
return True
def context(self):
def context(self) -> dict:
"""Return all app data set on the peer relation."""
try:
return self.interface.get_all_app_data()
except AttributeError:
return {}
def set_app_data(self, settings):
def set_app_data(self, settings: dict) -> None:
"""Store data in peer app db."""
self.interface.set_app_data(settings)
def get_app_data(self, key):
def get_app_data(self, key: str) -> str:
"""Retrieve data from the peer relation."""
return self.interface.get_app_data(key)
def leader_set(self, settings=None, **kwargs):
"""Juju leader set value(s)"""
def leader_get(self, key: str) -> str:
"""Retrieve data from the peer relation."""
return self.peers.get_app_data(key)
def leader_set(self, settings: dict, **kwargs) -> None:
"""Store data in peer app db."""
settings = settings or {}
settings.update(kwargs)
self.set_app_data(settings)
def set_leader_ready(self):
def set_leader_ready(self) -> None:
"""Tell peers the leader is ready."""
self.set_app_data({self.LEADER_READY_KEY: json.dumps(True)})
def is_leader_ready(self):
def is_leader_ready(self) -> bool:
"""Whether the leader has announced it is ready."""
ready = self.get_app_data(self.LEADER_READY_KEY)
if ready is None:
return False

View File

@ -12,22 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for rendering templates inside containers."""
import logging
import os
from typing import List, TYPE_CHECKING
from collections import defaultdict
if TYPE_CHECKING:
import advanced_sunbeam_openstack.core as sunbeam_core
import ops.model
from charmhelpers.contrib.openstack.templating import (
OSConfigException,
OSConfigRenderer,
get_loader)
from charmhelpers.contrib.openstack.templating import get_loader
import jinja2
# from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
log = logging.getLogger(__name__)
def get_container(containers, name):
def get_container(
containers: List['ops.model.Container'], name: str
) -> 'ops.model.Container':
"""Search for container with given name inlist of containers."""
container = None
for c in containers:
if c.name == name:
@ -35,113 +39,31 @@ def get_container(containers, name):
return container
def sidecar_config_render(containers, container_configs, template_dir,
openstack_release, adapters):
def sidecar_config_render(
containers: List['ops.model.Container'],
container_configs: List['sunbeam_core.ContainerConfigFile'],
template_dir: str,
openstack_release: str,
context: 'sunbeam_core.OPSCharmContexts',
) -> None:
"""Render templates inside containers."""
loader = get_loader(template_dir, openstack_release)
_tmpl_env = jinja2.Environment(loader=loader)
for config in container_configs:
for container_name in config.container_names:
try:
template = _tmpl_env.get_template(
os.path.basename(config.path) + '.j2')
os.path.basename(config.path) + ".j2"
)
except jinja2.exceptions.TemplateNotFound:
template = _tmpl_env.get_template(
os.path.basename(config.path))
os.path.basename(config.path)
)
container = get_container(containers, container_name)
contents = template.render(adapters)
kwargs = {
'user': config.user,
'group': config.group}
contents = template.render(context)
kwargs = {"user": config.user, "group": config.group}
container.push(config.path, contents, **kwargs)
log.debug(f'Wrote template {config.path} in container '
f'{container.name}.')
class SidecarConfigRenderer(OSConfigRenderer):
"""
This class provides a common templating system to be used by OpenStack
sidecar charms.
"""
def __init__(self, templates_dir, openstack_release):
super(SidecarConfigRenderer, self).__init__(templates_dir,
openstack_release)
class _SidecarConfigRenderer(OSConfigRenderer):
"""
This class provides a common templating system to be used by OpenStack
sidecar charms.
"""
def __init__(self, templates_dir, openstack_release):
super(SidecarConfigRenderer, self).__init__(templates_dir,
openstack_release)
self.config_to_containers = defaultdict(set)
self.owner_info = defaultdict(set)
def _get_template(self, template):
"""
"""
self._get_tmpl_env()
if not template.endswith('.j2'):
template += '.j2'
template = self._tmpl_env.get_template(template)
log.debug(f'Loaded template from {template.filename}')
return template
def register(self, config_file, contexts, config_template=None,
containers=None, user=None, group=None):
"""
"""
# NOTE(wolsen): Intentionally overriding base class to raise an error
# if this is accidentally used instead.
if containers is None:
raise ValueError('One or more containers must be provided')
super().register(config_file, contexts, config_template)
# Register user/group info. There's a better way to do this for sure
if user or group:
self.owner_info[config_file] = (user, group)
for container in containers:
self.config_to_containers[config_file].add(container)
log.debug(f'Registered config file "{config_file}" for container '
f'{container}')
def write(self, config_file, container):
"""
"""
containers = self.config_to_containers.get(config_file)
if not containers or container.name not in containers:
log.error(f'Config file {config_file} not registered for '
f'container {container.name}')
raise OSConfigException
contents = self.render(config_file)
owner_info = self.owner_info.get(config_file)
kwargs = {}
log.debug(f'Got owner_info of {owner_info}')
if owner_info:
user, group = owner_info
kwargs['user'] = user
kwargs['group'] = group
container.push(config_file, contents, **kwargs)
log.debug(f'Wrote template {config_file} in container '
f'{container.name}.')
def write_all(self, container=None):
for config_file, containers in self.config_to_containers.items():
if container:
if container.name not in containers:
continue
self.write(config_file, container)
else:
for c in containers:
self.write(config_file, c)
log.debug(
f"Wrote template {config.path} in container "
f"{container.name}."
)

View File

@ -14,20 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing shared code to be used in a charms units tests."""
import inspect
import io
import json
import ops
import os
import pathlib
import sys
import typing
import unittest
import yaml
from mock import patch
from mock import Mock, patch
sys.path.append('lib') # noqa
sys.path.append('src') # noqa
sys.path.append("lib") # noqa
sys.path.append("src") # noqa
from ops import framework, model
@ -35,206 +38,233 @@ from ops.testing import Harness, _TestingModelBackend, _TestingPebbleClient
class CharmTestCase(unittest.TestCase):
"""Class to make mocking easier."""
def setUp(self, obj, patches):
def setUp(self, obj: 'typing.ANY', patches: 'typing.List') -> None:
"""Run constructor."""
super().setUp()
self.patches = patches
self.obj = obj
self.patch_all()
def patch(self, method):
def patch(self, method: 'typing.ANY') -> Mock:
"""Patch the named method on self.obj."""
_m = patch.object(self.obj, method)
mock = _m.start()
self.addCleanup(_m.stop)
return mock
def patch_obj(self, obj, method):
def patch_obj(self, obj: 'typing.ANY', method: 'typing.ANY') -> Mock:
"""Patch the named method on obj."""
_m = patch.object(obj, method)
mock = _m.start()
self.addCleanup(_m.stop)
return mock
def patch_all(self):
def patch_all(self) -> None:
"""Patch all objects in self.patches."""
for method in self.patches:
setattr(self, method, self.patch(method))
def add_base_amqp_relation(harness):
rel_id = harness.add_relation('amqp', 'rabbitmq')
harness.add_relation_unit(
rel_id,
'rabbitmq/0')
harness.add_relation_unit(
rel_id,
'rabbitmq/0')
def add_base_amqp_relation(harness: Harness) -> str:
"""Add amqp relation."""
rel_id = harness.add_relation("amqp", "rabbitmq")
harness.add_relation_unit(rel_id, "rabbitmq/0")
harness.add_relation_unit(rel_id, "rabbitmq/0")
harness.update_relation_data(
rel_id,
'rabbitmq/0',
{'ingress-address': '10.0.0.13'})
rel_id, "rabbitmq/0", {"ingress-address": "10.0.0.13"}
)
return rel_id
def add_amqp_relation_credentials(harness, rel_id):
def add_amqp_relation_credentials(
harness: Harness, rel_id: str
) -> None:
"""Add amqp data to amqp relation."""
harness.update_relation_data(
rel_id,
'rabbitmq',
{
'hostname': 'rabbithost1.local',
'password': 'rabbit.pass'})
"rabbitmq",
{"hostname": "rabbithost1.local", "password": "rabbit.pass"},
)
def add_base_identity_service_relation(harness):
rel_id = harness.add_relation('identity-service', 'keystone')
harness.add_relation_unit(
rel_id,
'keystone/0')
harness.add_relation_unit(
rel_id,
'keystone/0')
def add_base_identity_service_relation(harness: Harness) -> str:
"""Add identity-service relation."""
rel_id = harness.add_relation("identity-service", "keystone")
harness.add_relation_unit(rel_id, "keystone/0")
harness.add_relation_unit(rel_id, "keystone/0")
harness.update_relation_data(
rel_id,
'keystone/0',
{'ingress-address': '10.0.0.33'})
rel_id, "keystone/0", {"ingress-address": "10.0.0.33"}
)
return rel_id
def add_identity_service_relation_response(harness, rel_id):
def add_identity_service_relation_response(
harness: Harness, rel_id: str
) -> None:
"""Add id service data to identity-service relation."""
harness.update_relation_data(
rel_id,
'keystone',
"keystone",
{
'admin-domain-id': 'admindomid1',
'admin-project-id': 'adminprojid1',
'admin-user-id': 'adminuserid1',
'api-version': '3',
'auth-host': 'keystone.local',
'auth-port': '12345',
'auth-protocol': 'http',
'internal-host': 'keystone.internal',
'internal-port': '5000',
'internal-protocol': 'http',
'service-domain': 'servicedom',
'service-domain_id': 'svcdomid1',
'service-host': 'keystone.service',
'service-password': 'svcpass1',
'service-port': '5000',
'service-protocol': 'http',
'service-project': 'svcproj1',
'service-project-id': 'svcprojid1',
'service-username': 'svcuser1'})
"admin-domain-id": "admindomid1",
"admin-project-id": "adminprojid1",
"admin-user-id": "adminuserid1",
"api-version": "3",
"auth-host": "keystone.local",
"auth-port": "12345",
"auth-protocol": "http",
"internal-host": "keystone.internal",
"internal-port": "5000",
"internal-protocol": "http",
"service-domain": "servicedom",
"service-domain_id": "svcdomid1",
"service-host": "keystone.service",
"service-password": "svcpass1",
"service-port": "5000",
"service-protocol": "http",
"service-project": "svcproj1",
"service-project-id": "svcprojid1",
"service-username": "svcuser1",
},
)
def add_base_db_relation(harness):
rel_id = harness.add_relation('shared-db', 'mysql')
harness.add_relation_unit(
rel_id,
'mysql/0')
harness.add_relation_unit(
rel_id,
'mysql/0')
def add_base_db_relation(harness: Harness) -> str:
"""Add db relation."""
rel_id = harness.add_relation("shared-db", "mysql")
harness.add_relation_unit(rel_id, "mysql/0")
harness.add_relation_unit(rel_id, "mysql/0")
harness.update_relation_data(
rel_id,
'mysql/0',
{'ingress-address': '10.0.0.3'})
rel_id, "mysql/0", {"ingress-address": "10.0.0.3"}
)
return rel_id
def add_db_relation_credentials(harness, rel_id):
def add_db_relation_credentials(
harness: Harness, rel_id: str
) -> None:
"""Add db credentials data to db relation."""
harness.update_relation_data(
rel_id,
'mysql',
"mysql",
{
'databases': json.dumps(['db1']),
'data': json.dumps({
'credentials': {
'username': 'foo',
'password': 'hardpassword',
'address': '10.0.0.10'}})})
"databases": json.dumps(["db1"]),
"data": json.dumps(
{
"credentials": {
"username": "foo",
"password": "hardpassword",
"address": "10.0.0.10",
}
}
),
},
)
def add_api_relations(harness):
add_db_relation_credentials(
harness,
add_base_db_relation(harness))
add_amqp_relation_credentials(
harness,
add_base_amqp_relation(harness))
add_identity_service_relation_response(
harness,
add_base_identity_service_relation(harness))
def add_api_relations(harness: Harness) -> None:
"""Add standard relation to api charm."""
add_db_relation_credentials(harness, add_base_db_relation(harness))
add_amqp_relation_credentials(harness, add_base_amqp_relation(harness))
add_identity_service_relation_response(
harness, add_base_identity_service_relation(harness)
)
def get_harness(charm_class, charm_metadata=None, container_calls=None,
charm_config=None):
def get_harness(
charm_class: ops.charm.CharmBase,
charm_metadata: dict = None,
container_calls: dict = None,
charm_config: dict = None,
) -> Harness:
"""Return a testing harness."""
class _OSTestingPebbleClient(_TestingPebbleClient):
def push(
self, path, source, *,
encoding='utf-8', make_dirs=False, permissions=None,
user_id=None, user=None, group_id=None, group=None):
container_calls['push'][path] = {
'source': source,
'permissions': permissions,
'user': user,
'group': group}
self,
path: str,
source: typing.Union[bytes, str, typing.BinaryIO, typing.TextIO],
*,
encoding: str = "utf-8",
make_dirs: bool = False,
permissions: int = None,
user_id: int = None,
user: str = None,
group_id: int = None,
group: str = None,
) -> None:
"""Capture push events and store in container_calls."""
container_calls["push"][path] = {
"source": source,
"permissions": permissions,
"user": user,
"group": group,
}
def pull(self, path, *, encoding='utf-8'):
container_calls['pull'].append(path)
def pull(self, path: str, *, encoding: str = "utf-8") -> None:
"""Capture pull events and store in container_calls."""
container_calls["pull"].append(path)
reader = io.StringIO("0")
return reader
def remove_path(self, path, *, recursive=False):
container_calls['remove_path'].append(path)
def remove_path(self, path: str, *, recursive: bool = False) -> None:
"""Capture remove events and store in container_calls."""
container_calls["remove_path"].append(path)
class _OSTestingModelBackend(_TestingModelBackend):
def get_pebble(self, socket_path: str):
def get_pebble(self, socket_path: str) -> _OSTestingPebbleClient:
"""Get the testing pebble client."""
client = self._pebble_clients.get(socket_path, None)
if client is None:
client = _OSTestingPebbleClient(self)
self._pebble_clients[socket_path] = client
return client
def network_get(self, endpoint_name, relation_id=None):
def network_get(
self, endpoint_name: str, relation_id: str = None
) -> dict:
"""Return a fake set of network data."""
network_data = {
'bind-addresses': [{
'interface-name': 'eth0',
'addresses': [{
'cidr': '10.0.0.0/24',
'value': '10.0.0.10'}]}],
'ingress-addresses': ['10.0.0.10'],
'egress-subnets': ['10.0.0.0/24']}
"bind-addresses": [
{
"interface-name": "eth0",
"addresses": [
{"cidr": "10.0.0.0/24", "value": "10.0.0.10"}
],
}
],
"ingress-addresses": ["10.0.0.10"],
"egress-subnets": ["10.0.0.0/24"],
}
return network_data
filename = inspect.getfile(charm_class)
charm_dir = pathlib.Path(filename).parents[1]
if not charm_metadata:
metadata_file = f'{charm_dir}/metadata.yaml'
metadata_file = f"{charm_dir}/metadata.yaml"
if os.path.isfile(metadata_file):
with open(metadata_file) as f:
charm_metadata = f.read()
if not charm_config:
config_file = f'{charm_dir}/config.yaml'
config_file = f"{charm_dir}/config.yaml"
if os.path.isfile(config_file):
with open(config_file) as f:
charm_config = yaml.safe_load(f.read())['options']
charm_config = yaml.safe_load(f.read())["options"]
harness = Harness(
charm_class,
meta=charm_metadata,
)
harness._backend = _OSTestingModelBackend(
harness._unit_name, harness._meta)
harness._model = model.Model(
harness._meta,
harness._backend)
harness._unit_name, harness._meta
)
harness._model = model.Model(harness._meta, harness._backend)
harness._framework = framework.Framework(
":memory:",
harness._charm_dir,
harness._meta,
harness._model)
":memory:", harness._charm_dir, harness._meta, harness._model
)
if charm_config:
harness.update_config(charm_config)
return harness

View File

@ -1,11 +1,10 @@
# This file is managed centrally. If you find the need to modify this as a
# one-off, please don't. Intead, consult #openstack-charms and ask about
# requirements management in charms via bot-control. Thank you.
flake8-annotations
flake8-docstrings
charm-tools>=2.4.4
coverage>=3.6
mock>=1.2
flake8>=2.2.4,<=2.4.1
pyflakes==2.1.1
flake8
pyflakes
stestr>=2.2.0
requests>=2.18.4
psutil

View File

@ -1,4 +1,4 @@
# Operator charm (with zaza): tox.ini
# Operator charm helper: tox.ini
[tox]
envlist = pep8,fetch,py3
@ -42,16 +42,6 @@ deps =
commands =
./fetch-libs.sh
[testenv:py35]
basepython = python3.5
# python3.5 is irrelevant on a focal+ charm.
commands = /bin/true
[testenv:py36]
basepython = python3.6
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:py37]
basepython = python3.7
deps = -r{toxinidir}/requirements.txt
@ -73,23 +63,6 @@ deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = flake8 {posargs} src unit_tests tests advanced_sunbeam_openstack
[testenv:cover]
# Technique based heavily upon
# https://github.com/openstack/nova/blob/master/tox.ini
basepython = python3
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
setenv =
{[testenv]setenv}
PYTHON=coverage run
commands =
coverage erase
stestr run --slowest {posargs}
coverage combine
coverage html -d cover
coverage xml -o cover/coverage.xml
coverage report
[coverage:run]
branch = True
concurrency = multiprocessing
@ -105,37 +78,6 @@ omit =
basepython = python3
commands = {posargs}
[testenv:build]
basepython = python3
deps = -r{toxinidir}/build-requirements.txt
commands =
charmcraft build
[testenv:func-noop]
basepython = python3
commands =
functest-run-suite --help
[testenv:func]
basepython = python3
commands =
functest-run-suite --keep-model
[testenv:func-smoke]
basepython = python3
commands =
functest-run-suite --keep-model --smoke
[testenv:func-dev]
basepython = python3
commands =
functest-run-suite --keep-model --dev
[testenv:func-target]
basepython = python3
commands =
functest-run-suite --keep-model --bundle {posargs}
[flake8]
# Ignore E902 because the unit_tests directory is missing in the built charm.
ignore = E402,E226,E902
ignore = E402,E226,E902,ANN101,ANN003

View File

@ -0,0 +1,15 @@
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for aso."""

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# Copyright 2020 Canonical Ltd.
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,21 +14,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test charms for unit tests."""
import os
import tempfile
import sys
sys.path.append('lib') # noqa
sys.path.append('src') # noqa
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import ops.framework
sys.path.append("lib") # noqa
sys.path.append("src") # noqa
import advanced_sunbeam_openstack.charm as sunbeam_charm
CHARM_CONFIG = {
'region': 'RegionOne',
'debug': 'true'}
CHARM_CONFIG = {"region": "RegionOne", "debug": "true"}
CHARM_METADATA = '''
CHARM_METADATA = """
name: my-service
version: 3
bases:
@ -57,9 +61,9 @@ storage:
resources:
mysvc-image:
type: oci-image
'''
"""
API_CHARM_METADATA = '''
API_CHARM_METADATA = """
name: my-service
version: 3
bases:
@ -103,48 +107,60 @@ storage:
resources:
mysvc-image:
type: oci-image
'''
"""
class MyCharm(sunbeam_charm.OSBaseOperatorCharm):
"""Test charm for testing OSBaseOperatorCharm."""
openstack_release = 'diablo'
service_name = 'my-service'
openstack_release = "diablo"
service_name = "my-service"
def __init__(self, framework):
def __init__(self, framework: "ops.framework.Framework") -> None:
"""Run constructor."""
self.seen_events = []
self.render_calls = []
self._template_dir = self._setup_templates()
super().__init__(framework)
def _log_event(self, event):
def _log_event(self, event: "ops.framework.EventBase") -> None:
"""Log events."""
self.seen_events.append(type(event).__name__)
def _on_service_pebble_ready(self, event):
def _on_service_pebble_ready(
self, event: "ops.framework.EventBase"
) -> None:
"""Log pebble ready event."""
self._log_event(event)
super()._on_service_pebble_ready(event)
self._log_event(event)
def _on_config_changed(self, event):
def _on_config_changed(self, event: "ops.framework.EventBase") -> None:
"""Log config changed event."""
self._log_event(event)
super()._on_config_changed(event)
def configure_charm(self, event):
def configure_charm(self, event: "ops.framework.EventBase") -> None:
"""Log configure_charm call."""
self._log_event(event)
super().configure_charm(event)
self._log_event(event)
@property
def public_ingress_port(self):
def public_ingress_port(self) -> int:
"""Charms default port."""
return 789
def _setup_templates(self):
def _setup_templates(self) -> str:
"""Run temp templates dir setup."""
tmpdir = tempfile.mkdtemp()
_template_dir = f'{tmpdir}/templates'
_template_dir = f"{tmpdir}/templates"
os.mkdir(_template_dir)
with open(f'{_template_dir}/my-service.conf.j2', 'w') as f:
with open(f"{_template_dir}/my-service.conf.j2", "w") as f:
f.write("")
return _template_dir
@property
def template_dir(self):
def template_dir(self) -> str:
"""Temp templates dir."""
return self._template_dir
@ -160,41 +176,53 @@ TEMPLATE_CONTENTS = """
class MyAPICharm(sunbeam_charm.OSBaseOperatorAPICharm):
openstack_release = 'diablo'
service_name = 'my-service'
wsgi_admin_script = '/bin/wsgi_admin'
wsgi_public_script = '/bin/wsgi_public'
"""Test charm for testing OSBaseOperatorAPICharm."""
def __init__(self, framework):
openstack_release = "diablo"
service_name = "my-service"
wsgi_admin_script = "/bin/wsgi_admin"
wsgi_public_script = "/bin/wsgi_public"
def __init__(self, framework: "ops.framework.Framework") -> None:
"""Run constructor."""
self.seen_events = []
self.render_calls = []
self._template_dir = self._setup_templates()
super().__init__(framework)
def _setup_templates(self):
def _setup_templates(self) -> str:
"""Run temp templates dir setup."""
tmpdir = tempfile.mkdtemp()
_template_dir = f'{tmpdir}/templates'
_template_dir = f"{tmpdir}/templates"
os.mkdir(_template_dir)
with open(f'{_template_dir}/my-service.conf.j2', 'w') as f:
with open(f"{_template_dir}/my-service.conf.j2", "w") as f:
f.write(TEMPLATE_CONTENTS)
with open(f'{_template_dir}/wsgi-my-service.conf.j2', 'w') as f:
with open(f"{_template_dir}/wsgi-my-service.conf.j2", "w") as f:
f.write(TEMPLATE_CONTENTS)
return _template_dir
def _log_event(self, event):
def _log_event(self, event: "ops.framework.EventBase") -> None:
"""Log events."""
self.seen_events.append(type(event).__name__)
def _on_service_pebble_ready(self, event):
def _on_service_pebble_ready(
self, event: "ops.framework.EventBase"
) -> None:
"""Log pebble ready event."""
self._log_event(event)
super()._on_service_pebble_ready(event)
self._log_event(event)
def _on_config_changed(self, event):
def _on_config_changed(self, event: "ops.framework.EventBase") -> None:
"""Log config changed event."""
self._log_event(event)
super()._on_config_changed(event)
@property
def default_public_ingress_port(self):
def default_public_ingress_port(self) -> int:
"""Charms default port."""
return 789
@property
def template_dir(self):
def template_dir(self) -> str:
"""Templates dir."""
return self._template_dir

View File

@ -1,6 +1,4 @@
#!/usr/bin/env python3
# Copyright 2020 Canonical Ltd.
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test aso."""
import json
from mock import patch
import sys
sys.path.append('lib') # noqa
@ -28,11 +26,13 @@ from . import test_charms
class TestOSBaseOperatorCharm(test_utils.CharmTestCase):
"""Test for the OSBaseOperatorCharm class."""
PATCHES = [
]
def setUp(self):
def setUp(self) -> None:
"""Charm test class setup."""
self.container_calls = {
'push': {},
'pull': [],
@ -46,43 +46,45 @@ class TestOSBaseOperatorCharm(test_utils.CharmTestCase):
self.harness.begin()
self.addCleanup(self.harness.cleanup)
def set_pebble_ready(self):
def set_pebble_ready(self) -> None:
"""Set pebble ready event."""
container = self.harness.model.unit.get_container("my-service")
# Emit the PebbleReadyEvent
self.harness.charm.on.my_service_pebble_ready.emit(container)
def test_pebble_ready_handler(self):
def test_pebble_ready_handler(self) -> None:
"""Test is raised and observed."""
self.assertEqual(self.harness.charm.seen_events, [])
self.set_pebble_ready()
self.assertEqual(self.harness.charm.seen_events, ['PebbleReadyEvent'])
def test_write_config(self):
def test_write_config(self) -> None:
"""Test writing config when charm is ready."""
self.set_pebble_ready()
self.assertEqual(
self.container_calls['push'],
{})
def test_handler_prefix(self):
self.assertEqual(
self.harness.charm.handler_prefix,
'my_service')
def test_container_names(self):
def test_container_names(self) -> None:
"""Test container name list is correct."""
self.assertEqual(
self.harness.charm.container_names,
['my-service'])
def test_relation_handlers_ready(self):
def test_relation_handlers_ready(self) -> None:
"""Test relation handlers are ready."""
self.assertTrue(
self.harness.charm.relation_handlers_ready())
class TestOSBaseOperatorAPICharm(test_utils.CharmTestCase):
"""Test for the OSBaseOperatorAPICharm class."""
PATCHES = [
]
def setUp(self):
def setUp(self) -> None:
"""Charm test class setup."""
self.container_calls = {
'push': {},
'pull': [],
@ -98,10 +100,12 @@ class TestOSBaseOperatorAPICharm(test_utils.CharmTestCase):
self.harness.update_config(test_charms.CHARM_CONFIG)
self.harness.begin()
def set_pebble_ready(self):
def set_pebble_ready(self) -> None:
"""Set pebble ready event."""
self.harness.container_pebble_ready('my-service')
def test_write_config(self):
def test_write_config(self) -> None:
"""Test when charm is ready configs are written correctly."""
self.harness.set_leader()
rel_id = self.harness.add_relation('peers', 'my-service')
self.harness.add_relation_unit(
@ -135,8 +139,8 @@ class TestOSBaseOperatorAPICharm(test_utils.CharmTestCase):
'source': expect_string,
'user': 'root'})
@patch('advanced_sunbeam_openstack.templating.sidecar_config_render')
def test__on_database_changed(self, _renderer):
def test__on_database_changed(self) -> None:
"""Test database is requested."""
rel_id = self.harness.add_relation('peers', 'my-service')
self.harness.add_relation_unit(
rel_id,
@ -149,10 +153,10 @@ class TestOSBaseOperatorAPICharm(test_utils.CharmTestCase):
db_rel_id,
'my-service')
requested_db = json.loads(rel_data['databases'])[0]
# self.assertRegex(requested_db, r'^db_.*my_service$')
self.assertEqual(requested_db, 'my_service')
def test_contexts(self):
def test_contexts(self) -> None:
"""Test contexts are correctly populated."""
rel_id = self.harness.add_relation('peers', 'my-service')
self.harness.add_relation_unit(
rel_id,
@ -172,7 +176,8 @@ class TestOSBaseOperatorAPICharm(test_utils.CharmTestCase):
contexts.options.debug,
'true')
def test_peer_leader_db(self):
def test_peer_leader_db(self) -> None:
"""Test interacting with peer app db."""
rel_id = self.harness.add_relation('peers', 'my-service')
self.harness.add_relation_unit(
rel_id,
@ -195,7 +200,8 @@ class TestOSBaseOperatorAPICharm(test_utils.CharmTestCase):
self.harness.charm.leader_get('ginger'),
'biscuit')
def test_peer_leader_ready(self):
def test_peer_leader_ready(self) -> None:
"""Test peer leader ready methods."""
rel_id = self.harness.add_relation('peers', 'my-service')
self.harness.add_relation_unit(
rel_id,