Switch identity_user module to OpenStackModule

Change-Id: I61a43ac31f23758204d11fe72754a50993dcc294
This commit is contained in:
Artem Goncharov 2021-05-20 16:14:02 +02:00 committed by Artem Goncharov
parent 6b3bf3bba0
commit 28d32b53e8
1 changed files with 73 additions and 79 deletions

View File

@ -131,54 +131,12 @@ user:
type: str
sample: "demouser"
'''
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.openstack.cloud.plugins.module_utils.openstack import (openstack_full_argument_spec,
openstack_module_kwargs,
openstack_cloud_from_module)
from ansible_collections.openstack.cloud.plugins.module_utils.openstack import OpenStackModule
def _needs_update(params_dict, user):
for k in params_dict:
if k not in ('password', 'update_password') and user[k] != params_dict[k]:
return True
# We don't get password back in the user object, so assume any supplied
# password is a change.
if (
params_dict['password'] is not None
and params_dict['update_password'] == 'always'
):
return True
return False
def _get_domain_id(cloud, domain):
try:
# We assume admin is passing domain id
domain_id = cloud.get_domain(domain)['id']
except Exception:
# If we fail, maybe admin is passing a domain name.
# Note that domains have unique names, just like id.
try:
domain_id = cloud.search_domains(filters={'name': domain})[0]['id']
except Exception:
# Ok, let's hope the user is non-admin and passing a sane id
domain_id = domain
return domain_id
def _get_default_project_id(cloud, default_project, domain_id, module):
project = cloud.get_project(default_project, domain_id=domain_id)
if not project:
module.fail_json(msg='Default project %s is not valid' % default_project)
return project['id']
def main():
argument_spec = openstack_full_argument_spec(
class IdentityUserModule(OpenStackModule):
argument_spec = dict(
name=dict(required=True),
password=dict(required=False, default=None, no_log=True),
email=dict(required=False, default=None),
@ -190,47 +148,81 @@ def main():
update_password=dict(default=None, choices=['always', 'on_create']),
)
module_kwargs = openstack_module_kwargs()
module = AnsibleModule(
argument_spec,
**module_kwargs)
module_kwargs = dict()
name = module.params['name']
password = module.params.get('password')
email = module.params['email']
default_project = module.params['default_project']
domain = module.params['domain']
enabled = module.params['enabled']
state = module.params['state']
update_password = module.params['update_password']
description = module.params['description']
def _needs_update(self, params_dict, user):
for k in params_dict:
if k not in ('password', 'update_password') and user[k] != params_dict[k]:
return True
# We don't get password back in the user object, so assume any supplied
# password is a change.
if (
params_dict['password'] is not None
and params_dict['update_password'] == 'always'
):
return True
return False
def _get_domain_id(self, domain):
try:
# We assume admin is passing domain id
domain_id = self.conn.get_domain(domain)['id']
except Exception:
# If we fail, maybe admin is passing a domain name.
# Note that domains have unique names, just like id.
try:
domain_id = self.conn.search_domains(filters={'name': domain})[0]['id']
except Exception:
# Ok, let's hope the user is non-admin and passing a sane id
domain_id = domain
return domain_id
def _get_default_project_id(self, default_project, domain_id):
project = self.conn.get_project(default_project, domain_id=domain_id)
if not project:
self.fail_json(msg='Default project %s is not valid' % default_project)
return project['id']
def run(self):
name = self.params['name']
password = self.params.get('password')
email = self.params['email']
default_project = self.params['default_project']
domain = self.params['domain']
enabled = self.params['enabled']
state = self.params['state']
update_password = self.params['update_password']
description = self.params['description']
sdk, cloud = openstack_cloud_from_module(module)
try:
domain_id = None
if domain:
domain_id = _get_domain_id(cloud, domain)
user = cloud.get_user(name, domain_id=domain_id)
domain_id = self._get_domain_id(domain)
user = self.conn.get_user(name, domain_id=domain_id)
else:
user = cloud.get_user(name)
user = self.conn.get_user(name)
if state == 'present':
if update_password in ('always', 'on_create'):
if not password:
msg = "update_password is %s but a password value is missing" % update_password
module.fail_json(msg=msg)
self.fail_json(msg=msg)
default_project_id = None
if default_project:
default_project_id = _get_default_project_id(cloud, default_project, domain_id, module)
default_project_id = self._get_default_project_id(
default_project, domain_id)
if user is None:
if description is not None:
user = cloud.create_user(
user = self.conn.create_user(
name=name, password=password, email=email,
default_project=default_project_id, domain_id=domain_id,
enabled=enabled, description=description)
else:
user = cloud.create_user(
user = self.conn.create_user(
name=name, password=password, email=email,
default_project=default_project_id, domain_id=domain_id,
enabled=enabled)
@ -246,47 +238,49 @@ def main():
if default_project_id is not None:
params_dict['default_project_id'] = default_project_id
if _needs_update(params_dict, user):
if self._needs_update(params_dict, user):
if update_password == 'always':
if description is not None:
user = cloud.update_user(
user = self.conn.update_user(
user['id'], password=password, email=email,
default_project=default_project_id,
domain_id=domain_id, enabled=enabled, description=description)
else:
user = cloud.update_user(
user = self.conn.update_user(
user['id'], password=password, email=email,
default_project=default_project_id,
domain_id=domain_id, enabled=enabled)
else:
if description is not None:
user = cloud.update_user(
user = self.conn.update_user(
user['id'], email=email,
default_project=default_project_id,
domain_id=domain_id, enabled=enabled, description=description)
else:
user = cloud.update_user(
user = self.conn.update_user(
user['id'], email=email,
default_project=default_project_id,
domain_id=domain_id, enabled=enabled)
changed = True
else:
changed = False
module.exit_json(changed=changed, user=user)
self.exit_json(changed=changed, user=user)
elif state == 'absent':
if user is None:
changed = False
else:
if domain:
cloud.delete_user(user['id'], domain_id=domain_id)
self.conn.delete_user(user['id'], domain_id=domain_id)
else:
cloud.delete_user(user['id'])
self.conn.delete_user(user['id'])
changed = True
module.exit_json(changed=changed)
self.exit_json(changed=changed)
except sdk.exceptions.OpenStackCloudException as e:
module.fail_json(msg=str(e), extra_data=e.extra_data)
def main():
module = IdentityUserModule()
module()
if __name__ == '__main__':