Add actions to regenerate password

Add regenerate-password action to regenerate
passwords for admin, charm or service user.

Remove KeystonepasswordManager class.

Update zaza-smoke-test job to mandatory.

Change-Id: I542894149b533da4cba5371f7412d4cbcc21e305
This commit is contained in:
Hemanth Nakkina 2022-12-06 16:03:33 +05:30
parent 4705d82fbe
commit 5009f76997
5 changed files with 142 additions and 157 deletions

View File

@ -2,14 +2,7 @@
templates: templates:
- openstack-python3-charm-yoga-jobs - openstack-python3-charm-yoga-jobs
- openstack-cover-jobs - openstack-cover-jobs
# - microk8s-func-test - microk8s-func-test
check:
jobs:
- charmbuild:
nodeset: ubuntu-focal
- zaza-smoke-test:
nodeset: ubuntu-focal
voting: false
vars: vars:
charm_build_name: keystone-k8s charm_build_name: keystone-k8s
juju_channel: 3.1/stable juju_channel: 3.1/stable

View File

@ -16,3 +16,14 @@ get-service-account:
required: required:
- username - username
additionalProperties: False additionalProperties: False
regenerate-password:
description: |
Regenerate password for the given user.
params:
username:
type: string
description: The username for the account.
required:
- username
additionalProperties: False

View File

@ -47,7 +47,6 @@ from ops.charm import (
RelationChangedEvent, RelationChangedEvent,
) )
from ops.framework import ( from ops.framework import (
Object,
StoredState, StoredState,
) )
from ops.main import ( from ops.main import (
@ -58,9 +57,6 @@ from ops.model import (
SecretNotFoundError, SecretNotFoundError,
SecretRotate, SecretRotate,
) )
from ops_sunbeam.interfaces import (
OperatorPeers,
)
from utils import ( from utils import (
manager, manager,
@ -202,43 +198,6 @@ class CloudCredentialsProvidesHandler(sunbeam_rhandlers.RelationHandler):
return True return True
class KeystonePasswordManager(Object):
"""Helper for management of keystone credential passwords."""
def __init__(self, charm: ops.charm.CharmBase, interface: OperatorPeers):
self.charm = charm
self.interface = interface
def store(self, username: str, password: str):
"""Store username and password."""
logging.debug(f"Storing password for {username}")
self.interface.set_app_data(
{
f"password_{username}": password,
}
)
def retrieve(self, username: str) -> str:
"""Retrieve persisted password for provided username."""
if not self.interface:
return None
password = self.interface.get_app_data(f"password_{username}")
return str(password) if password else None
def retrieve_or_set(self, username: str) -> str:
"""Retrieve or setup a password for a user.
New passwords will only be created by the lead unit of the
application.
"""
password = self.retrieve(username)
if not password and self.charm.unit.is_leader():
password = pwgen.pwgen(12)
self.store(username, password)
return password
class KeystoneOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm): class KeystoneOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm):
"""Charm the service.""" """Charm the service."""
@ -278,17 +237,37 @@ class KeystoneOperatorCharm(sunbeam_charm.OSBaseOperatorAPICharm):
self.framework.observe( self.framework.observe(
self.on.get_admin_password_action, self._get_admin_password_action self.on.get_admin_password_action, self._get_admin_password_action
) )
self.framework.observe( self.framework.observe(
self.on.get_admin_account_action, self._get_admin_account_action self.on.get_admin_account_action, self._get_admin_account_action
) )
self.password_manager = KeystonePasswordManager(self, self.peers)
self.framework.observe( self.framework.observe(
self.on.get_service_account_action, self.on.get_service_account_action,
self._get_service_account_action, self._get_service_account_action,
) )
self.framework.observe(
self.on.regenerate_password_action,
self._regenerate_password_action,
)
def _retrieve_or_set_secret(self, username: str, scope: dict = {}) -> str:
credentials_id = self.peers.get_app_data(f"credentials_{username}")
if credentials_id:
return credentials_id
password = pwgen.pwgen(12)
credentials_secret = self.model.app.add_secret(
{"username": username, "password": password},
label=f"credentials_{username}",
)
self.peers.set_app_data(
{
f"credentials_{username}": credentials_secret.id,
}
)
if "relation" in scope:
credentials_secret.grant(scope["relation"])
return credentials_secret.id
def _get_admin_password_action(self, event: ActionEvent) -> None: def _get_admin_password_action(self, event: ActionEvent) -> None:
if not self.unit.is_leader(): if not self.unit.is_leader():
@ -330,6 +309,90 @@ export OS_AUTH_VERSION=3
} }
) )
def _get_service_account_action(self, event: ActionEvent) -> None:
"""Create/get details for a service account.
This action handler will create a new services account
for the provided username. This account can be used
to provide access to OpenStack services from outside
of the Charmed deployment.
"""
if not self.unit.is_leader():
event.fail("Please run action on lead unit.")
return
# TODO: refactor into general helper method.
username = event.params["username"]
service_domain = self.keystone_manager.create_domain(
name="service_domain", may_exist=True
)
service_project = self.keystone_manager.get_project(
name=self.service_project, domain=service_domain
)
user_password = None
try:
credentials_id = self._retrieve_or_set_secret(username)
credentials = self.model.get_secret(id=credentials_id)
user_password = credentials.get_content().get("password")
except SecretNotFoundError:
logger.warning("Secret for {username} not found")
service_user = self.keystone_manager.create_user(
name=username,
password=user_password,
domain=service_domain.id,
may_exist=True,
)
admin_role = self.keystone_manager.create_role(
name=self.admin_role, may_exist=True
)
# TODO(wolsen) let's not always grant admin role!
self.keystone_manager.grant_role(
role=admin_role,
user=service_user,
project=service_project,
may_exist=True,
)
event.set_results(
{
"username": username,
"password": user_password,
"user-domain-name": service_domain.name,
"project-name": service_project.name,
"project-domain-name": service_domain.name,
"region": self.model.config["region"],
"internal-endpoint": self.internal_endpoint,
"public-endpoint": self.public_endpoint,
"api-version": 3,
}
)
def _regenerate_password_action(self, event: ActionEvent) -> None:
"""Regenerate password for a user account.
This action handler will update the user account
with a new password.
"""
if not self.unit.is_leader():
event.fail("Please run action on lead unit.")
return
username = event.params["username"]
try:
credentials_id = self._retrieve_or_set_secret(username)
credentials = self.model.get_secret(id=credentials_id)
password = pwgen.pwgen(12)
self.keystone_manager.update_user(name=username, password=password)
credentials.set_content(
{"username": username, "password": password}
)
event.set_results({"password": password})
except SecretNotFoundError:
event.fail(f"Secret for {username} not found")
except Exception as e:
event.fail(f"Regeneration of password failed: {e}")
def _on_peer_data_changed(self, event: RelationChangedEvent): def _on_peer_data_changed(self, event: RelationChangedEvent):
"""Check the peer data updates for updated fernet keys. """Check the peer data updates for updated fernet keys.
@ -469,6 +532,7 @@ export OS_AUTH_VERSION=3
event.secret.label == "fernet-keys" event.secret.label == "fernet-keys"
or event.secret.label == "credential-keys" or event.secret.label == "credential-keys"
or event.secret.label == f"credentials_{self.admin_user}" or event.secret.label == f"credentials_{self.admin_user}"
or event.secret.label == f"credentials_{self.charm_user}"
): ):
# TODO: Remove older revisions of the secret # TODO: Remove older revisions of the secret
# event.secret.remove_revision(event.revision) # event.secret.remove_revision(event.revision)
@ -703,85 +767,6 @@ export OS_AUTH_VERSION=3
region=self.model.config["region"], # XXX(wolsen) region matters? region=self.model.config["region"], # XXX(wolsen) region matters?
) )
def _get_service_account_action(self, event: ActionEvent) -> None:
"""Create/get details for a service account.
This action handler will create a new services account
for the provided username. This account can be used
to provide access to OpenStack services from outside
of the Charmed deployment.
"""
if not self.unit.is_leader():
event.fail("Please run action on lead unit.")
return
# TODO: refactor into general helper method.
username = event.params["username"]
service_domain = self.keystone_manager.create_domain(
name="service_domain", may_exist=True
)
service_project = self.keystone_manager.get_project(
name=self.service_project, domain=service_domain
)
user_password = None
try:
credentials_id = self._retrieve_or_set_secret(username)
credentials = self.model.get_secret(id=credentials_id)
user_password = credentials.get_content().get("password")
except SecretNotFoundError:
logger.warning("Secret for {username} not found")
service_user = self.keystone_manager.create_user(
name=username,
password=user_password,
domain=service_domain.id,
may_exist=True,
)
admin_role = self.keystone_manager.create_role(
name=self.admin_role, may_exist=True
)
# TODO(wolsen) let's not always grant admin role!
self.keystone_manager.grant_role(
role=admin_role,
user=service_user,
project=service_project,
may_exist=True,
)
event.set_results(
{
"username": username,
"password": user_password,
"user-domain-name": service_domain.name,
"project-name": service_project.name,
"project-domain-name": service_domain.name,
"region": self.model.config["region"],
"internal-endpoint": self.internal_endpoint,
"public-endpoint": self.public_endpoint,
"api-version": 3,
}
)
def _retrieve_or_set_secret(self, username: str, scope: dict = {}) -> str:
credentials_id = self.peers.get_app_data(f"credentials_{username}")
if credentials_id:
return credentials_id
password = pwgen.pwgen(12)
credentials_secret = self.model.app.add_secret(
{"username": username, "password": password},
label=f"credentials_{username}",
)
self.peers.set_app_data(
{
f"credentials_{username}": credentials_secret.id,
}
)
if "relation" in scope:
credentials_secret.grant(scope["relation"])
return credentials_secret.id
@property @property
def default_public_ingress_port(self): def default_public_ingress_port(self):
"""Default public ingress port.""" """Default public ingress port."""

View File

@ -584,6 +584,27 @@ class KeystoneManager(framework.Object):
return None return None
def update_user(
self,
name: str,
password: str,
email: str = None,
project: "Project" = None,
domain: "Domain" = None,
) -> "User":
"""Update password for user."""
user = self.get_user(name=name, domain=domain, project=project)
user = self.api.users.update(
user,
name=name,
default_project=project,
domain=domain,
password=password,
email=email,
)
logger.debug(f"Updated user {user.name}.")
return user
def create_role( def create_role(
self, self,
name: str, name: str,

View File

@ -460,31 +460,6 @@ class TestKeystoneOperatorCharm(test_utils.CharmTestCase):
) )
self.assertFalse(self.km_mock.setup_keystone.called) self.assertFalse(self.km_mock.setup_keystone.called)
def test_password_storage(self):
"""Test storing password."""
self.harness.set_leader()
rel_id = self.harness.add_relation("peers", "keystone-k8s")
self.harness.charm.password_manager.store("test-user", "foobar")
self.assertEqual(
self.harness.charm.password_manager.retrieve("test-user"), "foobar"
)
self.assertEqual(
self.harness.charm.password_manager.retrieve("unknown-user"), None
)
self.assertEqual(
self.harness.get_relation_data(
rel_id,
self.harness.charm.app.name,
),
{
"password_test-user": "foobar",
},
)
def test_get_service_account_action(self): def test_get_service_account_action(self):
"""Test get_service_account action.""" """Test get_service_account action."""
self.harness.add_relation("peers", "keystone-k8s") self.harness.add_relation("peers", "keystone-k8s")