diff --git a/README.md b/README.md index 5bbfd424..ab020963 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,47 @@ Here are the essential commands (filenames are arbitrary): See appendix [Policy Overrides][cdg-appendix-n] in the [OpenStack Charms Deployment Guide][cdg] for a thorough treatment of this feature. +## Security Compliance config option "password-security-compliance" + +The `password-security-compliance` configuration option sets the +`[security_compliance]` section of Keystone's configuration file. + +The configuration option is a YAML dictionary, that is one level deep, with the +following keys (and value formats). + +```yaml +lockout_failure_attempts: +lockout_duration: +disable_user_account_days_inactive: +change_password_upon_first_use: +password_expires_days: +password_regex: +password_regex_description: +unique_last_password_count: +minimum_password_age: +``` + +It can be set by placing the keys and values in a file and then using the Juju +command: + + juju config keystone --file path/to/config.yaml + +Note that, in this, case the `config.yaml` file requires the YAML key +`password-security-compliance:` with the desired config keys and values on the +following lines, indented for a dictionary. + +> **Note**: Please ensure that the page [Security compliance and PCI-DSS][SCPD] + is consulted before setting these option. + +The charm will protect service accounts (accounts requested by other units that +are in the service domain) against being forced to change their password. +Operators should also ensure that any other accounts are protected as per the +above referenced note. + +If the config value cannot be parsed as YAML and/or the options are not +able to be parsed as their indicated types then the charm will enter a blocked +state until the config value is changed. + ## Token Support As the keystone charm supports multiple releases of the OpenStack software, it @@ -294,3 +335,4 @@ For general charm questions refer to the OpenStack [Charm Guide][cg]. [cdg]: https://docs.openstack.org/project-deploy-guide/charm-deployment-guide [cdg-appendix-n]: https://docs.openstack.org/project-deploy-guide/charm-deployment-guide/latest/app-policy-overrides.html [lp-bugs-charm-keystone]: https://bugs.launchpad.net/charm-keystone/+filebug +[SCPD]: https://docs.openstack.org/keystone/latest/admin/configuration.html#security-compliance-and-pci-dss diff --git a/config.yaml b/config.yaml index 6d42cca6..6c19c508 100644 --- a/config.yaml +++ b/config.yaml @@ -416,3 +416,17 @@ options: override YAML files in the service's policy.d directory. The resource file should be a ZIP file containing at least one yaml file with a .yaml or .yml extension. If False then remove the overrides. + password-security-compliance: + type: string + default: + description: | + This config item, if set, is used to configure the [security_compliance] + section of the keystone.conf file. This is used to control how the + security around passwords is manged. Please refer to the charm README + for full details. + . + The config item should be a YAML complaint string that sets the various + allowed options in [security_compliance]. The settings are checked by + the charm, but it's possible that it may break things unexpectedly. + Please ensure that the the README and relevant documentation is consulted + before setting this configuration option. diff --git a/hooks/keystone_context.py b/hooks/keystone_context.py index a98cfc8a..3a65d94c 100644 --- a/hooks/keystone_context.py +++ b/hooks/keystone_context.py @@ -14,6 +14,7 @@ import os import json +import yaml from charmhelpers.contrib.openstack import context @@ -24,6 +25,7 @@ from charmhelpers.contrib.hahelpers.cluster import ( ) from charmhelpers.core.hookenv import ( + cached, charm_dir, config, log, @@ -213,6 +215,10 @@ class KeystoneContext(context.OSContextGenerator): flags = context.config_flags_parser(ldap_flags) ctxt['ldap_config_flags'] = flags + ctxt['password_security_compliance'] = ( + self._decode_password_security_compliance_string( + config('password-security-compliance'))) + # Base endpoint URL's which are used in keystone responses # to unauthenticated requests to redirect clients to the # correct auth URL. @@ -237,6 +243,79 @@ class KeystoneContext(context.OSContextGenerator): return ctxt + ALLOWED_SECURITY_COMPLIANCE_SCHEMA = { + 'lockout_failure_attempts': int, + 'lockout_duration': int, + 'disable_user_account_days_inactive': int, + 'change_password_upon_first_use': bool, + 'password_expires_days': int, + 'password_regex': str, + 'password_regex_description': str, + 'unique_last_password_count': int, + 'minimum_password_age': int, + } + + @classmethod + @cached + def _decode_password_security_compliance_string(cls, maybe_yaml): + """Decode string to dict for 'password-security-compliance' + + Perform some validation on the string and return either None, + if the string is not valid, or a dictionary of the security + compliance keys and values. + + :param maybe_yaml: the config item that is (hopefully) YAML format + :type maybe_yaml: str + :returns: a dictionary of keys: values or None if the value is not + valid. + :rtype: Optional[Dict[str, Union[str, int, bool]]] + """ + cmp_release = CompareOpenStackReleases(os_release('keystone')) + if cmp_release < 'newton': + log("'password-security-compliance' isn't valid for releases " + "before Newton.", + level='ERROR') + return None + try: + config_items = yaml.safe_load(maybe_yaml) + except Exception as e: + log("Couldn't decode config value for " + "'password-security-compliance': Invalid YAML?: {}" + .format(str(e)), + level='ERROR') + return None + # ensure that the top level is a dictionary. + if type(config_items) != dict: + log("Couldn't decode config value for " + "'password-security-compliance'. It doesn't appear to be a " + "dictionary: {}".format(str(config_items)), + level='ERROR') + return None + # check that the keys present are valid ones. + config_keys = config_items.keys() + allowed_keys = cls.ALLOWED_SECURITY_COMPLIANCE_SCHEMA.keys() + invalid_keys = [k for k in config_keys if k not in allowed_keys] + if invalid_keys: + log("Invalid config key(s) found in config " + "'password-security-compliance' setting: {}" + .format(", ".join(invalid_keys)), + level='ERROR') + return None + # check that the types are valid + valid_types = cls.ALLOWED_SECURITY_COMPLIANCE_SCHEMA + invalid_types = {k: (type(v) != valid_types[k]) + for k, v in config_items.items()} + if any(invalid_types.values()): + log("Invalid config value type(s) found in config " + "'password-security-compliance' setting: {}" + .format(", ".join(["{}: {} -- should be an {}" + .format(k, type(config_items[k]).__name__, + valid_types[k].__name__) + for k, v in invalid_types.items()])), + level='ERROR') + return None + return config_items + class KeystoneLoggingContext(context.OSContextGenerator): diff --git a/hooks/keystone_hooks.py b/hooks/keystone_hooks.py index aee00eb4..4cb9a014 100755 --- a/hooks/keystone_hooks.py +++ b/hooks/keystone_hooks.py @@ -85,6 +85,7 @@ from keystone_context import fernet_enabled from keystone_utils import ( add_service_to_keystone, + ensure_all_service_accounts_protected_for_pci_dss_options, add_credentials_to_keystone, determine_packages, disable_unused_apache_sites, @@ -711,6 +712,9 @@ def upgrade_charm(): log('Cluster leader - ensuring endpoint configuration is up to ' 'date', level=DEBUG) update_all_identity_relation_units() + # also ensure that the PCI-DSS protection is in place for service + # accounts. + ensure_all_service_accounts_protected_for_pci_dss_options() # call the policy overrides handler which will install any policy overrides maybe_do_policyd_overrides(os_release('keystone'), 'keystone') diff --git a/hooks/keystone_utils.py b/hooks/keystone_utils.py index 8a45b8cd..63b36bfc 100644 --- a/hooks/keystone_utils.py +++ b/hooks/keystone_utils.py @@ -1052,6 +1052,44 @@ def create_user(name, password, tenant=None, domain=None): .format(name, tenant_id, domain_id), level=DEBUG) +def get_user_dict(user, **kwargs): + """Delegate update_user call to the manager + + :param user: the user to fetch + :type user: str + :returns: a dictionary of the user keys:values + :rtype: Optional[Dict[str, ANY]] + """ + manager = get_manager() + return manager.get_user_details_dict(user, **kwargs) + + +def update_user(user, **kwargs): + """Delegate update_user call to the manager + + :param user: the user to modify + :type user: str + :returns: a dictionary of the user keys:values after the update + :rtype: Dict[str, ANY] + """ + manager = get_manager() + return manager.update_user(user, **kwargs) + + +def list_users_for_domain(domain=None, domain_id=None): + """Delegate list_users_for_domain to the manager + + :param domain: The domain name. + :type domain: Optional[str] + :param domain_id: The domain_id string + :type domain_id: Optional[str] + :returns: a list of user dictionaries in the domain + :rtype: List[Dict[str, ANY]] + """ + manager = get_manager() + return manager.list_users_for_domain(domain, domain_id) + + def get_manager(api_version=None): return KeystoneManagerProxy(api_version=api_version) @@ -1599,9 +1637,66 @@ def create_service_credentials(user, new_roles=None): tenant=tenant, new_roles=new_roles, grants=[config('admin-role')], domain=SERVICE_DOMAIN) + # protect the user from pci_dss password shenanigans + protect_user_account_from_pci_dss_force_change_password(user) return passwd +def protect_user_account_from_pci_dss_force_change_password(user_name): + """Protect the user account against forcing a password change option + + The PCI-DSS inspired option `change_password_upon_first_use` causes the + user to have to change their login password on first use. Obviously, this + is a disaster for service accounts. This function sets the option + `ignore_change_password_upon_first_use` on the specified user account so + that service accounts do not get locked out of the cloud. + It also sets the 'ignore_password_expiry' to ensure that passwords do not + get expired. + + This is only applied in a keystone v3 environment, as the PCI-DSS options + are only supported on keystone v3 onwards. + + :param user_name: the user to apply the protected option to. + :type user_name: str + """ + if get_api_version() < 3: + return + tenant = config('service-tenant') + if not tenant: + raise ValueError("No service tenant provided in config") + for domain in (DEFAULT_DOMAIN, SERVICE_DOMAIN): + user = get_user_dict(user_name, domain=domain) + if user is None: + log("User {} in domain {} doesn't exist. Can't set " + "'ignore_change_password_upon_first_use' option True for it." + .format(user_name, domain)) + continue + options = user.get('options', {}) + ignore_option = options.get('ignore_change_password_upon_first_use', + False) + ignore_password_option = options.get('ignore_password_expiry', False) + if ignore_option is False or ignore_password_option is False: + options['ignore_change_password_upon_first_use'] = True + options['ignore_password_expiry'] = True + log("Setting 'ignore_change_password_upon_first_use' and " + "'ignore_password_expiry' to True for" + "user {} in domain {}.".format(user_name, domain)) + update_user(user['id'], options=options) + + +def ensure_all_service_accounts_protected_for_pci_dss_options(): + """This function ensures that the 'ignore_change_password_upon_first_use' + is set for all of the accounts in the SERVICE_DOMAIN, and then the + DEFAULT_DOMAIN. + """ + if get_api_version() < 3: + return + log("Ensuring all service users are protected from PCI-DSS options") + users = list_users_for_domain(domain=SERVICE_DOMAIN) + for user in users: + protect_user_account_from_pci_dss_force_change_password(user['name']) + + def add_service_to_keystone(relation_id=None, remote_unit=None): manager = get_manager() settings = relation_get(rid=relation_id, unit=remote_unit) @@ -2063,7 +2158,7 @@ def get_optional_interfaces(): return optional_interfaces -def check_optional_relations(configs): +def check_extra_for_assess_status(configs): """Check that if we have a relation_id for high availability that we can get the hacluster config. If we can't then we are blocked. This function is called from assess_status/set_os_workload_status as the charm_func and @@ -2080,6 +2175,12 @@ def check_optional_relations(configs): return ('blocked', 'hacluster missing configuration: ' 'vip, vip_iface, vip_cidr') + # verify that the config item, if set, is actually usable and valid + conf = config('password-security-compliance') + if (conf and (keystone_context.KeystoneContext + ._decode_password_security_compliance_string(conf) is None)): + return ('blocked', + "'password-security-compliance' is invalid") # return 'unknown' as the lowest priority to not clobber an existing # status. return 'unknown', '' @@ -2124,7 +2225,7 @@ def assess_status_func(configs, exclude_ha_resource=False): determine_ports()) return make_assess_status_func( configs, required_interfaces, - charm_func=check_optional_relations, + charm_func=check_extra_for_assess_status, services=_services, ports=_ports) diff --git a/hooks/manager.py b/hooks/manager.py index 6a215968..6d1dc14f 100755 --- a/hooks/manager.py +++ b/hooks/manager.py @@ -406,6 +406,89 @@ class KeystoneManager3(KeystoneManager): def update_password(self, user, password): self.api.users.update(user, password=password) + def get_user_details_dict(self, user, **kwargs): + """Get the user details dictionary for a user. + + This fetches the user details for a user and domain or domain_id. + It uses the lowercase name for the user; all users as far as the + keystone charm are concerned are the same if lower cased. + + :param user: the user name to look for. + :type user: str + :returns: a dictionary of key:value pairs representing the user + :rtype: Optional[Dict[str, ANY]] + :raises: RuntimeError if no domain or domain_id is passed. + ValueError if the domain_id cannot be resolved + """ + domain_id = kwargs.get('domain_id', None) + domain = kwargs.get('domain', None) + if not domain_id: + if not domain: + raise RuntimeError( + "Can't resolve a domain as no domain or domain_id " + "supplid.") + domain_id = manager.resolve_domain_id(domain) + if not domain_id: + raise ValueError( + 'Could not resolve domain_id for {} when checking if ' + ' user {} exists'.format(domain, user)) + for u in self.api.users.list(domain=domain_id): + if user.lower() == u.name.lower(): + if domain_id == u.domain_id: + return u.to_dict() + return None + + def update_user(self, user, **kwargs): + """Update the user with data from the **kwargs. + + It is the responsibility of the caller to fully define the user + that needs to be udpated. e.g. preferably the user is a + :class:`keystoneclient.v3.users.User` + + :param user: The user to be updated. + :type user: Union[str, keystoneclient.v3.users.User] + :params **kwargs: the keys, values to be udpated. + :type **kwargs: Dict[str, str] + :returns: the dictionary representation of the updated user + :rtype: Dict[str, ANY] + """ + res = self.api.users.update(user, **kwargs) + return res.to_dict() + + def list_users_for_domain(self, domain=None, domain_id=None): + """Return a list of all the users in a domain. + + This returns a list of the users in the specified domain_id or + domain_id resolved from the domain name. The return value is a + restricted list of dictionary items: + + { + 'name': + 'id': + } + + One of either the :param:`domain` or :param:`domain_id` must be + supplied or otherwise the function raises a RuntimeError. + + :param domain: The domain name. + :type domain: Optional[str] + :param domain_id: The domain_id string + :type domain_id: Optional[str] + :returns: a list of user dictionaries in the domain + :rtype: List[Dict[str, ANY]] + :raises: RuntimeError if no domain or domain_id is passed. + ValueError if the domain_id cannot be resolved from the domain + """ + if domain is None and domain_id is None: + raise RuntimeError("Must supply either domain or domain_id param") + domain_id = domain_id or manager.resolve_domain_id(domain) + if domain_id is None: + raise ValueError( + 'Could not resolve domain_id for {}.'.format(domain)) + users = [{'name': u.name, 'id': u.id} + for u in self.api.users.list(domain=domain_id)] + return users + def roles_for_user(self, user_id, tenant_id=None, domain_id=None): # Specify either a domain or project, not both if domain_id: @@ -547,6 +630,7 @@ if __name__ == '__main__': # endless loop whilst we process messages from the caller while True: try: + result = None data = uds_client.receive() if data == "QUIT" or data is None: break @@ -587,8 +671,9 @@ if __name__ == '__main__': traceback.print_exc() result = {'error': str(e)} finally: - result_json = json.dumps(result, **JSON_ENCODE_OPTIONS) - uds_client.send(result_json) + if result is not None: + result_json = json.dumps(result, **JSON_ENCODE_OPTIONS) + uds_client.send(result_json) # normal exit exit(0) diff --git a/templates/ocata/keystone.conf b/templates/ocata/keystone.conf index b111f127..5640df96 100644 --- a/templates/ocata/keystone.conf +++ b/templates/ocata/keystone.conf @@ -115,12 +115,19 @@ group_allow_delete = False {% endif -%} {% endif -%} -{% if api_version == 3 -%} +{% if api_version == 3 %} [resource] admin_project_domain_name = {{ admin_domain_name }} admin_project_name = admin {% endif -%} +{% if password_security_compliance %} +[security_compliance] +{% for k, v in password_security_compliance.items() -%} +{{ k }} = {{ v }} +{% endfor -%} +{% endif -%} + {% include "parts/section-federation" %} {% include "section-oslo-middleware" %} diff --git a/templates/queens/keystone.conf b/templates/queens/keystone.conf index 5b84455b..c699921e 100644 --- a/templates/queens/keystone.conf +++ b/templates/queens/keystone.conf @@ -114,12 +114,19 @@ group_allow_delete = False {% endif -%} {% endif -%} -{% if api_version == 3 -%} +{% if api_version == 3 %} [resource] admin_project_domain_name = {{ admin_domain_name }} admin_project_name = admin {% endif -%} +{% if password_security_compliance %} +[security_compliance] +{% for k, v in password_security_compliance.items() -%} +{{ k }} = {{ v }} +{% endfor -%} +{% endif -%} + {% include "parts/section-federation" %} {% include "section-oslo-middleware" %} diff --git a/templates/rocky/keystone.conf b/templates/rocky/keystone.conf index 4b8ce500..e2f870eb 100644 --- a/templates/rocky/keystone.conf +++ b/templates/rocky/keystone.conf @@ -98,12 +98,19 @@ group_allow_delete = False {% endif -%} {% endif -%} -{% if api_version == 3 -%} +{% if api_version == 3 %} [resource] admin_project_domain_name = {{ admin_domain_name }} admin_project_name = admin {% endif -%} +{% if password_security_compliance %} +[security_compliance] +{% for k, v in password_security_compliance.items() -%} +{{ k }} = {{ v }} +{% endfor -%} +{% endif -%} + {% include "parts/section-federation" %} {% include "section-oslo-middleware" %} diff --git a/unit_tests/test_keystone_contexts.py b/unit_tests/test_keystone_contexts.py index 7bbfa66e..3e10a523 100644 --- a/unit_tests/test_keystone_contexts.py +++ b/unit_tests/test_keystone_contexts.py @@ -16,7 +16,7 @@ import collections import importlib import os -from mock import patch, MagicMock +from mock import patch, MagicMock, ANY with patch('charmhelpers.contrib.openstack.' 'utils.snap_install_requested') as snap_install_requested: snap_install_requested.return_value = False @@ -506,3 +506,78 @@ class TestKeystoneContexts(CharmTestCase): [u'simple_token_secret', u'foobar']]}} self.assertEqual(ctxt(), exp) + + @patch.object(context, 'log') + def test__decode_password_security_compliance_string_pre_newton( + self, mock_log): + self.os_release.return_value = 'mitaka' + self.assertIsNone( + context. + KeystoneContext. + _decode_password_security_compliance_string("")) + mock_log.assert_called_once_with(ANY, level='ERROR') + self.assertIn("Newton", mock_log.call_args.args[0]) + + @patch.object(context, 'log') + def test__decode_password_security_compliance_string_invalid_yaml( + self, mock_log): + self.os_release.return_value = 'ocata' + self.assertIsNone( + context. + KeystoneContext. + _decode_password_security_compliance_string("hello: this: one")) + mock_log.assert_called_once_with(ANY, level='ERROR') + self.assertIn("Invalid YAML", mock_log.call_args.args[0]) + + @patch.object(context, 'log') + def test__decode_password_security_compliance_string_yaml_not_dict( + self, mock_log): + self.os_release.return_value = 'pike' + self.assertIsNone( + context. + KeystoneContext. + _decode_password_security_compliance_string("hello")) + mock_log.assert_called_once_with(ANY, level='ERROR') + self.assertIn("dictionary", mock_log.call_args.args[0]) + + @patch.object(context, 'log') + def test__decode_password_security_compliance_string_invalid_key( + self, mock_log): + self.os_release.return_value = 'queens' + self.assertIsNone( + context. + KeystoneContext. + _decode_password_security_compliance_string( + "lockout_failure_attempts: 5\nlookout_duration: 180\n")) + mock_log.assert_called_once_with(ANY, level='ERROR') + self.assertIn("Invalid config key(s)", mock_log.call_args.args[0]) + + @patch.object(context, 'log') + def test__decode_password_security_compliance_string_invalid_type( + self, mock_log): + self.os_release.return_value = 'rocky' + self.assertIsNone( + context. + KeystoneContext. + _decode_password_security_compliance_string( + "lockout_failure_attempts: hello")) + mock_log.assert_called_once_with(ANY, level='ERROR') + self.assertIn("Invalid config value", mock_log.call_args.args[0]) + + @patch.object(context, 'log') + def test__decode_password_security_compliance_string_valid( + self, mock_log): + self.os_release.return_value = 'stein' + ctxt = (context. + KeystoneContext. + _decode_password_security_compliance_string( + "lockout_failure_attempts: 5\n" + "lockout_duration: 180\n" + "password_expires_days: 30\n")) + mock_log.assert_not_called() + self.assertEqual(ctxt, + { + "lockout_failure_attempts": 5, + "lockout_duration": 180, + "password_expires_days": 30, + }) diff --git a/unit_tests/test_keystone_hooks.py b/unit_tests/test_keystone_hooks.py index 6eb2e3ea..11212cc3 100644 --- a/unit_tests/test_keystone_hooks.py +++ b/unit_tests/test_keystone_hooks.py @@ -582,6 +582,8 @@ class KeystoneRelationTests(CharmTestCase): cmd = ['a2dissite', 'openstack_https_frontend'] self.check_call.assert_called_with(cmd) + @patch.object(hooks, + 'ensure_all_service_accounts_protected_for_pci_dss_options') @patch.object(hooks, 'maybe_do_policyd_overrides') @patch.object(hooks, 'update_all_identity_relation_units') @patch.object(utils, 'os_release') @@ -598,7 +600,8 @@ class KeystoneRelationTests(CharmTestCase): mock_is_db_ready, os_release, update, - mock_maybe_do_policyd_overrides): + mock_maybe_do_policyd_overrides, + mock_protect_service_accounts): os_release.return_value = 'havana' mock_is_db_initialised.return_value = True mock_is_db_ready.return_value = True @@ -615,7 +618,10 @@ class KeystoneRelationTests(CharmTestCase): mock_stop_manager_instance.assert_called_once_with() mock_maybe_do_policyd_overrides.assert_called_once_with( ANY, "keystone") + mock_protect_service_accounts.assert_called_once_with() + @patch.object(hooks, + 'ensure_all_service_accounts_protected_for_pci_dss_options') @patch.object(hooks, 'maybe_do_policyd_overrides') @patch.object(hooks, 'update_all_identity_relation_units') @patch.object(utils, 'os_release') @@ -632,7 +638,8 @@ class KeystoneRelationTests(CharmTestCase): mock_is_db_ready, os_release, update, - mock_maybe_do_policyd_overrides): + mock_maybe_do_policyd_overrides, + mock_protect_service_accounts): os_release.return_value = 'havana' mock_is_db_initialised.return_value = True mock_is_db_ready.return_value = True @@ -649,6 +656,7 @@ class KeystoneRelationTests(CharmTestCase): mock_stop_manager_instance.assert_called_once_with() mock_maybe_do_policyd_overrides.assert_called_once_with( ANY, "keystone") + mock_protect_service_accounts.assert_called_once_with() @patch.object(hooks, 'update_all_identity_relation_units') @patch.object(hooks, 'is_db_initialised') diff --git a/unit_tests/test_keystone_utils.py b/unit_tests/test_keystone_utils.py index 1908bd12..fa4c6d9d 100644 --- a/unit_tests/test_keystone_utils.py +++ b/unit_tests/test_keystone_utils.py @@ -14,6 +14,7 @@ import builtins import collections +import copy from mock import patch, call, MagicMock, mock_open, Mock import json import os @@ -643,6 +644,148 @@ class TestKeystoneUtils(CharmTestCase): utils.create_service_credentials('serviceA') mock_create_user_credentials.assert_has_calls(calls) + @patch.object(utils, + 'protect_user_account_from_pci_dss_force_change_password') + @patch.object(utils, 'set_service_password') + @patch.object(utils, 'get_service_password') + @patch.object(utils, 'create_user_credentials') + def test_create_service_credentials_v3(self, + mock_create_user_credentials, + get_callback, + set_callback, + mock_protect_user_accounts): + get_callback.return_value = 'passA' + cfg = {'service-tenant': 'tenantA', 'admin-role': 'Admin', + 'preferred-api-version': 3} + self.config.side_effect = lambda key: cfg.get(key, None) + calls = [ + call('serviceA', get_callback, set_callback, domain='default', + grants=['Admin'], new_roles=None, tenant='tenantA'), + call('serviceA', get_callback, set_callback, + domain='service_domain', grants=['Admin'], new_roles=None, + tenant='tenantA')] + + utils.create_service_credentials('serviceA') + mock_create_user_credentials.assert_has_calls(calls) + mock_protect_user_accounts.assert_called_once_with('serviceA') + + @patch.object(utils, 'get_api_version') + def test_protect_user_account_from_pci_dss_force_change_password_v2( + self, mock_get_api_version): + mock_get_api_version.return_value = 2 + utils.protect_user_account_from_pci_dss_force_change_password('u1') + self.config.assert_not_called() + + @patch.object(utils, 'get_api_version') + def test_protect_user_account_from_pci_dss_v3_no_tenant( + self, mock_get_api_version): + mock_get_api_version.return_value = 3 + cfg = {} + self.config.side_effect = lambda key: cfg.get(key, None) + with self.assertRaises(ValueError): + utils.protect_user_account_from_pci_dss_force_change_password('u1') + self.config.assert_called_once_with('service-tenant') + + @patch.object(utils, 'update_user') + @patch.object(utils, 'get_user_dict') + @patch.object(utils, 'get_api_version') + def test_protect_user_account_from_pci_dss_v3( + self, + mock_get_api_version, + mock_get_user_dict, + mock_update_user): + mock_get_api_version.return_value = 3 + cfg = {'service-tenant': 'tenantA'} + self.config.side_effect = lambda key: cfg.get(key, None) + users = { + 'u1': { + 'id': 'u1id', + 'options': {'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True}}, + 'u2': { + 'id': 'u2id', + 'options': {'ignore_change_password_upon_first_use': False, + 'ignore_password_expiry': True}}, + 'u3': {'id': 'u3id', 'options': {}}, + 'u4': {'id': 'u4id'}, + } + mock_get_user_dict.side_effect = ( + lambda key, **kwargs: copy.deepcopy(users.get(key, None))) + + # test user exists and option exists and is true + utils.protect_user_account_from_pci_dss_force_change_password('u1') + calls = [call('u1', utils.DEFAULT_DOMAIN), + call('u1', utils.SERVICE_DOMAIN)] + mock_get_user_dict.has_calls(calls) + mock_update_user.assert_not_called() + + # test user exists and option exists and is False + mock_get_user_dict.reset_mock() + utils.protect_user_account_from_pci_dss_force_change_password('u2') + calls = [call('u2id', + options={'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True} + ), + call('u2id', + options={'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True} + )] + mock_update_user.assert_has_calls(calls) + + # test user exists and the option doesn't exist + mock_get_user_dict.reset_mock() + mock_update_user.reset_mock(calls) + utils.protect_user_account_from_pci_dss_force_change_password('u3') + calls = [call('u3id', + options={'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True} + ), + call('u3id', + options={'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True} + )] + mock_update_user.assert_has_calls(calls) + # test user exists and no options exist + mock_get_user_dict.reset_mock() + mock_update_user.reset_mock(calls) + utils.protect_user_account_from_pci_dss_force_change_password('u4') + calls = [call('u4id', + options={'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True} + ), + call('u4id', + options={'ignore_change_password_upon_first_use': True, + 'ignore_password_expiry': True} + )] + mock_update_user.assert_has_calls(calls) + # test user doesn't exist + mock_get_user_dict.reset_mock() + mock_update_user.reset_mock(calls) + utils.protect_user_account_from_pci_dss_force_change_password('u5') + mock_update_user.assert_not_called() + + @patch.object(utils, + 'protect_user_account_from_pci_dss_force_change_password') + @patch.object(utils, 'list_users_for_domain') + @patch.object(utils, 'get_api_version') + def test_ensure_all_service_accounts_protected_for_pci_dss_options( + self, + mock_get_api_version, + mock_list_users_for_domain, + mock_protect_service_accounts, + ): + # test it does nothing for less that keystone v3 + mock_get_api_version.return_value = 2 + utils.ensure_all_service_accounts_protected_for_pci_dss_options() + mock_protect_service_accounts.assert_not_called() + # now check that service accounts get protected. + mock_get_api_version.return_value = 3 + mock_list_users_for_domain.return_value = [ + {'name': 'u1'}, {'name': 'u2'}] + utils.ensure_all_service_accounts_protected_for_pci_dss_options() + mock_protect_service_accounts.assert_has_calls([ + call('u1'), call('u2')]) + def test_ensure_valid_service_incorrect(self): utils.ensure_valid_service('fakeservice') self.log.assert_called_with("Invalid service requested: 'fakeservice'") @@ -961,13 +1104,13 @@ class TestKeystoneUtils(CharmTestCase): @patch.object(utils, 'services') @patch.object(utils, 'get_optional_interfaces') @patch.object(utils, 'REQUIRED_INTERFACES') - @patch.object(utils, 'check_optional_relations') + @patch.object(utils, 'check_extra_for_assess_status') @patch.object(utils, 'get_managed_services_and_ports') @patch.object(utils, 'make_assess_status_func') def test_assess_status_func(self, make_assess_status_func, get_managed_services_and_ports, - check_optional_relations, + check_extra_for_assess_status, REQUIRED_INTERFACES, get_optional_interfaces, services, @@ -981,7 +1124,9 @@ class TestKeystoneUtils(CharmTestCase): make_assess_status_func.assert_called_once_with( 'test-config', {'int': ['test 1'], 'opt': ['test 2']}, - charm_func=check_optional_relations, services=['s1'], ports=[200]) + charm_func=check_extra_for_assess_status, + services=['s1'], + ports=[200]) def test_pause_unit_helper(self): with patch.object(utils, '_pause_resume_helper') as prh: