Use UserIRR class to manage user

Use ops_sunbeam common class to manage user.

Change-Id: I27121d07d317132c1655ede6a81e7d4e25546dd8
This commit is contained in:
Guillaume Boutry 2023-10-26 08:44:45 +02:00
parent 54aa9fec6c
commit 6b8bf0578d
2 changed files with 44 additions and 184 deletions

View File

@ -17,36 +17,24 @@
This charm provide Magnum services as part of an OpenStack deployment This charm provide Magnum services as part of an OpenStack deployment
""" """
import hashlib
import json
import logging import logging
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
List, List,
) )
import charms.keystone_k8s.v0.identity_resource as identity_resource import ops
import ops_sunbeam.charm as sunbeam_charm import ops_sunbeam.charm as sunbeam_charm
import ops_sunbeam.config_contexts as sunbeam_config_contexts import ops_sunbeam.config_contexts as sunbeam_config_contexts
import ops_sunbeam.container_handlers as sunbeam_chandlers import ops_sunbeam.container_handlers as sunbeam_chandlers
import ops_sunbeam.core as sunbeam_core import ops_sunbeam.core as sunbeam_core
import ops_sunbeam.relation_handlers as sunbeam_rhandlers import ops_sunbeam.relation_handlers as sunbeam_rhandlers
import pwgen
from ops.charm import (
RelationEvent,
)
from ops.framework import ( from ops.framework import (
StoredState, StoredState,
) )
from ops.main import ( from ops.main import (
main, main,
) )
from ops.model import (
ModelError,
Relation,
SecretNotFoundError,
SecretRotate,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -61,9 +49,17 @@ class MagnumConfigurationContext(sunbeam_config_contexts.ConfigContext):
if TYPE_CHECKING: if TYPE_CHECKING:
charm: "MagnumOperatorCharm" charm: "MagnumOperatorCharm"
@property
def ready(self) -> bool:
"""Whether the context has all the data is needs."""
return self.charm.user_id_ops.ready
def context(self) -> dict: def context(self) -> dict:
"""Magnum configuration context.""" """Magnum configuration context."""
username, password = self.charm.domain_admin_credentials credentials = self.charm.user_id_ops.get_config_credentials()
if credentials is None:
return {}
username, password = credentials
return { return {
"domain_name": self.charm.domain_name, "domain_name": self.charm.domain_name,
"domain_admin_user": username, "domain_admin_user": username,
@ -195,20 +191,24 @@ class MagnumOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm):
_cadapters.extend([MagnumConfigurationContext(self, "magnum")]) _cadapters.extend([MagnumConfigurationContext(self, "magnum")])
return _cadapters return _cadapters
def hash_ops(self, ops: list) -> str:
"""Return the sha1 of the requested ops."""
return hashlib.sha1(json.dumps(ops).encode()).hexdigest()
def get_relation_handlers(self) -> List[sunbeam_rhandlers.RelationHandler]: def get_relation_handlers(self) -> List[sunbeam_rhandlers.RelationHandler]:
"""Relation handlers for the service.""" """Relation handlers for the service."""
handlers = super().get_relation_handlers() handlers = super().get_relation_handlers()
self.id_ops = sunbeam_rhandlers.IdentityResourceRequiresHandler( self.user_id_ops = (
self, sunbeam_rhandlers.UserIdentityResourceRequiresHandler(
"identity-ops", self,
self.handle_keystone_ops, "identity-ops",
mandatory="identity-ops" in self.mandatory_relations, self.configure_charm,
mandatory="identity-ops" in self.mandatory_relations,
name=self.domain_admin_user,
domain=self.domain_name,
role="admin",
add_suffix=True,
extra_ops=self._get_create_role_ops(),
extra_ops_process=self._handle_create_role_response,
)
) )
handlers.append(self.id_ops) handlers.append(self.user_id_ops)
return handlers return handlers
@property @property
@ -265,159 +265,28 @@ class MagnumOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm):
"""User to manage users and projects in domain_name.""" """User to manage users and projects in domain_name."""
return "magnum_domain_admin" return "magnum_domain_admin"
@property def _get_create_role_ops(self) -> list:
def domain_admin_credentials(self) -> tuple: """Generate ops request for create role."""
"""Credentials for domain admin user.""" return [
credentials_id = self._get_domain_admin_credentials_secret()
credentials = self.model.get_secret(id=credentials_id)
username = credentials.get_content().get("username")
user_password = credentials.get_content().get("password")
return (username, user_password)
def _get_domain_admin_credentials_secret(self) -> str:
"""Get domain admin secret."""
label = f"{CREDENTIALS_SECRET_PREFIX}{self.domain_admin_user}"
credentials_id = self.peers.get_app_data(label)
if not credentials_id:
credentials_id = self._retrieve_or_set_secret(
self.domain_admin_user,
)
return credentials_id
def _grant_domain_admin_credentials_secret(
self,
relation: Relation,
) -> None:
"""Grant secret access to the related units."""
credentials_id = None
try:
credentials_id = self._get_domain_admin_credentials_secret()
secret = self.model.get_secret(id=credentials_id)
logger.debug(
f"Granting access to secret {credentials_id} for relation "
f"{relation.app.name} {relation.name}/{relation.id}"
)
secret.grant(relation)
except (ModelError, SecretNotFoundError) as e:
logger.debug(
f"Error during granting access to secret {credentials_id} for "
f"relation {relation.app.name} {relation.name}/{relation.id}: "
f"{str(e)}"
)
def _retrieve_or_set_secret(
self,
username: str,
rotate: SecretRotate = SecretRotate.NEVER,
add_suffix_to_username: bool = False,
) -> str:
"""Retrieve or create a secret."""
label = f"{CREDENTIALS_SECRET_PREFIX}{username}"
credentials_id = self.peers.get_app_data(label)
if credentials_id:
return credentials_id
password = pwgen.pwgen(12)
if add_suffix_to_username:
suffix = pwgen.pwgen(6)
username = f"{username}-{suffix}"
credentials_secret = self.model.app.add_secret(
{"username": username, "password": password},
label=label,
rotate=rotate,
)
self.peers.set_app_data(
{ {
label: credentials_secret.id, "name": "create_role",
"params": {"name": "magnum_domain_admin"},
} }
)
return credentials_secret.id
def _get_magnum_domain_ops(self) -> list:
"""Generate ops request for domain setup."""
credentials_id = self._get_domain_admin_credentials_secret()
ops = [
# Create domain magnum
{
"name": "create_domain",
"params": {"name": "magnum", "enable": True},
},
# Create role magnum_domain_admin
{"name": "create_role", "params": {"name": "magnum_domain_admin"}},
# Create user magnum
{
"name": "create_user",
"params": {
"name": self.domain_admin_user,
"password": credentials_id,
"domain": "magnum",
},
},
# Grant role admin to magnum_domain_admin user
{
"name": "grant_role",
"params": {
"role": "admin",
"domain": "magnum",
"user": self.domain_admin_user,
"user_domain": "magnum",
},
},
] ]
return ops
def _handle_initial_magnum_domain_setup_response( def _handle_create_role_response(
self, self, event: ops.EventBase, response: dict
event: RelationEvent,
) -> None: ) -> None:
"""Handle domain setup response from identity-ops.""" """Handle response from identity-ops."""
logger.info("%r", response)
if { if {
op.get("return-code") op.get("return-code")
for op in self.id_ops.interface.response.get( for op in response.get("ops", [])
"ops", if op.get("name") == "create_role"
[],
)
} == {0}: } == {0}:
logger.debug( logger.debug("Magnum domain admin role has been created.")
"Initial magnum domain setup commands completed," else:
" running configure charm" logger.warning("Magnum domain admin role creation failed.")
)
self.configure_charm(event)
def handle_keystone_ops(self, event: RelationEvent) -> None:
"""Event handler for identity ops."""
if isinstance(event, identity_resource.IdentityOpsProviderReadyEvent):
self._state.identity_ops_ready = True
if not self.unit.is_leader():
return
# Send op request only by leader unit
ops = self._get_magnum_domain_ops()
id_ = self.hash_ops(ops)
self._grant_domain_admin_credentials_secret(event.relation)
request = {
"id": id_,
"tag": "initial_magnum_domain_setup",
"ops": ops,
}
logger.debug(f"Sending ops request: {request}")
self.id_ops.interface.request_ops(request)
elif isinstance(
event,
identity_resource.IdentityOpsProviderGoneAwayEvent,
):
self._state.identity_ops_ready = False
elif isinstance(event, identity_resource.IdentityOpsResponseEvent):
if not self.unit.is_leader():
return
response = self.id_ops.interface.response
logger.debug(f"Got response from keystone: {response}")
request_tag = response.get("tag")
if request_tag == "initial_magnum_domain_setup":
self._handle_initial_magnum_domain_setup_response(event)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -19,6 +19,9 @@
import json import json
import ops_sunbeam.test_utils as test_utils import ops_sunbeam.test_utils as test_utils
from mock import (
Mock,
)
from ops.testing import ( from ops.testing import (
Harness, Harness,
) )
@ -81,20 +84,8 @@ class TestMagnumOperatorCharm(test_utils.CharmTestCase):
"""Add complete Identity resource relation.""" """Add complete Identity resource relation."""
rel_id = harness.add_relation("identity-ops", "keystone") rel_id = harness.add_relation("identity-ops", "keystone")
harness.add_relation_unit(rel_id, "keystone/0") harness.add_relation_unit(rel_id, "keystone/0")
ops = harness.charm._get_magnum_domain_ops() harness.charm.user_id_ops.get_config_credentials = Mock(
id_ = harness.charm.hash_ops(ops) return_value=("test", "test")
harness.update_relation_data(
rel_id,
"keystone/0",
{
"request": json.dumps(
{
"id": id_,
"tag": "initial_magnum_domain_setup",
"ops": ops,
}
)
},
) )
harness.update_relation_data( harness.update_relation_data(
rel_id, rel_id,
@ -102,7 +93,7 @@ class TestMagnumOperatorCharm(test_utils.CharmTestCase):
{ {
"response": json.dumps( "response": json.dumps(
{ {
"id": id_, "id": 1,
"tag": "initial_magnum_domain_setup", "tag": "initial_magnum_domain_setup",
"ops": [{"name": "create_domain", "return-code": 0}], "ops": [{"name": "create_domain", "return-code": 0}],
} }