Merge "Implement Security Compiance option for password"

This commit is contained in:
Zuul
2020-02-19 14:50:17 +00:00
committed by Gerrit Code Review
12 changed files with 587 additions and 13 deletions

View File

@@ -187,6 +187,47 @@ Here are the essential commands (filenames are arbitrary):
See appendix [Policy Overrides][cdg-appendix-n] in the [OpenStack Charms
Deployment Guide][cdg] for a thorough treatment of this feature.
## Security Compliance config option "password-security-compliance"
The `password-security-compliance` configuration option sets the
`[security_compliance]` section of Keystone's configuration file.
The configuration option is a YAML dictionary, that is one level deep, with the
following keys (and value formats).
```yaml
lockout_failure_attempts: <int>
lockout_duration: <int>
disable_user_account_days_inactive: <int>
change_password_upon_first_use: <boolean>
password_expires_days: <int>
password_regex: <string>
password_regex_description: <string>
unique_last_password_count: <int>
minimum_password_age: <int>
```
It can be set by placing the keys and values in a file and then using the Juju
command:
juju config keystone --file path/to/config.yaml
Note that, in this, case the `config.yaml` file requires the YAML key
`password-security-compliance:` with the desired config keys and values on the
following lines, indented for a dictionary.
> **Note**: Please ensure that the page [Security compliance and PCI-DSS][SCPD]
is consulted before setting these option.
The charm will protect service accounts (accounts requested by other units that
are in the service domain) against being forced to change their password.
Operators should also ensure that any other accounts are protected as per the
above referenced note.
If the config value cannot be parsed as YAML and/or the options are not
able to be parsed as their indicated types then the charm will enter a blocked
state until the config value is changed.
## Token Support
As the keystone charm supports multiple releases of the OpenStack software, it
@@ -294,3 +335,4 @@ For general charm questions refer to the OpenStack [Charm Guide][cg].
[cdg]: https://docs.openstack.org/project-deploy-guide/charm-deployment-guide
[cdg-appendix-n]: https://docs.openstack.org/project-deploy-guide/charm-deployment-guide/latest/app-policy-overrides.html
[lp-bugs-charm-keystone]: https://bugs.launchpad.net/charm-keystone/+filebug
[SCPD]: https://docs.openstack.org/keystone/latest/admin/configuration.html#security-compliance-and-pci-dss

View File

@@ -416,3 +416,17 @@ options:
override YAML files in the service's policy.d directory. The resource
file should be a ZIP file containing at least one yaml file with a .yaml
or .yml extension. If False then remove the overrides.
password-security-compliance:
type: string
default:
description: |
This config item, if set, is used to configure the [security_compliance]
section of the keystone.conf file. This is used to control how the
security around passwords is manged. Please refer to the charm README
for full details.
.
The config item should be a YAML complaint string that sets the various
allowed options in [security_compliance]. The settings are checked by
the charm, but it's possible that it may break things unexpectedly.
Please ensure that the the README and relevant documentation is consulted
before setting this configuration option.

View File

@@ -14,6 +14,7 @@
import os
import json
import yaml
from charmhelpers.contrib.openstack import context
@@ -24,6 +25,7 @@ from charmhelpers.contrib.hahelpers.cluster import (
)
from charmhelpers.core.hookenv import (
cached,
charm_dir,
config,
log,
@@ -213,6 +215,10 @@ class KeystoneContext(context.OSContextGenerator):
flags = context.config_flags_parser(ldap_flags)
ctxt['ldap_config_flags'] = flags
ctxt['password_security_compliance'] = (
self._decode_password_security_compliance_string(
config('password-security-compliance')))
# Base endpoint URL's which are used in keystone responses
# to unauthenticated requests to redirect clients to the
# correct auth URL.
@@ -237,6 +243,79 @@ class KeystoneContext(context.OSContextGenerator):
return ctxt
ALLOWED_SECURITY_COMPLIANCE_SCHEMA = {
'lockout_failure_attempts': int,
'lockout_duration': int,
'disable_user_account_days_inactive': int,
'change_password_upon_first_use': bool,
'password_expires_days': int,
'password_regex': str,
'password_regex_description': str,
'unique_last_password_count': int,
'minimum_password_age': int,
}
@classmethod
@cached
def _decode_password_security_compliance_string(cls, maybe_yaml):
"""Decode string to dict for 'password-security-compliance'
Perform some validation on the string and return either None,
if the string is not valid, or a dictionary of the security
compliance keys and values.
:param maybe_yaml: the config item that is (hopefully) YAML format
:type maybe_yaml: str
:returns: a dictionary of keys: values or None if the value is not
valid.
:rtype: Optional[Dict[str, Union[str, int, bool]]]
"""
cmp_release = CompareOpenStackReleases(os_release('keystone'))
if cmp_release < 'newton':
log("'password-security-compliance' isn't valid for releases "
"before Newton.",
level='ERROR')
return None
try:
config_items = yaml.safe_load(maybe_yaml)
except Exception as e:
log("Couldn't decode config value for "
"'password-security-compliance': Invalid YAML?: {}"
.format(str(e)),
level='ERROR')
return None
# ensure that the top level is a dictionary.
if type(config_items) != dict:
log("Couldn't decode config value for "
"'password-security-compliance'. It doesn't appear to be a "
"dictionary: {}".format(str(config_items)),
level='ERROR')
return None
# check that the keys present are valid ones.
config_keys = config_items.keys()
allowed_keys = cls.ALLOWED_SECURITY_COMPLIANCE_SCHEMA.keys()
invalid_keys = [k for k in config_keys if k not in allowed_keys]
if invalid_keys:
log("Invalid config key(s) found in config "
"'password-security-compliance' setting: {}"
.format(", ".join(invalid_keys)),
level='ERROR')
return None
# check that the types are valid
valid_types = cls.ALLOWED_SECURITY_COMPLIANCE_SCHEMA
invalid_types = {k: (type(v) != valid_types[k])
for k, v in config_items.items()}
if any(invalid_types.values()):
log("Invalid config value type(s) found in config "
"'password-security-compliance' setting: {}"
.format(", ".join(["{}: {} -- should be an {}"
.format(k, type(config_items[k]).__name__,
valid_types[k].__name__)
for k, v in invalid_types.items()])),
level='ERROR')
return None
return config_items
class KeystoneLoggingContext(context.OSContextGenerator):

View File

@@ -85,6 +85,7 @@ from keystone_context import fernet_enabled
from keystone_utils import (
add_service_to_keystone,
ensure_all_service_accounts_protected_for_pci_dss_options,
add_credentials_to_keystone,
determine_packages,
disable_unused_apache_sites,
@@ -711,6 +712,9 @@ def upgrade_charm():
log('Cluster leader - ensuring endpoint configuration is up to '
'date', level=DEBUG)
update_all_identity_relation_units()
# also ensure that the PCI-DSS protection is in place for service
# accounts.
ensure_all_service_accounts_protected_for_pci_dss_options()
# call the policy overrides handler which will install any policy overrides
maybe_do_policyd_overrides(os_release('keystone'), 'keystone')

View File

@@ -1052,6 +1052,44 @@ def create_user(name, password, tenant=None, domain=None):
.format(name, tenant_id, domain_id), level=DEBUG)
def get_user_dict(user, **kwargs):
"""Delegate update_user call to the manager
:param user: the user to fetch
:type user: str
:returns: a dictionary of the user keys:values
:rtype: Optional[Dict[str, ANY]]
"""
manager = get_manager()
return manager.get_user_details_dict(user, **kwargs)
def update_user(user, **kwargs):
"""Delegate update_user call to the manager
:param user: the user to modify
:type user: str
:returns: a dictionary of the user keys:values after the update
:rtype: Dict[str, ANY]
"""
manager = get_manager()
return manager.update_user(user, **kwargs)
def list_users_for_domain(domain=None, domain_id=None):
"""Delegate list_users_for_domain to the manager
:param domain: The domain name.
:type domain: Optional[str]
:param domain_id: The domain_id string
:type domain_id: Optional[str]
:returns: a list of user dictionaries in the domain
:rtype: List[Dict[str, ANY]]
"""
manager = get_manager()
return manager.list_users_for_domain(domain, domain_id)
def get_manager(api_version=None):
return KeystoneManagerProxy(api_version=api_version)
@@ -1599,9 +1637,66 @@ def create_service_credentials(user, new_roles=None):
tenant=tenant, new_roles=new_roles,
grants=[config('admin-role')],
domain=SERVICE_DOMAIN)
# protect the user from pci_dss password shenanigans
protect_user_account_from_pci_dss_force_change_password(user)
return passwd
def protect_user_account_from_pci_dss_force_change_password(user_name):
"""Protect the user account against forcing a password change option
The PCI-DSS inspired option `change_password_upon_first_use` causes the
user to have to change their login password on first use. Obviously, this
is a disaster for service accounts. This function sets the option
`ignore_change_password_upon_first_use` on the specified user account so
that service accounts do not get locked out of the cloud.
It also sets the 'ignore_password_expiry' to ensure that passwords do not
get expired.
This is only applied in a keystone v3 environment, as the PCI-DSS options
are only supported on keystone v3 onwards.
:param user_name: the user to apply the protected option to.
:type user_name: str
"""
if get_api_version() < 3:
return
tenant = config('service-tenant')
if not tenant:
raise ValueError("No service tenant provided in config")
for domain in (DEFAULT_DOMAIN, SERVICE_DOMAIN):
user = get_user_dict(user_name, domain=domain)
if user is None:
log("User {} in domain {} doesn't exist. Can't set "
"'ignore_change_password_upon_first_use' option True for it."
.format(user_name, domain))
continue
options = user.get('options', {})
ignore_option = options.get('ignore_change_password_upon_first_use',
False)
ignore_password_option = options.get('ignore_password_expiry', False)
if ignore_option is False or ignore_password_option is False:
options['ignore_change_password_upon_first_use'] = True
options['ignore_password_expiry'] = True
log("Setting 'ignore_change_password_upon_first_use' and "
"'ignore_password_expiry' to True for"
"user {} in domain {}.".format(user_name, domain))
update_user(user['id'], options=options)
def ensure_all_service_accounts_protected_for_pci_dss_options():
"""This function ensures that the 'ignore_change_password_upon_first_use'
is set for all of the accounts in the SERVICE_DOMAIN, and then the
DEFAULT_DOMAIN.
"""
if get_api_version() < 3:
return
log("Ensuring all service users are protected from PCI-DSS options")
users = list_users_for_domain(domain=SERVICE_DOMAIN)
for user in users:
protect_user_account_from_pci_dss_force_change_password(user['name'])
def add_service_to_keystone(relation_id=None, remote_unit=None):
manager = get_manager()
settings = relation_get(rid=relation_id, unit=remote_unit)
@@ -2063,7 +2158,7 @@ def get_optional_interfaces():
return optional_interfaces
def check_optional_relations(configs):
def check_extra_for_assess_status(configs):
"""Check that if we have a relation_id for high availability that we can
get the hacluster config. If we can't then we are blocked. This function
is called from assess_status/set_os_workload_status as the charm_func and
@@ -2080,6 +2175,12 @@ def check_optional_relations(configs):
return ('blocked',
'hacluster missing configuration: '
'vip, vip_iface, vip_cidr')
# verify that the config item, if set, is actually usable and valid
conf = config('password-security-compliance')
if (conf and (keystone_context.KeystoneContext
._decode_password_security_compliance_string(conf) is None)):
return ('blocked',
"'password-security-compliance' is invalid")
# return 'unknown' as the lowest priority to not clobber an existing
# status.
return 'unknown', ''
@@ -2124,7 +2225,7 @@ def assess_status_func(configs, exclude_ha_resource=False):
determine_ports())
return make_assess_status_func(
configs, required_interfaces,
charm_func=check_optional_relations,
charm_func=check_extra_for_assess_status,
services=_services,
ports=_ports)

View File

@@ -406,6 +406,89 @@ class KeystoneManager3(KeystoneManager):
def update_password(self, user, password):
self.api.users.update(user, password=password)
def get_user_details_dict(self, user, **kwargs):
"""Get the user details dictionary for a user.
This fetches the user details for a user and domain or domain_id.
It uses the lowercase name for the user; all users as far as the
keystone charm are concerned are the same if lower cased.
:param user: the user name to look for.
:type user: str
:returns: a dictionary of key:value pairs representing the user
:rtype: Optional[Dict[str, ANY]]
:raises: RuntimeError if no domain or domain_id is passed.
ValueError if the domain_id cannot be resolved
"""
domain_id = kwargs.get('domain_id', None)
domain = kwargs.get('domain', None)
if not domain_id:
if not domain:
raise RuntimeError(
"Can't resolve a domain as no domain or domain_id "
"supplid.")
domain_id = manager.resolve_domain_id(domain)
if not domain_id:
raise ValueError(
'Could not resolve domain_id for {} when checking if '
' user {} exists'.format(domain, user))
for u in self.api.users.list(domain=domain_id):
if user.lower() == u.name.lower():
if domain_id == u.domain_id:
return u.to_dict()
return None
def update_user(self, user, **kwargs):
"""Update the user with data from the **kwargs.
It is the responsibility of the caller to fully define the user
that needs to be udpated. e.g. preferably the user is a
:class:`keystoneclient.v3.users.User`
:param user: The user to be updated.
:type user: Union[str, keystoneclient.v3.users.User]
:params **kwargs: the keys, values to be udpated.
:type **kwargs: Dict[str, str]
:returns: the dictionary representation of the updated user
:rtype: Dict[str, ANY]
"""
res = self.api.users.update(user, **kwargs)
return res.to_dict()
def list_users_for_domain(self, domain=None, domain_id=None):
"""Return a list of all the users in a domain.
This returns a list of the users in the specified domain_id or
domain_id resolved from the domain name. The return value is a
restricted list of dictionary items:
{
'name': <str>
'id': <str>
}
One of either the :param:`domain` or :param:`domain_id` must be
supplied or otherwise the function raises a RuntimeError.
:param domain: The domain name.
:type domain: Optional[str]
:param domain_id: The domain_id string
:type domain_id: Optional[str]
:returns: a list of user dictionaries in the domain
:rtype: List[Dict[str, ANY]]
:raises: RuntimeError if no domain or domain_id is passed.
ValueError if the domain_id cannot be resolved from the domain
"""
if domain is None and domain_id is None:
raise RuntimeError("Must supply either domain or domain_id param")
domain_id = domain_id or manager.resolve_domain_id(domain)
if domain_id is None:
raise ValueError(
'Could not resolve domain_id for {}.'.format(domain))
users = [{'name': u.name, 'id': u.id}
for u in self.api.users.list(domain=domain_id)]
return users
def roles_for_user(self, user_id, tenant_id=None, domain_id=None):
# Specify either a domain or project, not both
if domain_id:
@@ -547,6 +630,7 @@ if __name__ == '__main__':
# endless loop whilst we process messages from the caller
while True:
try:
result = None
data = uds_client.receive()
if data == "QUIT" or data is None:
break
@@ -587,8 +671,9 @@ if __name__ == '__main__':
traceback.print_exc()
result = {'error': str(e)}
finally:
result_json = json.dumps(result, **JSON_ENCODE_OPTIONS)
uds_client.send(result_json)
if result is not None:
result_json = json.dumps(result, **JSON_ENCODE_OPTIONS)
uds_client.send(result_json)
# normal exit
exit(0)

View File

@@ -115,12 +115,19 @@ group_allow_delete = False
{% endif -%}
{% endif -%}
{% if api_version == 3 -%}
{% if api_version == 3 %}
[resource]
admin_project_domain_name = {{ admin_domain_name }}
admin_project_name = admin
{% endif -%}
{% if password_security_compliance %}
[security_compliance]
{% for k, v in password_security_compliance.items() -%}
{{ k }} = {{ v }}
{% endfor -%}
{% endif -%}
{% include "parts/section-federation" %}
{% include "section-oslo-middleware" %}

View File

@@ -114,12 +114,19 @@ group_allow_delete = False
{% endif -%}
{% endif -%}
{% if api_version == 3 -%}
{% if api_version == 3 %}
[resource]
admin_project_domain_name = {{ admin_domain_name }}
admin_project_name = admin
{% endif -%}
{% if password_security_compliance %}
[security_compliance]
{% for k, v in password_security_compliance.items() -%}
{{ k }} = {{ v }}
{% endfor -%}
{% endif -%}
{% include "parts/section-federation" %}
{% include "section-oslo-middleware" %}

View File

@@ -98,12 +98,19 @@ group_allow_delete = False
{% endif -%}
{% endif -%}
{% if api_version == 3 -%}
{% if api_version == 3 %}
[resource]
admin_project_domain_name = {{ admin_domain_name }}
admin_project_name = admin
{% endif -%}
{% if password_security_compliance %}
[security_compliance]
{% for k, v in password_security_compliance.items() -%}
{{ k }} = {{ v }}
{% endfor -%}
{% endif -%}
{% include "parts/section-federation" %}
{% include "section-oslo-middleware" %}

View File

@@ -16,7 +16,7 @@ import collections
import importlib
import os
from mock import patch, MagicMock
from mock import patch, MagicMock, ANY
with patch('charmhelpers.contrib.openstack.'
'utils.snap_install_requested') as snap_install_requested:
snap_install_requested.return_value = False
@@ -506,3 +506,78 @@ class TestKeystoneContexts(CharmTestCase):
[u'simple_token_secret', u'foobar']]}}
self.assertEqual(ctxt(), exp)
@patch.object(context, 'log')
def test__decode_password_security_compliance_string_pre_newton(
self, mock_log):
self.os_release.return_value = 'mitaka'
self.assertIsNone(
context.
KeystoneContext.
_decode_password_security_compliance_string(""))
mock_log.assert_called_once_with(ANY, level='ERROR')
self.assertIn("Newton", mock_log.call_args.args[0])
@patch.object(context, 'log')
def test__decode_password_security_compliance_string_invalid_yaml(
self, mock_log):
self.os_release.return_value = 'ocata'
self.assertIsNone(
context.
KeystoneContext.
_decode_password_security_compliance_string("hello: this: one"))
mock_log.assert_called_once_with(ANY, level='ERROR')
self.assertIn("Invalid YAML", mock_log.call_args.args[0])
@patch.object(context, 'log')
def test__decode_password_security_compliance_string_yaml_not_dict(
self, mock_log):
self.os_release.return_value = 'pike'
self.assertIsNone(
context.
KeystoneContext.
_decode_password_security_compliance_string("hello"))
mock_log.assert_called_once_with(ANY, level='ERROR')
self.assertIn("dictionary", mock_log.call_args.args[0])
@patch.object(context, 'log')
def test__decode_password_security_compliance_string_invalid_key(
self, mock_log):
self.os_release.return_value = 'queens'
self.assertIsNone(
context.
KeystoneContext.
_decode_password_security_compliance_string(
"lockout_failure_attempts: 5\nlookout_duration: 180\n"))
mock_log.assert_called_once_with(ANY, level='ERROR')
self.assertIn("Invalid config key(s)", mock_log.call_args.args[0])
@patch.object(context, 'log')
def test__decode_password_security_compliance_string_invalid_type(
self, mock_log):
self.os_release.return_value = 'rocky'
self.assertIsNone(
context.
KeystoneContext.
_decode_password_security_compliance_string(
"lockout_failure_attempts: hello"))
mock_log.assert_called_once_with(ANY, level='ERROR')
self.assertIn("Invalid config value", mock_log.call_args.args[0])
@patch.object(context, 'log')
def test__decode_password_security_compliance_string_valid(
self, mock_log):
self.os_release.return_value = 'stein'
ctxt = (context.
KeystoneContext.
_decode_password_security_compliance_string(
"lockout_failure_attempts: 5\n"
"lockout_duration: 180\n"
"password_expires_days: 30\n"))
mock_log.assert_not_called()
self.assertEqual(ctxt,
{
"lockout_failure_attempts": 5,
"lockout_duration": 180,
"password_expires_days": 30,
})

View File

@@ -582,6 +582,8 @@ class KeystoneRelationTests(CharmTestCase):
cmd = ['a2dissite', 'openstack_https_frontend']
self.check_call.assert_called_with(cmd)
@patch.object(hooks,
'ensure_all_service_accounts_protected_for_pci_dss_options')
@patch.object(hooks, 'maybe_do_policyd_overrides')
@patch.object(hooks, 'update_all_identity_relation_units')
@patch.object(utils, 'os_release')
@@ -598,7 +600,8 @@ class KeystoneRelationTests(CharmTestCase):
mock_is_db_ready,
os_release,
update,
mock_maybe_do_policyd_overrides):
mock_maybe_do_policyd_overrides,
mock_protect_service_accounts):
os_release.return_value = 'havana'
mock_is_db_initialised.return_value = True
mock_is_db_ready.return_value = True
@@ -615,7 +618,10 @@ class KeystoneRelationTests(CharmTestCase):
mock_stop_manager_instance.assert_called_once_with()
mock_maybe_do_policyd_overrides.assert_called_once_with(
ANY, "keystone")
mock_protect_service_accounts.assert_called_once_with()
@patch.object(hooks,
'ensure_all_service_accounts_protected_for_pci_dss_options')
@patch.object(hooks, 'maybe_do_policyd_overrides')
@patch.object(hooks, 'update_all_identity_relation_units')
@patch.object(utils, 'os_release')
@@ -632,7 +638,8 @@ class KeystoneRelationTests(CharmTestCase):
mock_is_db_ready,
os_release,
update,
mock_maybe_do_policyd_overrides):
mock_maybe_do_policyd_overrides,
mock_protect_service_accounts):
os_release.return_value = 'havana'
mock_is_db_initialised.return_value = True
mock_is_db_ready.return_value = True
@@ -649,6 +656,7 @@ class KeystoneRelationTests(CharmTestCase):
mock_stop_manager_instance.assert_called_once_with()
mock_maybe_do_policyd_overrides.assert_called_once_with(
ANY, "keystone")
mock_protect_service_accounts.assert_called_once_with()
@patch.object(hooks, 'update_all_identity_relation_units')
@patch.object(hooks, 'is_db_initialised')

View File

@@ -14,6 +14,7 @@
import builtins
import collections
import copy
from mock import patch, call, MagicMock, mock_open, Mock
import json
import os
@@ -643,6 +644,148 @@ class TestKeystoneUtils(CharmTestCase):
utils.create_service_credentials('serviceA')
mock_create_user_credentials.assert_has_calls(calls)
@patch.object(utils,
'protect_user_account_from_pci_dss_force_change_password')
@patch.object(utils, 'set_service_password')
@patch.object(utils, 'get_service_password')
@patch.object(utils, 'create_user_credentials')
def test_create_service_credentials_v3(self,
mock_create_user_credentials,
get_callback,
set_callback,
mock_protect_user_accounts):
get_callback.return_value = 'passA'
cfg = {'service-tenant': 'tenantA', 'admin-role': 'Admin',
'preferred-api-version': 3}
self.config.side_effect = lambda key: cfg.get(key, None)
calls = [
call('serviceA', get_callback, set_callback, domain='default',
grants=['Admin'], new_roles=None, tenant='tenantA'),
call('serviceA', get_callback, set_callback,
domain='service_domain', grants=['Admin'], new_roles=None,
tenant='tenantA')]
utils.create_service_credentials('serviceA')
mock_create_user_credentials.assert_has_calls(calls)
mock_protect_user_accounts.assert_called_once_with('serviceA')
@patch.object(utils, 'get_api_version')
def test_protect_user_account_from_pci_dss_force_change_password_v2(
self, mock_get_api_version):
mock_get_api_version.return_value = 2
utils.protect_user_account_from_pci_dss_force_change_password('u1')
self.config.assert_not_called()
@patch.object(utils, 'get_api_version')
def test_protect_user_account_from_pci_dss_v3_no_tenant(
self, mock_get_api_version):
mock_get_api_version.return_value = 3
cfg = {}
self.config.side_effect = lambda key: cfg.get(key, None)
with self.assertRaises(ValueError):
utils.protect_user_account_from_pci_dss_force_change_password('u1')
self.config.assert_called_once_with('service-tenant')
@patch.object(utils, 'update_user')
@patch.object(utils, 'get_user_dict')
@patch.object(utils, 'get_api_version')
def test_protect_user_account_from_pci_dss_v3(
self,
mock_get_api_version,
mock_get_user_dict,
mock_update_user):
mock_get_api_version.return_value = 3
cfg = {'service-tenant': 'tenantA'}
self.config.side_effect = lambda key: cfg.get(key, None)
users = {
'u1': {
'id': 'u1id',
'options': {'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}},
'u2': {
'id': 'u2id',
'options': {'ignore_change_password_upon_first_use': False,
'ignore_password_expiry': True}},
'u3': {'id': 'u3id', 'options': {}},
'u4': {'id': 'u4id'},
}
mock_get_user_dict.side_effect = (
lambda key, **kwargs: copy.deepcopy(users.get(key, None)))
# test user exists and option exists and is true
utils.protect_user_account_from_pci_dss_force_change_password('u1')
calls = [call('u1', utils.DEFAULT_DOMAIN),
call('u1', utils.SERVICE_DOMAIN)]
mock_get_user_dict.has_calls(calls)
mock_update_user.assert_not_called()
# test user exists and option exists and is False
mock_get_user_dict.reset_mock()
utils.protect_user_account_from_pci_dss_force_change_password('u2')
calls = [call('u2id',
options={'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}
),
call('u2id',
options={'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}
)]
mock_update_user.assert_has_calls(calls)
# test user exists and the option doesn't exist
mock_get_user_dict.reset_mock()
mock_update_user.reset_mock(calls)
utils.protect_user_account_from_pci_dss_force_change_password('u3')
calls = [call('u3id',
options={'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}
),
call('u3id',
options={'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}
)]
mock_update_user.assert_has_calls(calls)
# test user exists and no options exist
mock_get_user_dict.reset_mock()
mock_update_user.reset_mock(calls)
utils.protect_user_account_from_pci_dss_force_change_password('u4')
calls = [call('u4id',
options={'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}
),
call('u4id',
options={'ignore_change_password_upon_first_use': True,
'ignore_password_expiry': True}
)]
mock_update_user.assert_has_calls(calls)
# test user doesn't exist
mock_get_user_dict.reset_mock()
mock_update_user.reset_mock(calls)
utils.protect_user_account_from_pci_dss_force_change_password('u5')
mock_update_user.assert_not_called()
@patch.object(utils,
'protect_user_account_from_pci_dss_force_change_password')
@patch.object(utils, 'list_users_for_domain')
@patch.object(utils, 'get_api_version')
def test_ensure_all_service_accounts_protected_for_pci_dss_options(
self,
mock_get_api_version,
mock_list_users_for_domain,
mock_protect_service_accounts,
):
# test it does nothing for less that keystone v3
mock_get_api_version.return_value = 2
utils.ensure_all_service_accounts_protected_for_pci_dss_options()
mock_protect_service_accounts.assert_not_called()
# now check that service accounts get protected.
mock_get_api_version.return_value = 3
mock_list_users_for_domain.return_value = [
{'name': 'u1'}, {'name': 'u2'}]
utils.ensure_all_service_accounts_protected_for_pci_dss_options()
mock_protect_service_accounts.assert_has_calls([
call('u1'), call('u2')])
def test_ensure_valid_service_incorrect(self):
utils.ensure_valid_service('fakeservice')
self.log.assert_called_with("Invalid service requested: 'fakeservice'")
@@ -961,13 +1104,13 @@ class TestKeystoneUtils(CharmTestCase):
@patch.object(utils, 'services')
@patch.object(utils, 'get_optional_interfaces')
@patch.object(utils, 'REQUIRED_INTERFACES')
@patch.object(utils, 'check_optional_relations')
@patch.object(utils, 'check_extra_for_assess_status')
@patch.object(utils, 'get_managed_services_and_ports')
@patch.object(utils, 'make_assess_status_func')
def test_assess_status_func(self,
make_assess_status_func,
get_managed_services_and_ports,
check_optional_relations,
check_extra_for_assess_status,
REQUIRED_INTERFACES,
get_optional_interfaces,
services,
@@ -981,7 +1124,9 @@ class TestKeystoneUtils(CharmTestCase):
make_assess_status_func.assert_called_once_with(
'test-config',
{'int': ['test 1'], 'opt': ['test 2']},
charm_func=check_optional_relations, services=['s1'], ports=[200])
charm_func=check_extra_for_assess_status,
services=['s1'],
ports=[200])
def test_pause_unit_helper(self):
with patch.object(utils, '_pause_resume_helper') as prh: