Remove charmhelpers dependancy and add linters

Change-Id: Iaf0e8f305825c51f444c397d8bc80c0e2958548e
This commit is contained in:
Liam Young 2022-11-03 14:15:55 +00:00
parent facf398bd7
commit b8aa8879f9
15 changed files with 978 additions and 709 deletions

View File

@ -0,0 +1,39 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
# Testing tools configuration
[tool.coverage.run]
branch = true
[tool.coverage.report]
show_missing = true
[tool.pytest.ini_options]
minversion = "6.0"
log_cli_level = "INFO"
# Formatting tools configuration
[tool.black]
line-length = 79
[tool.isort]
profile = "black"
multi_line_output = 3
force_grid_wrap = true
# Linting tools configuration
[tool.flake8]
max-line-length = 79
max-doc-length = 99
max-complexity = 10
exclude = [".git", "__pycache__", ".tox", "build", "dist", "*.egg_info", "venv"]
select = ["E", "W", "F", "C", "N", "R", "D", "H"]
# Ignore W503, E501 because using black creates errors with this
# Ignore D107 Missing docstring in __init__
ignore = ["W503", "E501", "D107", "E402"]
per-file-ignores = []
docstring-convention = "google"
# Check for properly formatted copyright header in each file
copyright-check = "True"
copyright-author = "Canonical Ltd."
copyright-regexp = "Copyright\\s\\d{4}([-,]\\d{4})*\\s+%(author)s"

View File

@ -24,5 +24,3 @@ git+https://opendev.org/openstack/charm-ops-interface-tls-certificates#egg=inter
# Note: Required for cinder-k8s, cinder-ceph-k8s, glance-k8s, nova-k8s
git+https://opendev.org/openstack/charm-ops-interface-ceph-client#egg=interface_ceph_client
# Charmhelpers is only present as interface_ceph_client uses it.
git+https://github.com/juju/charm-helpers.git#egg=charmhelpers

View File

@ -0,0 +1,15 @@
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Charm source code."""

View File

@ -28,34 +28,49 @@ develop a new k8s charm using the Operator Framework:
import json
import logging
import os
import pwgen
import subprocess
import time
from typing import Callable, List, Dict, Optional
from typing import (
Callable,
Dict,
List,
Optional,
)
import charms.keystone_k8s.v0.cloud_credentials as sunbeam_cc_svc
import charms.keystone_k8s.v0.identity_service as sunbeam_id_svc
import ops.charm
import ops.pebble
import ops_sunbeam.charm as sunbeam_charm
import ops_sunbeam.config_contexts as sunbeam_contexts
import ops_sunbeam.core as sunbeam_core
import ops_sunbeam.relation_handlers as sunbeam_rhandlers
import pwgen
from ops import (
model,
)
from ops.charm import (
ActionEvent,
CharmEvents,
HookEvent,
RelationChangedEvent,
RelationEvent,
HookEvent,
ActionEvent,
)
import ops.pebble
from ops.main import main
from ops.framework import StoredState, Object, EventSource
from ops import model
from ops.framework import (
EventSource,
Object,
StoredState,
)
from ops.main import (
main,
)
from ops_sunbeam.interfaces import (
OperatorPeers,
)
from utils import manager
import ops_sunbeam.charm as sunbeam_charm
import ops_sunbeam.core as sunbeam_core
import ops_sunbeam.config_contexts as sunbeam_contexts
import ops_sunbeam.relation_handlers as sunbeam_rhandlers
from ops_sunbeam.interfaces import OperatorPeers
import charms.keystone_k8s.v0.identity_service as sunbeam_id_svc
import charms.keystone_k8s.v0.cloud_credentials as sunbeam_cc_svc
from utils import (
manager,
)
logger = logging.getLogger(__name__)
@ -64,59 +79,66 @@ LAST_FERNET_KEY_ROTATION_KEY = "last_fernet_rotation"
FERNET_KEYS_KEY = "fernet_keys"
KEYSTONE_CONF = '/etc/keystone/keystone.conf'
LOGGING_CONF = '/etc/keystone/logging.conf'
KEYSTONE_CONF = "/etc/keystone/keystone.conf"
LOGGING_CONF = "/etc/keystone/logging.conf"
class KeystoneLoggingAdapter(sunbeam_contexts.ConfigContext):
"""Config adapter to collect logging config."""
def context(self):
"""Configuration context."""
config = self.charm.model.config
ctxt = {}
if config['debug']:
ctxt['root_level'] = 'DEBUG'
log_level = config['log-level']
if log_level in ['DEBUG', 'INFO', 'WARNING', 'ERROR']:
ctxt['log_level'] = log_level
if config["debug"]:
ctxt["root_level"] = "DEBUG"
log_level = config["log-level"]
if log_level in ["DEBUG", "INFO", "WARNING", "ERROR"]:
ctxt["log_level"] = log_level
else:
logger.error('log-level must be one of the following values '
f'(DEBUG, INFO, WARNING, ERROR) not "{log_level}"')
ctxt['log_level'] = None
ctxt['log_file'] = '/var/log/keystone/keystone.log'
logger.error(
"log-level must be one of the following values "
f'(DEBUG, INFO, WARNING, ERROR) not "{log_level}"'
)
ctxt["log_level"] = None
ctxt["log_file"] = "/var/log/keystone/keystone.log"
return ctxt
class KeystoneConfigAdapter(sunbeam_contexts.ConfigContext):
"""Config adapter to collect keystone config."""
def context(self):
"""Configuration context."""
config = self.charm.model.config
return {
'api_version': 3,
'admin_role': self.charm.admin_role,
'assignment_backend': 'sql',
'service_tenant_id': self.charm.service_project_id,
'admin_domain_name': self.charm.admin_domain_name,
'admin_domain_id': self.charm.admin_domain_id,
'auth_methods': 'external,password,token,oauth1,mapped',
'default_domain_id': self.charm.default_domain_id,
'public_port': self.charm.service_port,
'debug': config['debug'],
'token_expiration': config['token-expiration'],
'allow_expired_window': config['allow-expired-window'],
'catalog_cache_expiration': config['catalog-cache-expiration'],
'dogpile_cache_expiration': config['dogpile-cache-expiration'],
'identity_backend': 'sql',
'token_provider': 'fernet',
'fernet_max_active_keys': config['fernet-max-active-keys'],
'public_endpoint': self.charm.public_endpoint,
'admin_endpoint': self.charm.admin_endpoint,
'domain_config_dir': '/etc/keystone/domains',
'log_config': '/etc/keystone/logging.conf.j2',
'paste_config_file': '/etc/keystone/keystone-paste.ini',
"api_version": 3,
"admin_role": self.charm.admin_role,
"assignment_backend": "sql",
"service_tenant_id": self.charm.service_project_id,
"admin_domain_name": self.charm.admin_domain_name,
"admin_domain_id": self.charm.admin_domain_id,
"auth_methods": "external,password,token,oauth1,mapped",
"default_domain_id": self.charm.default_domain_id,
"public_port": self.charm.service_port,
"debug": config["debug"],
"token_expiration": config["token-expiration"],
"allow_expired_window": config["allow-expired-window"],
"catalog_cache_expiration": config["catalog-cache-expiration"],
"dogpile_cache_expiration": config["dogpile-cache-expiration"],
"identity_backend": "sql",
"token_provider": "fernet",
"fernet_max_active_keys": config["fernet-max-active-keys"],
"public_endpoint": self.charm.public_endpoint,
"admin_endpoint": self.charm.admin_endpoint,
"domain_config_dir": "/etc/keystone/domains",
"log_config": "/etc/keystone/logging.conf.j2",
"paste_config_file": "/etc/keystone/keystone-paste.ini",
}
class IdentityServiceProvidesHandler(sunbeam_rhandlers.RelationHandler):
"""Handler for identity service relation."""
def __init__(
self,
@ -135,7 +157,8 @@ class IdentityServiceProvidesHandler(sunbeam_rhandlers.RelationHandler):
)
self.framework.observe(
id_svc.on.ready_identity_service_clients,
self._on_identity_service_ready)
self._on_identity_service_ready,
)
return id_svc
def _on_identity_service_ready(self, event) -> None:
@ -146,10 +169,12 @@ class IdentityServiceProvidesHandler(sunbeam_rhandlers.RelationHandler):
@property
def ready(self) -> bool:
"""Report if relation is ready."""
return True
class CloudCredentialsProvidesHandler(sunbeam_rhandlers.RelationHandler):
"""Handler for cloud credentials relation."""
def __init__(
self,
@ -168,7 +193,8 @@ class CloudCredentialsProvidesHandler(sunbeam_rhandlers.RelationHandler):
)
self.framework.observe(
id_svc.on.ready_cloud_credentials_clients,
self._on_cloud_credentials_ready)
self._on_cloud_credentials_ready,
)
return id_svc
def _on_cloud_credentials_ready(self, event) -> None:
@ -179,6 +205,7 @@ class CloudCredentialsProvidesHandler(sunbeam_rhandlers.RelationHandler):
@property
def ready(self) -> bool:
"""Check if handler is ready."""
return True
@ -198,31 +225,32 @@ class HeartbeatEvent(HookEvent):
class KeystoneEvents(CharmEvents):
"""Custom local events."""
fernet_keys_updated = EventSource(FernetKeysUpdatedEvent)
heartbeat = EventSource(HeartbeatEvent)
class KeystoneInterface(Object):
"""Define Keystone interface."""
def __init__(self, charm):
super().__init__(charm, 'keystone-peers')
"""Init KeystoneInterface class."""
super().__init__(charm, "keystone-peers")
self.charm = charm
self.framework.observe(
self.charm.on.peers_relation_changed,
self._on_peer_data_changed
self.charm.on.peers_relation_changed, self._on_peer_data_changed
)
def _on_peer_data_changed(self, event: RelationChangedEvent):
"""
Check the peer data updates for updated fernet keys.
"""Check the peer data updates for updated fernet keys.
Then we can pull the keys from the app data,
and tell the local charm to write them to disk.
"""
old_data = event.relation.data[self.charm.unit].get(
FERNET_KEYS_KEY, ''
FERNET_KEYS_KEY, ""
)
data = self.charm.peers.get_app_data(FERNET_KEYS_KEY) or ''
data = self.charm.peers.get_app_data(FERNET_KEYS_KEY) or ""
# only launch the event if the data has changed
# and there there are actually keys
@ -238,35 +266,36 @@ class KeystoneInterface(Object):
)
def distribute_fernet_keys(self, keys: Dict[str, str]):
"""
Trigger a fernet key distribution.
"""Trigger a fernet key distribution.
This is achieved by simply saving it to the app data here,
which will trigger the peer data changed event across all the units.
"""
self.charm.peers.set_app_data({
FERNET_KEYS_KEY: json.dumps(keys),
})
self.charm.peers.set_app_data(
{
FERNET_KEYS_KEY: json.dumps(keys),
}
)
class KeystonePasswordManager(Object):
"""Helper for management of keystone credential passwords."""
def __init__(self,
charm: ops.charm.CharmBase,
interface: OperatorPeers):
def __init__(self, charm: ops.charm.CharmBase, interface: OperatorPeers):
self.charm = charm
self.interface = interface
def store(self, username: str, password: str):
"""Store username and password."""
logging.debug(f"Storing password for {username}")
self.interface.set_app_data({
f"password_{username}": password,
})
self.interface.set_app_data(
{
f"password_{username}": password,
}
)
def retrieve(self, username: str) -> str:
"""Retrieve persisted password for provided username"""
"""Retrieve persisted password for provided username."""
if not self.interface:
return None
password = self.interface.get_app_data(f"password_{username}")
@ -281,10 +310,7 @@ class KeystonePasswordManager(Object):
password = self.retrieve(username)
if not password and self.charm.unit.is_leader():
password = pwgen.pwgen(12)
self.store(
username,
password
)
self.store(username, password)
return password
@ -295,52 +321,46 @@ class KeystoneOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm):
_state = StoredState()
_authed = False
service_name = "keystone"
wsgi_admin_script = '/usr/bin/keystone-wsgi-admin'
wsgi_public_script = '/usr/bin/keystone-wsgi-public'
wsgi_admin_script = "/usr/bin/keystone-wsgi-admin"
wsgi_public_script = "/usr/bin/keystone-wsgi-public"
service_port = 5000
mandatory_relations = {
'database',
'ingress-public'
}
mandatory_relations = {"database", "ingress-public"}
def __init__(self, framework):
super().__init__(framework)
self.keystone_manager = manager.KeystoneManager(
self,
KEYSTONE_CONTAINER)
self._state.set_default(admin_domain_name='admin_domain')
self, KEYSTONE_CONTAINER
)
self._state.set_default(admin_domain_name="admin_domain")
self._state.set_default(admin_domain_id=None)
self._state.set_default(default_domain_id=None)
self._state.set_default(service_project_id=None)
self.peer_interface = KeystoneInterface(self)
self.framework.observe(
self.on.fernet_keys_updated,
self._on_fernet_keys_updated
self.on.fernet_keys_updated, self._on_fernet_keys_updated
)
self.framework.observe(self.on.heartbeat, self._on_heartbeat)
self._launch_heartbeat()
self.framework.observe(
self.on.get_admin_password_action,
self._get_admin_password_action
self.on.get_admin_password_action, self._get_admin_password_action
)
self.framework.observe(
self.on.get_admin_account_action,
self._get_admin_account_action
self.on.get_admin_account_action, self._get_admin_account_action
)
self.password_manager = KeystonePasswordManager(self, self.peers)
self.framework.observe(
self.on.get_service_account_action,
self._get_service_account_action
self._get_service_account_action,
)
def _get_admin_password_action(self, event: ActionEvent) -> None:
if not self.unit.is_leader():
event.fail('Please run action on lead unit.')
event.fail("Please run action on lead unit.")
return
event.set_results({"password": self.admin_password})
@ -351,7 +371,7 @@ class KeystoneOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm):
to access the cloud using the admin account.
"""
if not self.unit.is_leader():
event.fail('Please run action on lead unit.')
event.fail("Please run action on lead unit.")
return
openrc = f"""# openrc for access to OpenStack
export OS_AUTH_URL={self.public_endpoint}
@ -363,27 +383,28 @@ export OS_PROJECT_NAME=admin
export OS_IDENTITY_API_VERSION=3
export OS_AUTH_VERSION=3
"""
event.set_results({
"username": self.admin_user,
"password": self.admin_password,
"user-domain-name": self.admin_domain_name,
"project-name": "admin",
"project-domain-name": self.admin_domain_name,
"region": self.model.config['region'],
"internal-endpoint": self.internal_endpoint,
"public-endpoint": self.public_endpoint,
"api-version": 3,
"openrc": openrc,
})
event.set_results(
{
"username": self.admin_user,
"password": self.admin_password,
"user-domain-name": self.admin_domain_name,
"project-name": "admin",
"project-domain-name": self.admin_domain_name,
"region": self.model.config["region"],
"internal-endpoint": self.internal_endpoint,
"public-endpoint": self.public_endpoint,
"api-version": 3,
"openrc": openrc,
}
)
def _launch_heartbeat(self):
"""
Launch another process that will wake up the charm every 5 minutes.
"""Launch another process that will wake up the charm every 5 minutes.
Used to auto schedule fernet key rotation.
"""
# check if already running
if subprocess.call(['pgrep', '-f', 'heartbeat']) == 0:
if subprocess.call(["pgrep", "-f", "heartbeat"]) == 0:
return
logger.debug("Launching the heartbeat")
@ -393,6 +414,7 @@ export OS_AUTH_VERSION=3
)
def _on_fernet_keys_updated(self, event: FernetKeysUpdatedEvent):
"""Respond to fernet_keys_updated event."""
if not self.bootstrapped():
event.defer()
return
@ -402,7 +424,8 @@ export OS_AUTH_VERSION=3
self.keystone_manager.write_fernet_keys(keys)
def _on_heartbeat(self, _event):
"""
"""Respond to heartbeat event.
This should be called regularly.
It will check if it's time to rotate the fernet keys,
@ -417,27 +440,26 @@ export OS_AUTH_VERSION=3
return
# minimum allowed for max_keys is 3
max_keys = max(self.model.config['fernet-max-active-keys'], 3)
exp = self.model.config['token-expiration']
exp_window = self.model.config['allow-expired-window']
max_keys = max(self.model.config["fernet-max-active-keys"], 3)
exp = self.model.config["token-expiration"]
exp_window = self.model.config["allow-expired-window"]
rotation_seconds = (exp + exp_window) / (max_keys - 2)
# last time the fernet keys were rotated, in seconds since the epoch
last_rotation: Optional[str] = (
self.peers.get_app_data(LAST_FERNET_KEY_ROTATION_KEY)
last_rotation: Optional[str] = self.peers.get_app_data(
LAST_FERNET_KEY_ROTATION_KEY
)
now: int = int(time.time())
if (
last_rotation is None or
now - int(last_rotation) >= rotation_seconds
last_rotation is None
or now - int(last_rotation) >= rotation_seconds
):
self._rotate_fernet_keys()
self.peers.set_app_data({LAST_FERNET_KEY_ROTATION_KEY: str(now)})
def _rotate_fernet_keys(self):
"""
Rotate fernet keys and trigger distribution.
"""Rotate fernet keys and trigger distribution.
If this is run on a non-leader unit, it's a noop.
Keys should only ever be rotated and distributed from a single unit.
@ -449,22 +471,23 @@ export OS_AUTH_VERSION=3
self.keystone_manager.read_fernet_keys()
)
def get_relation_handlers(self, handlers=None) -> List[
sunbeam_rhandlers.RelationHandler]:
def get_relation_handlers(
self, handlers=None
) -> List[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service."""
handlers = handlers or []
if self.can_add_handler('identity-service', handlers):
if self.can_add_handler("identity-service", handlers):
self.id_svc = IdentityServiceProvidesHandler(
self,
'identity-service',
"identity-service",
self.register_service,
)
handlers.append(self.id_svc)
if self.can_add_handler('identity-credentials', handlers):
if self.can_add_handler("identity-credentials", handlers):
self.cc_svc = CloudCredentialsProvidesHandler(
self,
'identity-credentials',
"identity-credentials",
self.add_credentials,
)
handlers.append(self.cc_svc)
@ -475,52 +498,58 @@ export OS_AUTH_VERSION=3
def config_contexts(self) -> List[sunbeam_contexts.ConfigContext]:
"""Configuration adapters for the operator."""
return [
KeystoneConfigAdapter(self, 'ks_config'),
KeystoneLoggingAdapter(self, 'ks_logging'),
sunbeam_contexts.CharmConfigContext(self, 'options')]
KeystoneConfigAdapter(self, "ks_config"),
KeystoneLoggingAdapter(self, "ks_logging"),
sunbeam_contexts.CharmConfigContext(self, "options"),
]
@property
def container_configs(self):
"""Container configs for keystone."""
_cconfigs = super().container_configs
_cconfigs.extend([
sunbeam_core.ContainerConfigFile(
LOGGING_CONF,
'keystone',
'keystone')])
_cconfigs.extend(
[
sunbeam_core.ContainerConfigFile(
LOGGING_CONF, "keystone", "keystone"
)
]
)
return _cconfigs
def register_service(self, event):
"""Register service in keystone."""
if not self._state.bootstrapped:
event.defer()
return
if not self.unit.is_leader():
return
relation = self.model.get_relation(
event.relation_name,
event.relation_id)
event.relation_name, event.relation_id
)
binding = self.framework.model.get_binding(relation)
ingress_address = str(binding.network.ingress_address)
service_domain = self.keystone_manager.create_domain(
name='service_domain',
may_exist=True)
name="service_domain", may_exist=True
)
service_project = self.keystone_manager.get_project(
name=self.service_project,
domain=service_domain)
admin_domain = self.keystone_manager.get_domain(
name='admin_domain')
name=self.service_project, domain=service_domain
)
admin_domain = self.keystone_manager.get_domain(name="admin_domain")
admin_project = self.keystone_manager.get_project(
name='admin',
domain=admin_domain)
name="admin", domain=admin_domain
)
admin_user = self.keystone_manager.get_user(
name=self.model.config['admin-user'],
name=self.model.config["admin-user"],
project=admin_project,
domain=admin_domain)
domain=admin_domain,
)
admin_role = self.keystone_manager.create_role(
name=self.admin_role,
may_exist=True)
name=self.admin_role, may_exist=True
)
for ep_data in event.service_endpoints:
service_username = 'svc_{}'.format(
event.client_app_name.replace('-', '_'))
service_username = "svc_{}".format(
event.client_app_name.replace("-", "_")
)
service_password = self.password_manager.retrieve_or_set(
service_username
)
@ -528,37 +557,41 @@ export OS_AUTH_VERSION=3
name=service_username,
password=service_password,
domain=service_domain.id,
may_exist=True)
may_exist=True,
)
self.keystone_manager.grant_role(
role=admin_role,
user=service_user,
project=service_project,
may_exist=True)
may_exist=True,
)
service = self.keystone_manager.create_service(
name=ep_data['service_name'],
service_type=ep_data['type'],
description=ep_data['description'],
may_exist=True)
for interface in ['admin', 'internal', 'public']:
name=ep_data["service_name"],
service_type=ep_data["type"],
description=ep_data["description"],
may_exist=True,
)
for interface in ["admin", "internal", "public"]:
self.keystone_manager.create_endpoint(
service=service,
interface=interface,
url=ep_data[f'{interface}_url'],
url=ep_data[f"{interface}_url"],
region=event.region,
may_exist=True)
may_exist=True,
)
self.id_svc.interface.set_identity_service_credentials(
event.relation_name,
event.relation_id,
'v3',
"v3",
ingress_address,
self.default_public_ingress_port,
'http',
"http",
ingress_address,
self.default_public_ingress_port,
'http',
"http",
ingress_address,
self.default_public_ingress_port,
'http',
"http",
admin_domain,
admin_project,
admin_user,
@ -568,62 +601,69 @@ export OS_AUTH_VERSION=3
service_user,
self.internal_endpoint,
self.admin_endpoint,
self.public_endpoint)
self.public_endpoint,
)
def add_credentials(self, event):
"""
"""Add credentials from user defined in event.
:param event:
:return:
"""
if not self.unit.is_leader():
logger.debug('Current unit is not the leader unit, deferring '
'credential creation to leader unit.')
logger.debug(
"Current unit is not the leader unit, deferring "
"credential creation to leader unit."
)
return
if not self.bootstrapped():
logger.debug('Keystone is not bootstrapped, deferring credential '
'creation until after bootstrap.')
logger.debug(
"Keystone is not bootstrapped, deferring credential "
"creation until after bootstrap."
)
event.defer()
return
relation = self.model.get_relation(
event.relation_name,
event.relation_id)
event.relation_name, event.relation_id
)
binding = self.framework.model.get_binding(relation)
ingress_address = str(binding.network.ingress_address)
service_domain = self.keystone_manager.create_domain(
name='service_domain',
may_exist=True)
name="service_domain", may_exist=True
)
service_project = self.keystone_manager.get_project(
name=self.service_project,
domain=service_domain)
name=self.service_project, domain=service_domain
)
user_password = self.password_manager.retrieve_or_set(event.username)
service_user = self.keystone_manager.create_user(
name=event.username,
password=user_password,
domain=service_domain.id,
may_exist=True)
may_exist=True,
)
admin_role = self.keystone_manager.create_role(
name=self.admin_role,
may_exist=True)
name=self.admin_role, may_exist=True
)
# TODO(wolsen) let's not always grant admin role!
self.keystone_manager.grant_role(
role=admin_role,
user=service_user,
project=service_project,
may_exist=True)
may_exist=True,
)
self.cc_svc.interface.set_cloud_credentials(
relation_name=event.relation_name,
relation_id=event.relation_id,
api_version='3',
api_version="3",
auth_host=ingress_address,
auth_port=self.default_public_ingress_port,
auth_protocol='http',
auth_protocol="http",
internal_host=ingress_address, # XXX(wolsen) internal address?
internal_port=self.default_public_ingress_port,
internal_protocol='http',
internal_protocol="http",
username=service_user.name,
password=user_password,
project_name=service_project.name,
@ -632,7 +672,7 @@ export OS_AUTH_VERSION=3
user_domain_id=service_domain.id,
project_domain_name=service_domain.name,
project_domain_id=service_domain.id,
region=self.model.config['region'], # XXX(wolsen) region matters?
region=self.model.config["region"], # XXX(wolsen) region matters?
)
def _get_service_account_action(self, event: ActionEvent) -> None:
@ -644,59 +684,67 @@ export OS_AUTH_VERSION=3
of the Charmed deployment.
"""
if not self.unit.is_leader():
event.fail('Please run action on lead unit.')
event.fail("Please run action on lead unit.")
return
# TODO: refactor into general helper method.
username = event.params['username']
username = event.params["username"]
service_domain = self.keystone_manager.create_domain(
name='service_domain',
may_exist=True)
name="service_domain", may_exist=True
)
service_project = self.keystone_manager.get_project(
name=self.service_project,
domain=service_domain)
name=self.service_project, domain=service_domain
)
user_password = self.password_manager.retrieve_or_set(username)
service_user = self.keystone_manager.create_user(
name=username,
password=user_password,
domain=service_domain.id,
may_exist=True)
may_exist=True,
)
admin_role = self.keystone_manager.create_role(
name=self.admin_role,
may_exist=True)
name=self.admin_role, may_exist=True
)
# TODO(wolsen) let's not always grant admin role!
self.keystone_manager.grant_role(
role=admin_role,
user=service_user,
project=service_project,
may_exist=True)
may_exist=True,
)
event.set_results({
"username": username,
"password": user_password,
"user-domain-name": service_domain.name,
"project-name": service_project.name,
"project-domain-name": service_domain.name,
"region": self.model.config['region'],
"internal-endpoint": self.internal_endpoint,
"public-endpoint": self.public_endpoint,
"api-version": 3
})
event.set_results(
{
"username": username,
"password": user_password,
"user-domain-name": service_domain.name,
"project-name": service_project.name,
"project-domain-name": service_domain.name,
"region": self.model.config["region"],
"internal-endpoint": self.internal_endpoint,
"public-endpoint": self.public_endpoint,
"api-version": 3,
}
)
@property
def default_public_ingress_port(self):
"""Default public ingress port."""
return 5000
@property
def default_domain_id(self):
"""Default domain id."""
return self._state.default_domain_id
@property
def admin_domain_name(self):
"""Admin domain name."""
return self._state.admin_domain_name
@property
def admin_domain_id(self):
"""Admin domain ID."""
return self._state.admin_domain_id
@property
@ -706,11 +754,13 @@ export OS_AUTH_VERSION=3
@property
def admin_user(self):
return self.model.config['admin-user']
"""Admin User."""
return self.model.config["admin-user"]
@property
def admin_role(self):
return self.model.config['admin-role']
"""Admin role."""
return self.model.config["admin-role"]
@property
def charm_user(self):
@ -719,7 +769,7 @@ export OS_AUTH_VERSION=3
This is a special admin user reserved for the charm to interact with
keystone.
"""
return '_charm-keystone-admin'
return "_charm-keystone-admin"
@property
def charm_password(self) -> str:
@ -728,53 +778,58 @@ export OS_AUTH_VERSION=3
@property
def service_project(self):
return self.model.config['service-tenant']
"""Service project name."""
return self.model.config["service-tenant"]
@property
def service_project_id(self):
"""Service project id."""
return self._state.service_project_id
@property
def admin_endpoint(self):
admin_hostname = self.model.config.get('os-admin-hostname')
"""Admin endpoint for keystone api."""
admin_hostname = self.model.config.get("os-admin-hostname")
if not admin_hostname:
admin_hostname = self.model.get_binding(
"identity-service"
).network.ingress_address
return f'http://{admin_hostname}:{self.service_port}'
return f"http://{admin_hostname}:{self.service_port}"
@property
def internal_endpoint(self):
"""Internal endpoint for keystone api."""
if self.ingress_internal and self.ingress_internal.url:
return self.ingress_internal.url
internal_hostname = self.model.config.get('os-internal-hostname')
internal_hostname = self.model.config.get("os-internal-hostname")
if not internal_hostname:
internal_hostname = self.model.get_binding(
"identity-service"
).network.ingress_address
return f'http://{internal_hostname}:{self.service_port}'
return f"http://{internal_hostname}:{self.service_port}"
@property
def public_endpoint(self):
"""Public endpoint for keystone api."""
if self.ingress_public and self.ingress_public.url:
return self.ingress_public.url
address = self.public_ingress_address
if not address:
address = self.model.get_binding(
'identity-service'
"identity-service"
).network.ingress_address
return f'http://{address}:{self.service_port}'
return f"http://{address}:{self.service_port}"
@property
def healthcheck_http_url(self) -> str:
"""Healthcheck HTTP URL for the service."""
return f'http://localhost:{self.default_public_ingress_port}/v3'
return f"http://localhost:{self.default_public_ingress_port}/v3"
def _do_bootstrap(self) -> bool:
"""
Starts the appropriate services in the order they are needed.
"""Starts the appropriate services in the order they are needed.
If the service has not yet been bootstrapped, then this will
1. Create the database
2. Bootstrap the keystone users service
@ -787,7 +842,7 @@ export OS_AUTH_VERSION=3
try:
self.keystone_manager.setup_keystone()
except (ops.pebble.ExecError, ops.pebble.ConnectionError) as error:
logger.exception('Failed to bootstrap')
logger.exception("Failed to bootstrap")
logger.exception(error)
return False
@ -798,10 +853,10 @@ export OS_AUTH_VERSION=3
# sure of exact exceptions to be caught. List below that
# are observed:
# keystoneauth1.exceptions.connection.ConnectFailure
logger.exception('Failed to setup projects and users')
logger.exception("Failed to setup projects and users")
return False
self.unit.status = model.MaintenanceStatus('Starting Keystone')
self.unit.status = model.MaintenanceStatus("Starting Keystone")
return True
def _ingress_changed(self, event: ops.framework.EventBase) -> None:
@ -810,18 +865,13 @@ export OS_AUTH_VERSION=3
Invoked when the data on the ingress relation has changed. This will
update the keystone endpoints, and then call the configure_charm.
"""
logger.debug('Received an ingress_changed event')
logger.debug("Received an ingress_changed event")
if self.bootstrapped():
self.keystone_manager.update_service_catalog_for_keystone()
self.configure_charm(event)
class KeystoneXenaOperatorCharm(KeystoneOperatorCharm):
openstack_release = 'xena'
if __name__ == "__main__":
# Note: use_juju_for_storage=True required per
# https://github.com/canonical/operator/issues/506
main(KeystoneXenaOperatorCharm, use_juju_for_storage=True)
main(KeystoneOperatorCharm, use_juju_for_storage=True)

View File

@ -38,7 +38,7 @@ max_active_keys = {{ ks_config.fernet_max_active_keys }}
{% include "parts/section-signing" %}
{% include "section-oslo-cache" %}
{% include "parts/section-oslo-cache" %}
# This goes in the section above, selectively
# Bug #1899117
expiration_time = {{ ks_config.dogpile_cache_expiration }}
@ -103,7 +103,7 @@ admin_project_name = admin
{% include "parts/section-federation" %}
{% include "section-oslo-middleware" %}
{% include "parts/section-oslo-middleware" %}
# This goes in the section above, selectively
# Bug #1819134
max_request_body_size = 114688

View File

@ -0,0 +1,6 @@
[cache]
{% if memcache_url %}
enabled = true
backend = oslo_cache.memcache_pool
memcache_servers = {{ memcache_url }}
{% endif %}

View File

@ -0,0 +1,5 @@
[oslo_middleware]
# Bug #1758675
enable_proxy_headers_parsing = true

View File

@ -0,0 +1,15 @@
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Keystone utilities."""

View File

@ -1,4 +1,4 @@
# Copyright 2021, Canonical Ltd.
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,54 +12,79 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ops.pebble
from ops import framework
from ops.model import MaintenanceStatus
from keystoneauth1 import session
from keystoneauth1.identity import v3
from keystoneclient.v3 import client
from keystoneclient.v3.domains import Domain
from keystoneclient.v3.endpoints import Endpoint
from keystoneclient.v3.projects import Project
from keystoneclient.v3.regions import Region
from keystoneclient.v3.roles import Role
from keystoneclient.v3.services import Service
from keystoneclient.v3.users import User
import ops_sunbeam.guard as sunbeam_guard
"""Manager for interacting with keystone."""
import logging
import typing
import ops.pebble
import ops_sunbeam.guard as sunbeam_guard
from keystoneauth1 import (
session,
)
from keystoneauth1.identity import (
v3,
)
from keystoneclient.v3 import (
client,
)
from keystoneclient.v3.domains import (
Domain,
)
from keystoneclient.v3.endpoints import (
Endpoint,
)
from keystoneclient.v3.projects import (
Project,
)
from keystoneclient.v3.regions import (
Region,
)
from keystoneclient.v3.roles import (
Role,
)
from keystoneclient.v3.services import (
Service,
)
from keystoneclient.v3.users import (
User,
)
from ops import (
framework,
)
from ops.model import (
MaintenanceStatus,
)
logger = logging.getLogger(__name__)
class KeystoneException(Exception):
class KeystoneExceptionError(Exception):
"""Error interacting with Keystone."""
pass
class KeystoneManager(framework.Object):
"""
"""Class for managing interactions with keystone api."""
"""
def __init__(self, charm, container_name):
super().__init__(charm, 'keystone-manager')
"""Setup the manager."""
super().__init__(charm, "keystone-manager")
self.charm = charm
self.container_name = container_name
self._api = None
def run_cmd(self, cmd, exception_on_error=True, **kwargs):
"""Run command in container."""
pebble_handler = self.charm.get_named_pebble_handler(
self.container_name)
return pebble_handler.execute(
cmd,
exception_on_error=True, **kwargs)
self.container_name
)
return pebble_handler.execute(cmd, exception_on_error=True, **kwargs)
@property
def api(self):
"""
Returns the current api reference or creates a new one.
"""Returns the current api reference or creates a new one.
TODO(wolsen): All of the direct interaction with keystone belongs in
an Adapter class which can handle v3 as well as future versions.
@ -72,31 +97,37 @@ class KeystoneManager(framework.Object):
auth_url="http://localhost:5000/v3",
username=self.charm.charm_user,
password=self.charm.charm_password,
system_scope='all',
project_domain_name='Default',
user_domain_name='Default',
system_scope="all",
project_domain_name="Default",
user_domain_name="Default",
)
keystone_session = session.Session(auth=auth)
self._api = client.Client(session=keystone_session,
endpoint_override='http://localhost:5000/v3')
self._api = client.Client(
session=keystone_session,
endpoint_override="http://localhost:5000/v3",
)
return self._api
@property
def admin_endpoint(self):
"""Admin endpoint for this keystone."""
return self.charm.admin_endpoint
@property
def internal_endpoint(self):
"""Internal endpoint for this keystone."""
return self.charm.internal_endpoint
@property
def public_endpoint(self):
"""Public endpoint for this keystone."""
return self.charm.public_endpoint
@property
def regions(self):
"""List of regions required for this keystone."""
# split regions and strip out empty regions
regions = [r for r in self.charm.model.config['region'].split() if r]
regions = [r for r in self.charm.model.config["region"].split() if r]
return regions
def setup_keystone(self):
@ -107,15 +138,14 @@ class KeystoneManager(framework.Object):
repositories for tokens and credentials, and bootstrapping the initial
keystone service.
"""
with sunbeam_guard.guard(self.charm, 'Initializing Keystone', False):
with sunbeam_guard.guard(self.charm, "Initializing Keystone", False):
self._sync_database()
self._fernet_setup()
self._credential_setup()
self._bootstrap()
def rotate_fernet_keys(self):
"""
Rotate the fernet keys.
"""Rotate the fernet keys.
And probably also distribute the keys to other units too.
@ -126,27 +156,29 @@ class KeystoneManager(framework.Object):
at intervals of
(token-expiration + allow-expired-window)/(fernet-max-active-keys - 2)
"""
with sunbeam_guard.guard(self.charm, 'Rotating fernet keys'):
self.run_cmd([
'sudo', '-u', 'keystone',
'keystone-manage', 'fernet_rotate',
'--keystone-user', 'keystone',
'--keystone-group', 'keystone'])
with sunbeam_guard.guard(self.charm, "Rotating fernet keys"):
self.run_cmd(
[
"sudo",
"-u",
"keystone",
"keystone-manage",
"fernet_rotate",
"--keystone-user",
"keystone",
"--keystone-group",
"keystone",
]
)
def read_fernet_keys(self) -> typing.Mapping[str, str]:
"""
Pull the fernet keys from the on-disk repository.
"""
"""Pull the fernet keys from the on-disk repository."""
container = self.charm.unit.get_container(self.container_name)
files = container.list_files("/etc/keystone/fernet-keys")
return {
file.name: container.pull(file.path).read() for file in files
}
return {file.name: container.pull(file.path).read() for file in files}
def write_fernet_keys(self, keys: typing.Mapping[str, str]):
"""
Update the local fernet key repository with the provided keys.
"""
"""Update the local fernet key repository with the provided keys."""
container = self.charm.unit.get_container(self.container_name)
logger.debug("Writing updated fernet keys")
@ -154,10 +186,10 @@ class KeystoneManager(framework.Object):
# write the keys
for filename, contents in keys.items():
container.push(
f'/etc/keystone/fernet-keys/{filename}',
f"/etc/keystone/fernet-keys/{filename}",
contents,
user='keystone',
group='keystone',
user="keystone",
group="keystone",
permissions=0o600,
)
@ -169,6 +201,7 @@ class KeystoneManager(framework.Object):
def _set_status(self, status: str, app: bool = False) -> None:
"""Sets the status to the specified status string.
By default, the status is set on the individual unit but can be set
for the whole application if app is set to True.
@ -186,176 +219,227 @@ class KeystoneManager(framework.Object):
target.status = MaintenanceStatus(status)
def _sync_database(self):
"""Syncs the database using the keystone-manage db_sync
"""Syncs the database using the keystone-manage db_sync.
The database is synchronized using the keystone-manage db_sync command.
Database configuration information is retrieved from configuration
files.
:raises: KeystoneException when the database sync fails.
:raises: KeystoneExceptionError when the database sync fails.
"""
try:
self._set_status('Syncing database')
self._set_status("Syncing database")
logger.info("Syncing database...")
self.run_cmd([
'sudo', '-u', 'keystone',
'keystone-manage', '--config-dir',
'/etc/keystone', 'db_sync'])
self.run_cmd(
[
"sudo",
"-u",
"keystone",
"keystone-manage",
"--config-dir",
"/etc/keystone",
"db_sync",
]
)
except ops.pebble.ExecError:
logger.exception('Error occurred synchronizing the database.')
raise KeystoneException('Database sync failed')
logger.exception("Error occurred synchronizing the database.")
raise KeystoneExceptionError("Database sync failed")
def _fernet_setup(self):
"""Sets up the fernet token store in the specified container.
:raises: KeystoneException when a failure occurs setting up the fernet
:raises: KeystoneExceptionError when a failure occurs setting up the fernet
token store
"""
try:
self._set_status('Setting up fernet tokens')
self._set_status("Setting up fernet tokens")
logger.info("Setting up fernet tokens...")
self.run_cmd([
'sudo', '-u', 'keystone',
'keystone-manage', 'fernet_setup',
'--keystone-user', 'keystone',
'--keystone-group', 'keystone'])
self.run_cmd(
[
"sudo",
"-u",
"keystone",
"keystone-manage",
"fernet_setup",
"--keystone-user",
"keystone",
"--keystone-group",
"keystone",
]
)
except ops.pebble.ExecError:
logger.exception('Error occurred setting up fernet tokens')
raise KeystoneException('Fernet setup failed.')
logger.exception("Error occurred setting up fernet tokens")
raise KeystoneExceptionError("Fernet setup failed.")
def _credential_setup(self):
"""
"""
"""Run keystone credential_setup."""
try:
self._set_status('Setting up credentials')
self._set_status("Setting up credentials")
logger.info("Setting up credentials...")
self.run_cmd([
'sudo', '-u', 'keystone',
'keystone-manage', 'credential_setup'])
self.run_cmd(
[
"sudo",
"-u",
"keystone",
"keystone-manage",
"credential_setup",
]
)
except ops.pebble.ExecError:
logger.exception('Error occurred during credential setup')
raise KeystoneException('Credential setup failed.')
logger.exception("Error occurred during credential setup")
raise KeystoneExceptionError("Credential setup failed.")
def _bootstrap(self):
"""
"""
"""Run keystone bootstrap."""
try:
self._set_status('Bootstrapping Keystone')
logger.info('Bootstrapping keystone service')
self._set_status("Bootstrapping Keystone")
logger.info("Bootstrapping keystone service")
# NOTE(wolsen) in classic keystone charm, there's a comment about
# enabling immutable roles for this. This is unnecessary as it is
# now the default behavior for keystone-manage bootstrap.
self.run_cmd([
'keystone-manage', 'bootstrap',
'--bootstrap-username', self.charm.charm_user,
'--bootstrap-password', self.charm.charm_password,
'--bootstrap-project-name', 'admin',
'--bootstrap-role-name', self.charm.admin_role,
'--bootstrap-service-name', 'keystone',
'--bootstrap-admin-url', self.admin_endpoint,
'--bootstrap-public-url', self.public_endpoint,
'--bootstrap-internal-url', self.internal_endpoint,
'--bootstrap-region-id', self.regions[0]])
self.run_cmd(
[
"keystone-manage",
"bootstrap",
"--bootstrap-username",
self.charm.charm_user,
"--bootstrap-password",
self.charm.charm_password,
"--bootstrap-project-name",
"admin",
"--bootstrap-role-name",
self.charm.admin_role,
"--bootstrap-service-name",
"keystone",
"--bootstrap-admin-url",
self.admin_endpoint,
"--bootstrap-public-url",
self.public_endpoint,
"--bootstrap-internal-url",
self.internal_endpoint,
"--bootstrap-region-id",
self.regions[0],
]
)
except ops.pebble.ExecError:
logger.exception('Error occurred bootstrapping keystone service')
raise KeystoneException('Bootstrap failed')
logger.exception("Error occurred bootstrapping keystone service")
raise KeystoneExceptionError("Bootstrap failed")
def setup_initial_projects_and_users(self):
"""
"""
with sunbeam_guard.guard(self.charm,
'Setting up initial projects and users',
False):
"""Setup initial projects and users."""
with sunbeam_guard.guard(
self.charm, "Setting up initial projects and users", False
):
self._setup_admin_accounts()
self._setup_service_accounts()
self.update_service_catalog_for_keystone()
def _setup_admin_accounts(self):
"""
"""
"""Setup admin accounts."""
# Get the default domain id
default_domain = self.get_domain('default')
logger.debug(f'Default domain id: {default_domain.id}')
default_domain = self.get_domain("default")
logger.debug(f"Default domain id: {default_domain.id}")
self.charm._state.default_domain_id = default_domain.id # noqa
# Get the admin domain id
admin_domain = self.create_domain(name='admin_domain',
may_exist=True)
logger.debug(f'Admin domain id: {admin_domain.id}')
admin_domain = self.create_domain(name="admin_domain", may_exist=True)
logger.debug(f"Admin domain id: {admin_domain.id}")
self.charm._state.admin_domain_id = admin_domain.id # noqa
self.charm._state.admin_domain_name = admin_domain.name # noqa
# Ensure that we have the necessary projects: admin and service
admin_project = self.create_project(name='admin', domain=admin_domain,
may_exist=True)
admin_project = self.create_project(
name="admin", domain=admin_domain, may_exist=True
)
logger.debug('Ensuring admin user exists')
admin_user = self.create_user(name=self.charm.admin_user,
password=self.charm.admin_password,
domain=admin_domain, may_exist=True)
logger.debug("Ensuring admin user exists")
admin_user = self.create_user(
name=self.charm.admin_user,
password=self.charm.admin_password,
domain=admin_domain,
may_exist=True,
)
logger.debug('Ensuring roles exist for admin')
logger.debug("Ensuring roles exist for admin")
# I seem to recall all kinds of grief between Member and member and
# _member_ and inconsistencies in what other projects expect.
member_role = self.create_role(name='member', may_exist=True)
admin_role = self.create_role(name=self.charm.admin_role,
may_exist=True)
member_role = self.create_role(name="member", may_exist=True)
admin_role = self.create_role(
name=self.charm.admin_role, may_exist=True
)
logger.debug('Granting roles to admin user')
logger.debug("Granting roles to admin user")
# Make the admin a member of the admin project
self.grant_role(role=member_role, user=admin_user,
project=admin_project, may_exist=True)
self.grant_role(
role=member_role,
user=admin_user,
project=admin_project,
may_exist=True,
)
# Make the admin an admin of the admin project
self.grant_role(role=admin_role, user=admin_user,
project=admin_project, may_exist=True)
self.grant_role(
role=admin_role,
user=admin_user,
project=admin_project,
may_exist=True,
)
# Make the admin a domain-level admin
self.grant_role(role=admin_role, user=admin_user,
domain=admin_domain, may_exist=True)
self.grant_role(
role=admin_role,
user=admin_user,
domain=admin_domain,
may_exist=True,
)
def _setup_service_accounts(self):
"""
"""
"""Create service accounts."""
# Get the service domain id
service_domain = self.create_domain(name='service_domain',
may_exist=True)
logger.debug(f'Service domain id: {service_domain.id}.')
service_domain = self.create_domain(
name="service_domain", may_exist=True
)
logger.debug(f"Service domain id: {service_domain.id}.")
service_project = self.create_project(name=self.charm.service_project,
domain=service_domain,
may_exist=True)
logger.debug(f'Service project id: {service_project.id}.')
service_project = self.create_project(
name=self.charm.service_project,
domain=service_domain,
may_exist=True,
)
logger.debug(f"Service project id: {service_project.id}.")
self.charm._state.service_project_id = service_project.id # noqa
def update_service_catalog_for_keystone(self):
"""
"""
service = self.create_service(name='keystone', service_type='identity',
description='Keystone Identity Service',
may_exist=True)
"""Create identity service in catalogue."""
service = self.create_service(
name="keystone",
service_type="identity",
description="Keystone Identity Service",
may_exist=True,
)
endpoints = {
'admin': self.admin_endpoint,
'internal': self.internal_endpoint,
'public': self.public_endpoint,
"admin": self.admin_endpoint,
"internal": self.internal_endpoint,
"public": self.public_endpoint,
}
for region in self.charm.model.config['region'].split():
for region in self.charm.model.config["region"].split():
if not region:
continue
for interface, url in endpoints.items():
self.create_endpoint(service=service, interface=interface,
url=url, region=region, may_exist=True)
self.create_endpoint(
service=service,
interface=interface,
url=url,
region=region,
may_exist=True,
)
def get_domain(self, name: str) -> 'Domain':
"""Returns the domain specified by the name, or None if a matching
def get_domain(self, name: str) -> "Domain":
"""Get domain by name.
Returns the domain specified by the name, or None if a matching
domain could not be found.
:param name: the name of the domain
@ -368,76 +452,94 @@ class KeystoneManager(framework.Object):
return None
def create_domain(self, name: str, description: str = 'Created by Juju',
may_exist: bool = False) -> 'Domain':
"""
"""
def create_domain(
self,
name: str,
description: str = "Created by Juju",
may_exist: bool = False,
) -> "Domain":
"""Create a domain."""
if may_exist:
domain = self.get_domain(name)
if domain:
logger.debug(f'Domain {name} already exists with domain '
f'id {domain.id}.')
logger.debug(
f"Domain {name} already exists with domain "
f"id {domain.id}."
)
return domain
domain = self.api.domains.create(name=name, description=description)
logger.debug(f'Created domain {name} with id {domain.id}')
logger.debug(f"Created domain {name} with id {domain.id}")
return domain
def create_project(self, name: str, domain: str,
description: str = 'Created by Juju',
may_exist: bool = False) -> 'Project':
"""
"""
def create_project(
self,
name: str,
domain: str,
description: str = "Created by Juju",
may_exist: bool = False,
) -> "Project":
"""Create a project."""
if may_exist:
for project in self.api.projects.list(domain=domain):
if project.name.lower() == name.lower():
logger.debug(f'Project {name} already exists with project '
f'id {project.id}.')
logger.debug(
f"Project {name} already exists with project "
f"id {project.id}."
)
return project
project = self.api.projects.create(name=name, description=description,
domain=domain)
logger.debug(f'Created project {name} with id {project.id}')
project = self.api.projects.create(
name=name, description=description, domain=domain
)
logger.debug(f"Created project {name} with id {project.id}")
return project
def get_project(self, name: str,
domain: typing.Union[str, 'Domain'] = None):
"""
"""
def get_project(
self, name: str, domain: typing.Union[str, "Domain"] = None
):
"""Get a project from name."""
projects = self.api.projects.list(domain=domain)
for project in projects:
if project.name.lower() == name.lower():
return project
return None
def create_user(self, name: str, password: str, email: str = None,
project: 'Project' = None,
domain: 'Domain' = None,
may_exist: bool = False) -> 'User':
"""
"""
def create_user(
self,
name: str,
password: str,
email: str = None,
project: "Project" = None,
domain: "Domain" = None,
may_exist: bool = False,
) -> "User":
"""Create a user."""
if may_exist:
user = self.get_user(name, project=project, domain=domain)
if user:
logger.debug(f'User {name} already exists with user '
f'id {user.id}.')
logger.debug(
f"User {name} already exists with user " f"id {user.id}."
)
return user
user = self.api.users.create(name=name, default_project=project,
domain=domain, password=password,
email=email)
logger.debug(f'Created user {user.name} with id {user.id}.')
user = self.api.users.create(
name=name,
default_project=project,
domain=domain,
password=password,
email=email,
)
logger.debug(f"Created user {user.name} with id {user.id}.")
return user
def get_user(self, name: str, project: 'Project' = None,
domain: typing.Union[str, 'Domain'] = None) -> 'User':
"""
"""
def get_user(
self,
name: str,
project: "Project" = None,
domain: typing.Union[str, "Domain"] = None,
) -> "User":
"""Get a user from name."""
users = self.api.users.list(default_project=project, domain=domain)
for user in users:
if user.name.lower() == name.lower():
@ -445,45 +547,41 @@ class KeystoneManager(framework.Object):
return None
def create_role(self, name: str,
domain: typing.Union['Domain', str] = None,
may_exist: bool = False) -> 'Role':
"""
"""
def create_role(
self,
name: str,
domain: typing.Union["Domain", str] = None,
may_exist: bool = False,
) -> "Role":
"""Create a role."""
if may_exist:
role = self.get_role(name=name, domain=domain)
if role:
logger.debug(f'Role {name} already exists with role '
f'id {role.id}')
logger.debug(
f"Role {name} already exists with role " f"id {role.id}"
)
return role
role = self.api.roles.create(name=name, domain=domain)
logger.debug(f'Created role {name} with id {role.id}.')
logger.debug(f"Created role {name} with id {role.id}.")
return role
def get_role(self, name: str,
domain: 'Domain' = None) -> 'Role':
"""
"""
def get_role(self, name: str, domain: "Domain" = None) -> "Role":
"""Get role for user."""
for role in self.api.roles.list(domain=domain):
if role.name == name:
return role
return None
def get_roles(self, user: 'User',
project: 'Project' = None,
domain: 'Project' = None) \
-> typing.List['Role']:
"""
"""
def get_roles(
self, user: "User", project: "Project" = None, domain: "Project" = None
) -> typing.List["Role"]:
"""Get Roles for user."""
if project and domain:
raise ValueError('Project and domain are mutually exclusive')
raise ValueError("Project and domain are mutually exclusive")
if not project and not domain:
raise ValueError('Project or domain must be specified')
raise ValueError("Project or domain must be specified")
if project:
roles = self.api.roles.list(user=user, project=project)
@ -492,59 +590,64 @@ class KeystoneManager(framework.Object):
return roles
def grant_role(self, role: typing.Union['Role', str],
user: 'User',
project: typing.Union['Project', str] = None,
domain: typing.Union['Domain', str] = None,
may_exist: bool = False) -> 'Role':
"""
"""
def grant_role(
self,
role: typing.Union["Role", str],
user: "User",
project: typing.Union["Project", str] = None,
domain: typing.Union["Domain", str] = None,
may_exist: bool = False,
) -> "Role":
"""Grant role to user."""
if project and domain:
raise ValueError('Project and domain are mutually exclusive')
raise ValueError("Project and domain are mutually exclusive")
if not project and not domain:
raise ValueError('Project or domain must be specified')
raise ValueError("Project or domain must be specified")
if domain:
ctxt_str = f'domain {domain.name}'
ctxt_str = f"domain {domain.name}"
else:
ctxt_str = f'project {project.name}'
ctxt_str = f"project {project.name}"
if may_exist:
roles = self.get_roles(user=user, project=project, domain=domain)
for r in roles:
if role.id == r.id:
logger.debug(f'User {user.name} already has role '
f'{role.name} for {ctxt_str}')
logger.debug(
f"User {user.name} already has role "
f"{role.name} for {ctxt_str}"
)
return r
role = self.api.roles.grant(role=role, user=user, project=project,
domain=domain)
logger.debug(f'Granted user {user} role {role} for '
f'{ctxt_str}.')
role = self.api.roles.grant(
role=role, user=user, project=project, domain=domain
)
logger.debug(f"Granted user {user} role {role} for " f"{ctxt_str}.")
return role
def create_region(self, name: str, description: str = None,
may_exist: bool = False) -> 'Region':
"""
"""
def create_region(
self, name: str, description: str = None, may_exist: bool = False
) -> "Region":
"""Create Region in keystone."""
if may_exist:
for region in self.api.regions.list():
if region.id == name:
logger.debug(f'Region {name} already exists.')
logger.debug(f"Region {name} already exists.")
return region
region = self.api.regions.create(id=name, description=description)
logger.debug(f'Created region {name}.')
logger.debug(f"Created region {name}.")
return region
def create_service(self, name: str, service_type: str,
description: str, owner: str = None,
may_exist: bool = False) -> 'Service':
"""
"""
def create_service(
self,
name: str,
service_type: str,
description: str,
owner: str = None,
may_exist: bool = False,
) -> "Service":
"""Create service in Keystone."""
if may_exist:
services = self.api.services.list(name=name, type=service_type)
# TODO(wolsen) can we have more than one service with the same
@ -552,45 +655,58 @@ class KeystoneManager(framework.Object):
# one for now.
print("FOUND: {}".format(services))
for service in services:
logger.debug(f'Service {name} already exists with '
f'service id {service.id}.')
logger.debug(
f"Service {name} already exists with "
f"service id {service.id}."
)
return service
service = self.api.services.create(name=name, type=service_type,
description=description)
logger.debug(f'Created service {service.name} with id {service.id}')
service = self.api.services.create(
name=name, type=service_type, description=description
)
logger.debug(f"Created service {service.name} with id {service.id}")
return service
def create_endpoint(self, service: 'Service', url: str, interface: str,
region: str, may_exist: bool = False) \
-> 'Endpoint':
"""
"""
ep_string = (f'{interface} endpoint for service {service} in '
f'region {region}')
def create_endpoint(
self,
service: "Service",
url: str,
interface: str,
region: str,
may_exist: bool = False,
) -> "Endpoint":
"""Create endpoint in keystone."""
ep_string = (
f"{interface} endpoint for service {service} in "
f"region {region}"
)
if may_exist:
endpoints = self.api.endpoints.list(service=service,
interface=interface,
region=region)
endpoints = self.api.endpoints.list(
service=service, interface=interface, region=region
)
if endpoints:
# NOTE(wolsen) if we have endpoints found, there should be only
# one endpoint; but assert it to make sure
assert len(endpoints) == 1
endpoint = endpoints[0]
if endpoint.url != url:
logger.debug(f'{ep_string} ({endpoint.url}) does '
f'not match requested url ({url}). Updating.')
endpoint = self.api.endpoints.update(endpoint=endpoint,
url=url)
logger.debug(f'Endpoint updated to use {url}')
logger.debug(
f"{ep_string} ({endpoint.url}) does "
f"not match requested url ({url}). Updating."
)
endpoint = self.api.endpoints.update(
endpoint=endpoint, url=url
)
logger.debug(f"Endpoint updated to use {url}")
else:
logger.debug(f'Endpoint {ep_string} already exists with '
f'id {endpoint.id}')
logger.debug(
f"Endpoint {ep_string} already exists with "
f"id {endpoint.id}"
)
return endpoint
endpoint = self.api.endpoints.create(service=service, url=url,
interface=interface,
region=region)
logger.debug(f'Created endpoint {ep_string} with id {endpoint.id}')
endpoint = self.api.endpoints.create(
service=service, url=url, interface=interface, region=region
)
logger.debug(f"Created endpoint {ep_string} with id {endpoint.id}")
return endpoint

View File

@ -4,6 +4,7 @@
# https://github.com/openstack-charmers/release-tools
#
pwgen
coverage
mock
flake8

View File

@ -0,0 +1,15 @@
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for charm."""

View File

@ -1,48 +0,0 @@
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Learn more about testing at: https://juju.is/docs/sdk/testing
import unittest
from unittest.mock import Mock
from ops.testing import Harness
from charm import KeystoneOperatorCharm
class TestCharm(unittest.TestCase):
def test_config_changed(self):
harness = Harness(KeystoneOperatorCharm)
self.addCleanup(harness.cleanup)
harness.begin()
self.assertEqual(list(harness.charm._stored.things), [])
harness.update_config({"thing": "foo"})
self.assertEqual(list(harness.charm._stored.things), ["foo"])
def test_action(self):
harness = Harness(KeystoneOperatorCharm)
harness.begin()
# the harness doesn't (yet!) help much with actions themselves
action_event = Mock(params={"fail": ""})
harness.charm._on_fortune_action(action_event)
self.assertTrue(action_event.set_results.called)
def test_action_fail(self):
harness = Harness(KeystoneOperatorCharm)
harness.begin()
action_event = Mock(params={"fail": "fail this"})
harness.charm._on_fortune_action(action_event)
self.assertEqual(action_event.fail.call_args, [("fail this",)])

View File

@ -0,0 +1,15 @@
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for charm."""

View File

@ -14,16 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import MagicMock, ANY
import mock
"""Define keystone tests."""
import json
import os
from unittest.mock import (
ANY,
MagicMock,
)
import charm
import mock
import ops_sunbeam.test_utils as test_utils
import charm
class _KeystoneXenaOperatorCharm(charm.KeystoneXenaOperatorCharm):
class _KeystoneOperatorCharm(charm.KeystoneOperatorCharm):
"""Create Keystone operator test charm."""
def __init__(self, framework):
self.seen_events = []
@ -38,15 +45,16 @@ class _KeystoneXenaOperatorCharm(charm.KeystoneXenaOperatorCharm):
@property
def public_ingress_address(self) -> str:
return '10.0.0.10'
return "10.0.0.10"
class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
"""Test Keystone operator charm."""
PATCHES = [
'manager',
'subprocess',
'pwgen',
"manager",
"subprocess",
"pwgen",
]
def add_id_relation(self) -> str:
@ -71,34 +79,40 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
"description": "Cinder Volume Service v2",
"internal_url": f"{interal_url}/v2/$(tenant_id)s",
"public_url": f"{public_url}/v2/$(tenant_id)s",
"admin_url": f"{interal_url}/v2/$(tenant_id)s"},
"admin_url": f"{interal_url}/v2/$(tenant_id)s",
},
{
"service_name": "cinderv3",
"type": "volumev3",
"description": "Cinder Volume Service v3",
"internal_url": f"{interal_url}/v3/$(tenant_id)s",
"public_url": f"{public_url}/v3/$(tenant_id)s",
"admin_url": f"{interal_url}/v3/$(tenant_id)s"}])})
"admin_url": f"{interal_url}/v3/$(tenant_id)s",
},
]
),
},
)
return rel_id
def ks_manager_mock(self):
"""Create keystone manager mock."""
def _create_mock(p_name, p_id):
_mock = mock.MagicMock()
type(_mock).name = mock.PropertyMock(
return_value=p_name)
type(_mock).id = mock.PropertyMock(
return_value=p_id)
type(_mock).name = mock.PropertyMock(return_value=p_name)
type(_mock).id = mock.PropertyMock(return_value=p_id)
return _mock
service_domain_mock = _create_mock('sdomain_name', 'sdomain_id')
admin_domain_mock = _create_mock('adomain_name', 'adomain_id')
service_domain_mock = _create_mock("sdomain_name", "sdomain_id")
admin_domain_mock = _create_mock("adomain_name", "adomain_id")
admin_project_mock = _create_mock('aproject_name', 'aproject_id')
admin_project_mock = _create_mock("aproject_name", "aproject_id")
service_user_mock = _create_mock('suser_name', 'suser_id')
admin_user_mock = _create_mock('auser_name', 'auser_id')
service_user_mock = _create_mock("suser_name", "suser_id")
admin_user_mock = _create_mock("auser_name", "auser_id")
admin_role_mock = _create_mock('arole_name', 'arole_id')
admin_role_mock = _create_mock("arole_name", "arole_id")
km_mock = mock.MagicMock()
km_mock.get_domain.return_value = admin_domain_mock
@ -108,36 +122,39 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
km_mock.create_user.return_value = service_user_mock
km_mock.create_role.return_value = admin_role_mock
km_mock.read_fernet_keys.return_value = {
'0': 'Qf4vHdf6XC2dGKpEwtGapq7oDOqUWepcH2tKgQ0qOKc=',
'3': 'UK3qzLGvu-piYwau0BFyed8O3WP8lFKH_v1sXYulzhs=',
'4': 'YVYUJbQNASbVzzntqj2sG9rbDOV_QQfueDCz0PJEKKw=',
"0": "Qf4vHdf6XC2dGKpEwtGapq7oDOqUWepcH2tKgQ0qOKc=",
"3": "UK3qzLGvu-piYwau0BFyed8O3WP8lFKH_v1sXYulzhs=",
"4": "YVYUJbQNASbVzzntqj2sG9rbDOV_QQfueDCz0PJEKKw=",
}
return km_mock
@mock.patch(
'charms.observability_libs.v0.kubernetes_service_patch.'
'KubernetesServicePatch')
"charms.observability_libs.v0.kubernetes_service_patch."
"KubernetesServicePatch"
)
def setUp(self, mock_svc_patch):
"""Run test setup."""
super().setUp(charm, self.PATCHES)
# used by _launch_heartbeat.
# value doesn't matter for tests because mocking
os.environ["JUJU_CHARM_DIR"] = "/arbitrary/directory/"
self.subprocess.call.return_value = 1
self.pwgen.pwgen.return_value = 'randonpassword'
self.pwgen.pwgen.return_value = "randonpassword"
self.km_mock = self.ks_manager_mock()
self.manager.KeystoneManager.return_value = self.km_mock
self.harness = test_utils.get_harness(
_KeystoneXenaOperatorCharm,
container_calls=self.container_calls)
_KeystoneOperatorCharm, container_calls=self.container_calls
)
# clean up events that were dynamically defined,
# otherwise we get issues because they'll be redefined,
# which is not allowed.
from charms.data_platform_libs.v0.database_requires import (
DatabaseEvents
DatabaseEvents,
)
for attr in (
"database_database_created",
"database_endpoints_changed",
@ -152,116 +169,113 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
self.harness.begin()
def test_pebble_ready_handler(self):
"""Test pebble ready handler."""
self.assertEqual(self.harness.charm.seen_events, [])
self.harness.container_pebble_ready('keystone')
self.assertEqual(self.harness.charm.seen_events, ['PebbleReadyEvent'])
self.harness.container_pebble_ready("keystone")
self.assertEqual(self.harness.charm.seen_events, ["PebbleReadyEvent"])
def test_id_client(self):
"""Test responding to an identity client."""
test_utils.add_complete_ingress_relation(self.harness)
self.harness.set_leader()
peer_rel_id = self.harness.add_relation('peers', 'keystone')
self.harness.container_pebble_ready('keystone')
peer_rel_id = self.harness.add_relation("peers", "keystone")
self.harness.container_pebble_ready("keystone")
test_utils.add_db_relation_credentials(
self.harness,
test_utils.add_base_db_relation(self.harness))
self.harness, test_utils.add_base_db_relation(self.harness)
)
identity_rel_id = self.add_id_relation()
rel_data = self.harness.get_relation_data(
identity_rel_id,
self.harness.charm.unit.app.name)
identity_rel_id, self.harness.charm.unit.app.name
)
self.maxDiff = None
self.assertEqual(
rel_data,
{
'admin-auth-url': 'http://10.0.0.10:5000',
'admin-domain-id': 'adomain_id',
'admin-domain-name': 'adomain_name',
'admin-project-id': 'aproject_id',
'admin-project-name': 'aproject_name',
'admin-user-id': 'auser_id',
'admin-user-name': 'auser_name',
'api-version': 'v3',
'auth-host': '10.0.0.10',
'auth-port': '5000',
'auth-protocol': 'http',
'internal-auth-url': 'http://internal-url',
'internal-host': '10.0.0.10',
'internal-port': '5000',
'internal-protocol': 'http',
'public-auth-url': 'http://public-url',
'service-domain-id': 'sdomain_id',
'service-domain-name': 'sdomain_name',
'service-host': '10.0.0.10',
'service-password': 'randonpassword',
'service-port': '5000',
'service-project-id': 'aproject_id',
'service-project-name': 'aproject_name',
'service-protocol': 'http',
'service-user-id': 'suser_id',
'service-user-name': 'suser_name'})
"admin-auth-url": "http://10.0.0.10:5000",
"admin-domain-id": "adomain_id",
"admin-domain-name": "adomain_name",
"admin-project-id": "aproject_id",
"admin-project-name": "aproject_name",
"admin-user-id": "auser_id",
"admin-user-name": "auser_name",
"api-version": "v3",
"auth-host": "10.0.0.10",
"auth-port": "5000",
"auth-protocol": "http",
"internal-auth-url": "http://internal-url",
"internal-host": "10.0.0.10",
"internal-port": "5000",
"internal-protocol": "http",
"public-auth-url": "http://public-url",
"service-domain-id": "sdomain_id",
"service-domain-name": "sdomain_name",
"service-host": "10.0.0.10",
"service-password": "randonpassword",
"service-port": "5000",
"service-project-id": "aproject_id",
"service-project-name": "aproject_name",
"service-protocol": "http",
"service-user-id": "suser_id",
"service-user-name": "suser_name",
},
)
peer_data = self.harness.get_relation_data(
peer_rel_id,
self.harness.charm.unit.app.name)
peer_rel_id, self.harness.charm.unit.app.name
)
self.assertEqual(
peer_data,
{
'leader_ready': 'true',
'password_svc_cinder': 'randonpassword'
}
{"leader_ready": "true", "password_svc_cinder": "randonpassword"},
)
def test_leader_bootstraps(self):
"""Test leader bootstrap."""
test_utils.add_complete_ingress_relation(self.harness)
self.harness.set_leader()
rel_id = self.harness.add_relation('peers', 'keystone-k8s')
self.harness.add_relation_unit(
rel_id,
'keystone-k8s/1')
self.harness.container_pebble_ready('keystone')
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.add_relation_unit(rel_id, "keystone-k8s/1")
self.harness.container_pebble_ready("keystone")
test_utils.add_db_relation_credentials(
self.harness,
test_utils.add_base_db_relation(self.harness))
self.harness, test_utils.add_base_db_relation(self.harness)
)
self.km_mock.setup_keystone.assert_called_once_with()
self.km_mock.setup_initial_projects_and_users.assert_called_once_with()
def test_leader_rotate_fernet_keys(self):
"""Test leader fernet key rotation."""
test_utils.add_complete_ingress_relation(self.harness)
self.harness.set_leader()
rel_id = self.harness.add_relation('peers', 'keystone-k8s')
self.harness.add_relation_unit(
rel_id,
'keystone-k8s/1')
self.harness.container_pebble_ready('keystone')
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.add_relation_unit(rel_id, "keystone-k8s/1")
self.harness.container_pebble_ready("keystone")
test_utils.add_db_relation_credentials(
self.harness,
test_utils.add_base_db_relation(self.harness))
self.harness, test_utils.add_base_db_relation(self.harness)
)
self.harness.charm._rotate_fernet_keys()
self.km_mock.rotate_fernet_keys.assert_called_once_with()
def test_not_leader_rotate_fernet_keys(self):
"""Test non-leader fernet keys."""
test_utils.add_complete_ingress_relation(self.harness)
rel_id = self.harness.add_relation('peers', 'keystone-k8s')
self.harness.add_relation_unit(
rel_id,
'keystone-k8s/1')
self.harness.container_pebble_ready('keystone')
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.add_relation_unit(rel_id, "keystone-k8s/1")
self.harness.container_pebble_ready("keystone")
test_utils.add_db_relation_credentials(
self.harness,
test_utils.add_base_db_relation(self.harness))
self.harness, test_utils.add_base_db_relation(self.harness)
)
self.harness.charm._rotate_fernet_keys()
self.km_mock.rotate_fernet_keys.assert_not_called()
def test_on_heartbeat(self):
"""Test on_heartbeat calls."""
test_utils.add_complete_ingress_relation(self.harness)
self.harness.set_leader()
rel_id = self.harness.add_relation('peers', 'keystone-k8s')
self.harness.add_relation_unit(
rel_id,
'keystone-k8s/1')
self.harness.container_pebble_ready('keystone')
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.add_relation_unit(rel_id, "keystone-k8s/1")
self.harness.container_pebble_ready("keystone")
test_utils.add_db_relation_credentials(
self.harness,
test_utils.add_base_db_relation(self.harness))
self.harness, test_utils.add_base_db_relation(self.harness)
)
self.harness.charm._on_heartbeat(None)
self.km_mock.rotate_fernet_keys.assert_called_once_with()
@ -272,6 +286,7 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
self.km_mock.rotate_fernet_keys.assert_called_once_with()
def test_launching_heartbeat(self):
"""Test launching a heartbeat."""
# verify that the heartbeat script is launched during initialisation
self.subprocess.Popen.assert_called_once_with(
["./src/heartbeat.sh"],
@ -280,44 +295,34 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
# implementation detail, but probably good to double check
self.subprocess.call.assert_called_once_with(
['pgrep', '-f', 'heartbeat']
["pgrep", "-f", "heartbeat"]
)
def test_non_leader_no_bootstraps(self):
"""Test bootstraping on a non-leader."""
test_utils.add_complete_ingress_relation(self.harness)
self.harness.set_leader(False)
rel_id = self.harness.add_relation('peers', 'keystone-k8s')
self.harness.add_relation_unit(
rel_id,
'keystone-k8s/1')
self.harness.container_pebble_ready('keystone')
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.add_relation_unit(rel_id, "keystone-k8s/1")
self.harness.container_pebble_ready("keystone")
test_utils.add_db_relation_credentials(
self.harness,
test_utils.add_base_db_relation(self.harness))
self.assertFalse(
self.km_mock.setup_keystone.called)
self.harness, test_utils.add_base_db_relation(self.harness)
)
self.assertFalse(self.km_mock.setup_keystone.called)
def test_password_storage(self):
"""Test storing password."""
self.harness.set_leader()
rel_id = self.harness.add_relation('peers', 'keystone-k8s')
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.charm.password_manager.store(
'test-user',
'foobar'
self.harness.charm.password_manager.store("test-user", "foobar")
self.assertEqual(
self.harness.charm.password_manager.retrieve("test-user"), "foobar"
)
self.assertEqual(
self.harness.charm.password_manager.retrieve(
'test-user'
),
'foobar'
)
self.assertEqual(
self.harness.charm.password_manager.retrieve(
'unknown-user'
),
None
self.harness.charm.password_manager.retrieve("unknown-user"), None
)
self.assertEqual(
@ -327,14 +332,15 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
),
{
"password_test-user": "foobar",
}
},
)
def test_get_service_account_action(self):
self.harness.add_relation('peers', 'keystone-k8s')
"""Test get_service_account action."""
self.harness.add_relation("peers", "keystone-k8s")
action_event = MagicMock()
action_event.params = {'username': 'external_service'}
action_event.params = {"username": "external_service"}
# Check call on non-lead unit.
self.harness.charm._get_service_account_action(action_event)
@ -346,20 +352,23 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
self.harness.set_leader()
self.harness.charm._get_service_account_action(action_event)
action_event.set_results.assert_called_with({
'username': 'external_service',
'password': 'randonpassword',
'user-domain-name': 'sdomain_name',
'project-name': 'aproject_name',
'project-domain-name': 'sdomain_name',
'region': 'RegionOne',
'internal-endpoint': 'http://10.0.0.10:5000',
'public-endpoint': 'http://10.0.0.10:5000',
'api-version': 3
})
action_event.set_results.assert_called_with(
{
"username": "external_service",
"password": "randonpassword",
"user-domain-name": "sdomain_name",
"project-name": "aproject_name",
"project-domain-name": "sdomain_name",
"region": "RegionOne",
"internal-endpoint": "http://10.0.0.10:5000",
"public-endpoint": "http://10.0.0.10:5000",
"api-version": 3,
}
)
def test_get_admin_account_action(self):
self.harness.add_relation('peers', 'keystone-k8s')
"""Test admin account action."""
self.harness.add_relation("peers", "keystone-k8s")
action_event = MagicMock()
self.harness.charm._get_admin_account_action(action_event)
@ -369,15 +378,17 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
self.harness.set_leader()
self.harness.charm._get_admin_account_action(action_event)
action_event.set_results.assert_called_with({
'username': 'admin',
'password': 'randonpassword',
'user-domain-name': 'admin_domain',
'project-name': 'admin',
'project-domain-name': 'admin_domain',
'region': 'RegionOne',
'internal-endpoint': 'http://10.0.0.10:5000',
'public-endpoint': 'http://10.0.0.10:5000',
'api-version': 3,
'openrc': ANY,
})
action_event.set_results.assert_called_with(
{
"username": "admin",
"password": "randonpassword",
"user-domain-name": "admin_domain",
"project-name": "admin",
"project-domain-name": "admin_domain",
"region": "RegionOne",
"internal-endpoint": "http://10.0.0.10:5000",
"public-endpoint": "http://10.0.0.10:5000",
"api-version": 3,
"openrc": ANY,
}
)

View File

@ -15,6 +15,8 @@ minversion = 3.18.0
src_path = {toxinidir}/src/
tst_path = {toxinidir}/tests/
lib_path = {toxinidir}/lib/
pyproject_toml = {toxinidir}/pyproject.toml
all_path = {[vars]src_path} {[vars]tst_path}
[testenv]
basepython = python3
@ -33,6 +35,15 @@ allowlist_externals =
deps =
-r{toxinidir}/test-requirements.txt
[testenv:fmt]
description = Apply coding style standards to code
deps =
black
isort
commands =
isort {[vars]all_path} --skip-glob {[vars]lib_path} --skip {toxinidir}/.tox
black --config {[vars]pyproject_toml} {[vars]all_path} --exclude {[vars]lib_path}
[testenv:build]
basepython = python3
deps =
@ -64,11 +75,6 @@ deps = {[testenv:py3]deps}
basepython = python3.10
deps = {[testenv:py3]deps}
[testenv:pep8]
basepython = python3
deps = {[testenv]deps}
commands = flake8 {posargs} {[vars]src_path} {[vars]tst_path}
[testenv:cover]
basepython = python3
deps = {[testenv:py3]deps}
@ -83,6 +89,31 @@ commands =
coverage xml -o cover/coverage.xml
coverage report
[testenv:pep8]
description = Alias for lint
deps = {[testenv:lint]deps}
commands = {[testenv:lint]commands}
[testenv:lint]
description = Check code against coding style standards
deps =
black
# flake8==4.0.1 # Pin version until https://github.com/csachs/pyproject-flake8/pull/14 is merged
flake8
flake8-docstrings
flake8-copyright
flake8-builtins
pyproject-flake8
pep8-naming
isort
codespell
commands =
codespell {[vars]all_path}
# pflake8 wrapper supports config from pyproject.toml
pflake8 --exclude {[vars]lib_path} --config {toxinidir}/pyproject.toml {[vars]all_path}
isort --check-only --diff {[vars]all_path} --skip-glob {[vars]lib_path}
black --config {[vars]pyproject_toml} --check --diff {[vars]all_path} --exclude {[vars]lib_path}
[testenv:func-noop]
basepython = python3
commands =