keystone/keystone/tests/unit/test_cli.py

1430 lines
61 KiB
Python

# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import uuid
import argparse
import fixtures
import mock
import oslo_config.fixture
from oslo_db.sqlalchemy import migration
from oslo_log import log
from six.moves import configparser
from six.moves import range
from testtools import matchers
from keystone.auth import controllers
from keystone.cmd import cli
from keystone.cmd.doctor import caching
from keystone.cmd.doctor import credential
from keystone.cmd.doctor import database as doc_database
from keystone.cmd.doctor import debug
from keystone.cmd.doctor import federation
from keystone.cmd.doctor import ldap
from keystone.cmd.doctor import security_compliance
from keystone.cmd.doctor import tokens
from keystone.cmd.doctor import tokens_fernet
from keystone.common import provider_api
from keystone.common.sql import upgrades
import keystone.conf
from keystone import exception
from keystone.i18n import _
from keystone.identity.mapping_backends import mapping as identity_mapping
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.ksfixtures import ldapdb
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class CliTestCase(unit.SQLDriverOverrides, unit.TestCase):
def config_files(self):
config_files = super(CliTestCase, self).config_files()
config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
return config_files
def test_token_flush(self):
self.useFixture(database.Database())
self.load_backends()
# NOTE(morgan): we are testing a direct instantiation of the
# persistence manager for flushing. We should clear this out so we
# don't error. CLI should never impact a running service
# and should never actually lock the registry for dependencies.
provider_api.ProviderAPIs._clear_registry_instances()
cli.TokenFlush.main()
class CliNoConfigTestCase(unit.BaseTestCase):
def setUp(self):
self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
self.config_fixture.register_cli_opt(cli.command_opt)
self.useFixture(fixtures.MockPatch(
'oslo_config.cfg.find_config_files', return_value=[]))
super(CliNoConfigTestCase, self).setUp()
# NOTE(crinkle): the command call doesn't have to actually work,
# that's what the other unit tests are for. So just mock it out.
class FakeConfCommand(object):
def __init__(self):
self.cmd_class = mock.Mock()
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', FakeConfCommand()))
self.logging = self.useFixture(fixtures.FakeLogger(level=log.WARN))
def test_cli(self):
expected_msg = 'Config file not found, using default configs.'
cli.main(argv=['keystone-manage', 'db_sync'])
self.assertThat(self.logging.output, matchers.Contains(expected_msg))
class CliBootStrapTestCase(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(CliBootStrapTestCase, self).setUp()
def config_files(self):
self.config_fixture.register_cli_opt(cli.command_opt)
config_files = super(CliBootStrapTestCase, self).config_files()
config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
return config_files
def config(self, config_files):
CONF(args=['bootstrap', '--bootstrap-password', uuid.uuid4().hex],
project='keystone',
default_config_files=config_files)
def test_bootstrap(self):
bootstrap = cli.BootStrap()
self._do_test_bootstrap(bootstrap)
def _do_test_bootstrap(self, bootstrap):
bootstrap.do_bootstrap()
project = bootstrap.resource_manager.get_project_by_name(
bootstrap.project_name,
'default')
user = bootstrap.identity_manager.get_user_by_name(
bootstrap.username,
'default')
role = bootstrap.role_manager.get_role(bootstrap.role_id)
role_list = (
bootstrap.assignment_manager.get_roles_for_user_and_project(
user['id'],
project['id']))
self.assertIs(1, len(role_list))
self.assertEqual(role_list[0], role['id'])
system_roles = (
bootstrap.assignment_manager.list_system_grants_for_user(
user['id']
)
)
self.assertIs(1, len(system_roles))
self.assertEqual(system_roles[0]['id'], role['id'])
# NOTE(morganfainberg): Pass an empty context, it isn't used by
# `authenticate` method.
bootstrap.identity_manager.authenticate(
self.make_request(),
user['id'],
bootstrap.password)
if bootstrap.region_id:
region = bootstrap.catalog_manager.get_region(bootstrap.region_id)
self.assertEqual(self.region_id, region['id'])
if bootstrap.service_id:
svc = bootstrap.catalog_manager.get_service(bootstrap.service_id)
self.assertEqual(self.service_name, svc['name'])
self.assertEqual(set(['admin', 'public', 'internal']),
set(bootstrap.endpoints))
urls = {'public': self.public_url,
'internal': self.internal_url,
'admin': self.admin_url}
for interface, url in urls.items():
endpoint_id = bootstrap.endpoints[interface]
endpoint = bootstrap.catalog_manager.get_endpoint(endpoint_id)
self.assertEqual(self.region_id, endpoint['region_id'])
self.assertEqual(url, endpoint['url'])
self.assertEqual(svc['id'], endpoint['service_id'])
self.assertEqual(interface, endpoint['interface'])
def test_bootstrap_is_idempotent_when_password_does_not_change(self):
# NOTE(morganfainberg): Ensure we can run bootstrap with the same
# configuration multiple times without erroring.
bootstrap = cli.BootStrap()
self._do_test_bootstrap(bootstrap)
v3_token_controller = controllers.Auth()
v3_password_data = {
'identity': {
"methods": ["password"],
"password": {
"user": {
"name": bootstrap.username,
"password": bootstrap.password,
"domain": {
"id": CONF.identity.default_domain_id
}
}
}
}
}
auth_response = v3_token_controller.authenticate_for_token(
self.make_request(), v3_password_data)
token = auth_response.headers['X-Subject-Token']
self._do_test_bootstrap(bootstrap)
# build validation request
request = self.make_request(is_admin=True)
request.context_dict['subject_token_id'] = token
# Make sure the token we authenticate for is still valid.
v3_token_controller.validate_token(request)
def test_bootstrap_is_not_idempotent_when_password_does_change(self):
# NOTE(lbragstad): Ensure bootstrap isn't idempotent when run with
# different arguments or configuration values.
bootstrap = cli.BootStrap()
self._do_test_bootstrap(bootstrap)
v3_token_controller = controllers.Auth()
v3_password_data = {
'identity': {
"methods": ["password"],
"password": {
"user": {
"name": bootstrap.username,
"password": bootstrap.password,
"domain": {
"id": CONF.identity.default_domain_id
}
}
}
}
}
auth_response = v3_token_controller.authenticate_for_token(
self.make_request(), v3_password_data)
token = auth_response.headers['X-Subject-Token']
os.environ['OS_BOOTSTRAP_PASSWORD'] = uuid.uuid4().hex
self._do_test_bootstrap(bootstrap)
# build validation request
request = self.make_request(is_admin=True)
request.context_dict['subject_token_id'] = token
# Since the user account was recovered with a different password, we
# shouldn't be able to validate this token. Bootstrap should have
# persisted a revocation event because the user's password was updated.
# Since this token was obtained using the original password, it should
# now be invalid.
self.assertRaises(
exception.TokenNotFound,
v3_token_controller.validate_token,
request
)
def test_bootstrap_recovers_user(self):
bootstrap = cli.BootStrap()
self._do_test_bootstrap(bootstrap)
# Completely lock the user out.
user_id = bootstrap.identity_manager.get_user_by_name(
bootstrap.username,
'default')['id']
bootstrap.identity_manager.update_user(
user_id,
{'enabled': False,
'password': uuid.uuid4().hex})
# The second bootstrap run will recover the account.
self._do_test_bootstrap(bootstrap)
# Sanity check that the original password works again.
bootstrap.identity_manager.authenticate(
self.make_request(),
user_id,
bootstrap.password)
def test_bootstrap_with_ambiguous_role_names(self):
bootstrap = cli.BootStrap()
# bootstrap system to create the default admin role
self._do_test_bootstrap(bootstrap)
# create a domain-specific roles that share the same names as the
# default roles created by keystone-manage bootstrap
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
domain_roles = {}
for name in ['admin', 'member', 'reader']:
domain_role = {
'domain_id': domain['id'],
'id': uuid.uuid4().hex,
'name': name
}
domain_roles[name] = PROVIDERS.role_api.create_role(
domain_role['id'], domain_role
)
# ensure subsequent bootstrap attempts don't fail because of
# ambiguity
self._do_test_bootstrap(bootstrap)
class CliBootStrapTestCaseWithEnvironment(CliBootStrapTestCase):
def config(self, config_files):
CONF(args=['bootstrap'], project='keystone',
default_config_files=config_files)
def setUp(self):
super(CliBootStrapTestCaseWithEnvironment, self).setUp()
self.password = uuid.uuid4().hex
self.username = uuid.uuid4().hex
self.project_name = uuid.uuid4().hex
self.role_name = uuid.uuid4().hex
self.service_name = uuid.uuid4().hex
self.public_url = uuid.uuid4().hex
self.internal_url = uuid.uuid4().hex
self.admin_url = uuid.uuid4().hex
self.region_id = uuid.uuid4().hex
self.default_domain = {
'id': CONF.identity.default_domain_id,
'name': 'Default',
}
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_PASSWORD',
newvalue=self.password))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_USERNAME',
newvalue=self.username))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_PROJECT_NAME',
newvalue=self.project_name))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_ROLE_NAME',
newvalue=self.role_name))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_SERVICE_NAME',
newvalue=self.service_name))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_PUBLIC_URL',
newvalue=self.public_url))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_INTERNAL_URL',
newvalue=self.internal_url))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_ADMIN_URL',
newvalue=self.admin_url))
self.useFixture(
fixtures.EnvironmentVariable('OS_BOOTSTRAP_REGION_ID',
newvalue=self.region_id))
def test_assignment_created_with_user_exists(self):
# test assignment can be created if user already exists.
bootstrap = cli.BootStrap()
bootstrap.resource_manager.create_domain(self.default_domain['id'],
self.default_domain)
user_ref = unit.new_user_ref(self.default_domain['id'],
name=self.username,
password=self.password)
bootstrap.identity_manager.create_user(user_ref)
self._do_test_bootstrap(bootstrap)
def test_assignment_created_with_project_exists(self):
# test assignment can be created if project already exists.
bootstrap = cli.BootStrap()
bootstrap.resource_manager.create_domain(self.default_domain['id'],
self.default_domain)
project_ref = unit.new_project_ref(self.default_domain['id'],
name=self.project_name)
bootstrap.resource_manager.create_project(project_ref['id'],
project_ref)
self._do_test_bootstrap(bootstrap)
def test_assignment_created_with_role_exists(self):
# test assignment can be created if role already exists.
bootstrap = cli.BootStrap()
bootstrap.resource_manager.create_domain(self.default_domain['id'],
self.default_domain)
role = unit.new_role_ref(name=self.role_name)
bootstrap.role_manager.create_role(role['id'], role)
self._do_test_bootstrap(bootstrap)
def test_assignment_created_with_region_exists(self):
# test assignment can be created if region already exists.
bootstrap = cli.BootStrap()
bootstrap.resource_manager.create_domain(self.default_domain['id'],
self.default_domain)
region = unit.new_region_ref(id=self.region_id)
bootstrap.catalog_manager.create_region(region)
self._do_test_bootstrap(bootstrap)
def test_endpoints_created_with_service_exists(self):
# test assignment can be created if service already exists.
bootstrap = cli.BootStrap()
bootstrap.resource_manager.create_domain(self.default_domain['id'],
self.default_domain)
service = unit.new_service_ref(name=self.service_name)
bootstrap.catalog_manager.create_service(service['id'], service)
self._do_test_bootstrap(bootstrap)
def test_endpoints_created_with_endpoint_exists(self):
# test assignment can be created if endpoint already exists.
bootstrap = cli.BootStrap()
bootstrap.resource_manager.create_domain(self.default_domain['id'],
self.default_domain)
service = unit.new_service_ref(name=self.service_name)
bootstrap.catalog_manager.create_service(service['id'], service)
region = unit.new_region_ref(id=self.region_id)
bootstrap.catalog_manager.create_region(region)
endpoint = unit.new_endpoint_ref(interface='public',
service_id=service['id'],
url=self.public_url,
region_id=self.region_id)
bootstrap.catalog_manager.create_endpoint(endpoint['id'], endpoint)
self._do_test_bootstrap(bootstrap)
class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(CliDomainConfigAllTestCase, self).setUp()
self.load_backends()
self.config_fixture.config(
group='identity',
domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap')
self.domain_count = 3
self.setup_initial_domains()
self.logging = self.useFixture(
fixtures.FakeLogger(level=logging.INFO))
def config_files(self):
self.config_fixture.register_cli_opt(cli.command_opt)
config_files = super(CliDomainConfigAllTestCase, self).config_files()
config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
return config_files
def cleanup_domains(self):
for domain in self.domains:
if domain == 'domain_default':
# Not allowed to delete the default domain, but should at least
# delete any domain-specific config for it.
PROVIDERS.domain_config_api.delete_config(
CONF.identity.default_domain_id)
continue
this_domain = self.domains[domain]
this_domain['enabled'] = False
PROVIDERS.resource_api.update_domain(
this_domain['id'], this_domain
)
PROVIDERS.resource_api.delete_domain(this_domain['id'])
self.domains = {}
def config(self, config_files):
CONF(args=['domain_config_upload', '--all'], project='keystone',
default_config_files=config_files)
def setup_initial_domains(self):
def create_domain(domain):
return PROVIDERS.resource_api.create_domain(domain['id'], domain)
self.domains = {}
self.addCleanup(self.cleanup_domains)
for x in range(1, self.domain_count):
domain = 'domain%s' % x
self.domains[domain] = create_domain(
{'id': uuid.uuid4().hex, 'name': domain})
self.default_domain = unit.new_domain_ref(
description=u'The default domain',
id=CONF.identity.default_domain_id,
name=u'Default')
self.domains['domain_default'] = create_domain(self.default_domain)
def test_config_upload(self):
# The values below are the same as in the domain_configs_multi_ldap
# directory of test config_files.
default_config = {
'ldap': {'url': 'fake://memory',
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=example,cn=com'},
'identity': {'driver': 'ldap'}
}
domain1_config = {
'ldap': {'url': 'fake://memory1',
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=example,cn=com'},
'identity': {'driver': 'ldap',
'list_limit': '101'}
}
domain2_config = {
'ldap': {'url': 'fake://memory',
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=myroot,cn=com',
'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org',
'user_tree_dn': 'ou=Users,dc=myroot,dc=org'},
'identity': {'driver': 'ldap'}
}
# Clear backend dependencies, since cli loads these manually
provider_api.ProviderAPIs._clear_registry_instances()
cli.DomainConfigUpload.main()
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
CONF.identity.default_domain_id)
self.assertEqual(default_config, res)
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain1']['id'])
self.assertEqual(domain1_config, res)
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain2']['id'])
self.assertEqual(domain2_config, res)
class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
def config(self, config_files):
CONF(args=['domain_config_upload', '--domain-name', 'Default'],
project='keystone', default_config_files=config_files)
def test_config_upload(self):
# The values below are the same as in the domain_configs_multi_ldap
# directory of test config_files.
default_config = {
'ldap': {'url': 'fake://memory',
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=example,cn=com'},
'identity': {'driver': 'ldap'}
}
# Clear backend dependencies, since cli loads these manually
provider_api.ProviderAPIs._clear_registry_instances()
cli.DomainConfigUpload.main()
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
CONF.identity.default_domain_id)
self.assertEqual(default_config, res)
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain1']['id'])
self.assertEqual({}, res)
res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
self.domains['domain2']['id'])
self.assertEqual({}, res)
def test_no_overwrite_config(self):
# Create a config for the default domain
default_config = {
'ldap': {'url': uuid.uuid4().hex},
'identity': {'driver': 'ldap'}
}
PROVIDERS.domain_config_api.create_config(
CONF.identity.default_domain_id, default_config)
# Now try and upload the settings in the configuration file for the
# default domain
provider_api.ProviderAPIs._clear_registry_instances()
with mock.patch('six.moves.builtins.print') as mock_print:
self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
file_name = ('keystone.%s.conf' % self.default_domain['name'])
error_msg = _(
'Domain: %(domain)s already has a configuration defined - '
'ignoring file: %(file)s.') % {
'domain': self.default_domain['name'],
'file': os.path.join(CONF.identity.domain_config_dir,
file_name)}
mock_print.assert_has_calls([mock.call(error_msg)])
res = PROVIDERS.domain_config_api.get_config(
CONF.identity.default_domain_id)
# The initial config should not have been overwritten
self.assertEqual(default_config, res)
class CliDomainConfigNoOptionsTestCase(CliDomainConfigAllTestCase):
def config(self, config_files):
CONF(args=['domain_config_upload'],
project='keystone', default_config_files=config_files)
def test_config_upload(self):
provider_api.ProviderAPIs._clear_registry_instances()
with mock.patch('six.moves.builtins.print') as mock_print:
self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
mock_print.assert_has_calls(
[mock.call(
_('At least one option must be provided, use either '
'--all or --domain-name'))])
class CliDomainConfigTooManyOptionsTestCase(CliDomainConfigAllTestCase):
def config(self, config_files):
CONF(args=['domain_config_upload', '--all', '--domain-name',
'Default'],
project='keystone', default_config_files=config_files)
def test_config_upload(self):
provider_api.ProviderAPIs._clear_registry_instances()
with mock.patch('six.moves.builtins.print') as mock_print:
self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
mock_print.assert_has_calls(
[mock.call(_('The --all option cannot be used with '
'the --domain-name option'))])
class CliDomainConfigInvalidDomainTestCase(CliDomainConfigAllTestCase):
def config(self, config_files):
self.invalid_domain_name = uuid.uuid4().hex
CONF(args=['domain_config_upload', '--domain-name',
self.invalid_domain_name],
project='keystone', default_config_files=config_files)
def test_config_upload(self):
provider_api.ProviderAPIs._clear_registry_instances()
with mock.patch('six.moves.builtins.print') as mock_print:
self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
file_name = 'keystone.%s.conf' % self.invalid_domain_name
error_msg = (_(
'Invalid domain name: %(domain)s found in config file name: '
'%(file)s - ignoring this file.') % {
'domain': self.invalid_domain_name,
'file': os.path.join(CONF.identity.domain_config_dir,
file_name)})
mock_print.assert_has_calls([mock.call(error_msg)])
class TestDomainConfigFinder(unit.BaseTestCase):
def setUp(self):
super(TestDomainConfigFinder, self).setUp()
self.logging = self.useFixture(fixtures.LoggerFixture())
@mock.patch('os.walk')
def test_finder_ignores_files(self, mock_walk):
mock_walk.return_value = [
['.', [], ['file.txt', 'keystone.conf', 'keystone.domain0.conf']],
]
domain_configs = list(cli._domain_config_finder('.'))
expected_domain_configs = [('./keystone.domain0.conf', 'domain0')]
self.assertThat(domain_configs,
matchers.Equals(expected_domain_configs))
expected_msg_template = ('Ignoring file (%s) while scanning '
'domain config directory')
self.assertThat(
self.logging.output,
matchers.Contains(expected_msg_template % 'file.txt'))
self.assertThat(
self.logging.output,
matchers.Contains(expected_msg_template % 'keystone.conf'))
class CliDBSyncTestCase(unit.BaseTestCase):
class FakeConfCommand(object):
def __init__(self, parent):
self.extension = False
self.check = parent.command_check
self.expand = parent.command_expand
self.migrate = parent.command_migrate
self.contract = parent.command_contract
self.version = None
def setUp(self):
super(CliDBSyncTestCase, self).setUp()
self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
self.config_fixture.register_cli_opt(cli.command_opt)
upgrades.offline_sync_database_to_version = mock.Mock()
upgrades.expand_schema = mock.Mock()
upgrades.migrate_data = mock.Mock()
upgrades.contract_schema = mock.Mock()
self.command_check = False
self.command_expand = False
self.command_migrate = False
self.command_contract = False
def _assert_correct_call(self, mocked_function):
for func in [upgrades.offline_sync_database_to_version,
upgrades.expand_schema,
upgrades.migrate_data,
upgrades.contract_schema]:
if func == mocked_function:
self.assertTrue(func.called)
else:
self.assertFalse(func.called)
def test_db_sync(self):
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
cli.DbSync.main()
self._assert_correct_call(
upgrades.offline_sync_database_to_version)
def test_db_sync_expand(self):
self.command_expand = True
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
cli.DbSync.main()
self._assert_correct_call(upgrades.expand_schema)
def test_db_sync_migrate(self):
self.command_migrate = True
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
cli.DbSync.main()
self._assert_correct_call(upgrades.migrate_data)
def test_db_sync_contract(self):
self.command_contract = True
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
cli.DbSync.main()
self._assert_correct_call(upgrades.contract_schema)
@mock.patch('keystone.cmd.cli.upgrades.get_db_version')
def test_db_sync_check_when_database_is_empty(self, mocked_get_db_version):
e = migration.exception.DBMigrationError("Invalid version")
mocked_get_db_version.side_effect = e
checker = cli.DbSync()
log_info = self.useFixture(fixtures.FakeLogger(level=log.INFO))
status = checker.check_db_sync_status()
self.assertIn("not currently under version control", log_info.output)
self.assertEqual(status, 2)
class TestMappingPopulate(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
sqldb = self.useFixture(database.Database())
super(TestMappingPopulate, self).setUp()
self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
self.ldapdb.clear()
self.load_backends()
sqldb.recreate()
self.load_fixtures(default_fixtures)
def config_files(self):
self.config_fixture.register_cli_opt(cli.command_opt)
config_files = super(TestMappingPopulate, self).config_files()
config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
return config_files
def config_overrides(self):
super(TestMappingPopulate, self).config_overrides()
self.config_fixture.config(group='identity', driver='ldap')
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
def config(self, config_files):
CONF(args=['mapping_populate', '--domain-name', 'Default'],
project='keystone',
default_config_files=config_files)
def test_mapping_populate(self):
# mapping_populate should create id mappings. Test plan:
# 0. Purge mappings
# 1. Fetch user list directly via backend. It will not create any
# mappings because it bypasses identity manager
# 2. Verify that users have no public_id yet
# 3. Execute mapping_populate. It should create id mappings
# 4. For the same users verify that they have public_id now
purge_filter = {}
PROVIDERS.id_mapping_api.purge_mappings(purge_filter)
hints = None
users = PROVIDERS.identity_api.driver.list_users(hints)
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity)
)
# backends are loaded again in the command handler
provider_api.ProviderAPIs._clear_registry_instances()
cli.MappingPopulate.main()
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity))
def test_bad_domain_name(self):
CONF(args=['mapping_populate', '--domain-name', uuid.uuid4().hex],
project='keystone')
# backends are loaded again in the command handler
provider_api.ProviderAPIs._clear_registry_instances()
# NOTE: assertEqual is used on purpose. assertFalse passes with None.
self.assertEqual(False, cli.MappingPopulate.main())
class CliDomainConfigUploadNothing(unit.BaseTestCase):
def setUp(self):
super(CliDomainConfigUploadNothing, self).setUp()
config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
config_fixture.register_cli_opt(cli.command_opt)
# NOTE(dstanek): since this is not testing any database
# functionality there is no need to go through the motions and
# setup a test database.
def fake_load_backends(self):
self.resource_manager = mock.Mock()
self.useFixture(fixtures.MockPatchObject(
cli.DomainConfigUploadFiles, 'load_backends', fake_load_backends))
tempdir = self.useFixture(fixtures.TempDir())
config_fixture.config(group='identity', domain_config_dir=tempdir.path)
self.logging = self.useFixture(
fixtures.FakeLogger(level=logging.DEBUG))
def test_uploading_all_from_an_empty_directory(self):
CONF(args=['domain_config_upload', '--all'], project='keystone',
default_config_files=[])
cli.DomainConfigUpload.main()
expected_msg = ('No domain configs uploaded from %r' %
CONF.identity.domain_config_dir)
self.assertThat(self.logging.output,
matchers.Contains(expected_msg))
class CachingDoctorTests(unit.TestCase):
def test_symptom_caching_disabled(self):
# Symptom Detected: Caching disabled
self.config_fixture.config(group='cache', enabled=False)
self.assertTrue(caching.symptom_caching_disabled())
# No Symptom Detected: Caching is enabled
self.config_fixture.config(group='cache', enabled=True)
self.assertFalse(caching.symptom_caching_disabled())
def test_caching_symptom_caching_enabled_without_a_backend(self):
# Success Case: Caching enabled and backend configured
self.config_fixture.config(group='cache', enabled=True)
self.config_fixture.config(group='cache', backend='dogpile.cache.null')
self.assertTrue(caching.symptom_caching_enabled_without_a_backend())
# Failure Case 1: Caching disabled and backend not configured
self.config_fixture.config(group='cache', enabled=False)
self.config_fixture.config(group='cache', backend='dogpile.cache.null')
self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
# Failure Case 2: Caching disabled and backend configured
self.config_fixture.config(group='cache', enabled=False)
self.config_fixture.config(group='cache',
backend='dogpile.cache.memory')
self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
# Failure Case 3: Caching enabled and backend configured
self.config_fixture.config(group='cache', enabled=True)
self.config_fixture.config(group='cache',
backend='dogpile.cache.memory')
self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
class CredentialDoctorTests(unit.TestCase):
def test_credential_and_fernet_key_repositories_match(self):
# Symptom Detected: Key repository paths are not unique
directory = self.useFixture(fixtures.TempDir()).path
self.config_fixture.config(group='credential',
key_repository=directory)
self.config_fixture.config(group='fernet_tokens',
key_repository=directory)
self.assertTrue(credential.symptom_unique_key_repositories())
def test_credential_and_fernet_key_repositories_are_unique(self):
# No Symptom Detected: Key repository paths are unique
self.config_fixture.config(group='credential',
key_repository='/etc/keystone/cred-repo')
self.config_fixture.config(group='fernet_tokens',
key_repository='/etc/keystone/fernet-repo')
self.assertFalse(credential.symptom_unique_key_repositories())
@mock.patch('keystone.cmd.doctor.credential.utils')
def test_usability_of_cred_fernet_key_repo_raised(self, mock_utils):
# Symptom Detected: credential fernet key repository is world readable
self.config_fixture.config(group='credential', provider='fernet')
mock_utils.TokenUtils().validate_key_repository.return_value = False
self.assertTrue(
credential.symptom_usability_of_credential_fernet_key_repository())
@mock.patch('keystone.cmd.doctor.credential.utils')
def test_usability_of_cred_fernet_key_repo_not_raised(self, mock_utils):
# No Symptom Detected: Custom driver is used
self.config_fixture.config(group='credential', provider='my-driver')
mock_utils.TokenUtils().validate_key_repository.return_value = True
self.assertFalse(
credential.symptom_usability_of_credential_fernet_key_repository())
# No Symptom Detected: key repository is not world readable
self.config_fixture.config(group='credential', provider='fernet')
mock_utils.TokenUtils().validate_key_repository.return_value = True
self.assertFalse(
credential.symptom_usability_of_credential_fernet_key_repository())
@mock.patch('keystone.cmd.doctor.credential.utils')
def test_keys_in_credential_fernet_key_repository_raised(self, mock_utils):
# Symptom Detected: Key repo is empty
self.config_fixture.config(group='credential', provider='fernet')
mock_utils.TokenUtils().load_keys.return_value = False
self.assertTrue(
credential.symptom_keys_in_credential_fernet_key_repository())
@mock.patch('keystone.cmd.doctor.credential.utils')
def test_keys_in_credential_fernet_key_repository_not_raised(
self, mock_utils):
# No Symptom Detected: Custom driver is used
self.config_fixture.config(group='credential', provider='my-driver')
mock_utils.TokenUtils().load_keys.return_value = True
self.assertFalse(
credential.symptom_keys_in_credential_fernet_key_repository())
# No Symptom Detected: Key repo is not empty, fernet is current driver
self.config_fixture.config(group='credential', provider='fernet')
mock_utils.TokenUtils().load_keys.return_value = True
self.assertFalse(
credential.symptom_keys_in_credential_fernet_key_repository())
class DatabaseDoctorTests(unit.TestCase):
def test_symptom_is_raised_if_database_connection_is_SQLite(self):
# Symptom Detected: Database connection is sqlite
self.config_fixture.config(
group='database',
connection='sqlite:///mydb')
self.assertTrue(
doc_database.symptom_database_connection_is_not_SQLite())
# No Symptom Detected: Database connection is MySQL
self.config_fixture.config(
group='database',
connection='mysql+mysqlconnector://admin:secret@localhost/mydb')
self.assertFalse(
doc_database.symptom_database_connection_is_not_SQLite())
class DebugDoctorTests(unit.TestCase):
def test_symptom_debug_mode_is_enabled(self):
# Symptom Detected: Debug mode is enabled
self.config_fixture.config(debug=True)
self.assertTrue(debug.symptom_debug_mode_is_enabled())
# No Symptom Detected: Debug mode is disabled
self.config_fixture.config(debug=False)
self.assertFalse(debug.symptom_debug_mode_is_enabled())
class FederationDoctorTests(unit.TestCase):
def test_symptom_comma_in_SAML_public_certificate_path(self):
# Symptom Detected: There is a comma in path to public cert file
self.config_fixture.config(group='saml', certfile='file,cert.pem')
self.assertTrue(
federation.symptom_comma_in_SAML_public_certificate_path())
# No Symptom Detected: There is no comma in the path
self.config_fixture.config(group='saml', certfile='signing_cert.pem')
self.assertFalse(
federation.symptom_comma_in_SAML_public_certificate_path())
def test_symptom_comma_in_SAML_private_key_file_path(self):
# Symptom Detected: There is a comma in path to private key file
self.config_fixture.config(group='saml', keyfile='file,key.pem')
self.assertTrue(
federation.symptom_comma_in_SAML_private_key_file_path())
# No Symptom Detected: There is no comma in the path
self.config_fixture.config(group='saml', keyfile='signing_key.pem')
self.assertFalse(
federation.symptom_comma_in_SAML_private_key_file_path())
class LdapDoctorTests(unit.TestCase):
def test_user_enabled_emulation_dn_ignored_raised(self):
# Symptom when user_enabled_emulation_dn is being ignored because the
# user did not enable the user_enabled_emulation
self.config_fixture.config(group='ldap', user_enabled_emulation=False)
self.config_fixture.config(
group='ldap',
user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com')
self.assertTrue(
ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
def test_user_enabled_emulation_dn_ignored_not_raised(self):
# No symptom when configuration set properly
self.config_fixture.config(group='ldap', user_enabled_emulation=True)
self.config_fixture.config(
group='ldap',
user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com')
self.assertFalse(
ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
# No symptom when both configurations disabled
self.config_fixture.config(group='ldap', user_enabled_emulation=False)
self.config_fixture.config(group='ldap',
user_enabled_emulation_dn=None)
self.assertFalse(
ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
def test_user_enabled_emulation_use_group_config_ignored_raised(self):
# Symptom when user enabled emulation isn't enabled but group_config is
# enabled
self.config_fixture.config(group='ldap', user_enabled_emulation=False)
self.config_fixture.config(
group='ldap',
user_enabled_emulation_use_group_config=True)
self.assertTrue(
ldap.
symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
def test_user_enabled_emulation_use_group_config_ignored_not_raised(self):
# No symptom when configuration deactivated
self.config_fixture.config(group='ldap', user_enabled_emulation=False)
self.config_fixture.config(
group='ldap',
user_enabled_emulation_use_group_config=False)
self.assertFalse(
ldap.
symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
# No symptom when configurations set properly
self.config_fixture.config(group='ldap', user_enabled_emulation=True)
self.config_fixture.config(
group='ldap',
user_enabled_emulation_use_group_config=True)
self.assertFalse(
ldap.
symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
def test_group_members_are_ids_disabled_raised(self):
# Symptom when objectclass is set to posixGroup but members_are_ids are
# not enabled
self.config_fixture.config(group='ldap',
group_objectclass='posixGroup')
self.config_fixture.config(group='ldap',
group_members_are_ids=False)
self.assertTrue(ldap.symptom_LDAP_group_members_are_ids_disabled())
def test_group_members_are_ids_disabled_not_raised(self):
# No symptom when the configurations are set properly
self.config_fixture.config(group='ldap',
group_objectclass='posixGroup')
self.config_fixture.config(group='ldap',
group_members_are_ids=True)
self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
# No symptom when configuration deactivated
self.config_fixture.config(group='ldap',
group_objectclass='groupOfNames')
self.config_fixture.config(group='ldap',
group_members_are_ids=False)
self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
@mock.patch('os.listdir')
@mock.patch('os.path.isdir')
def test_file_based_domain_specific_configs_raised(self, mocked_isdir,
mocked_listdir):
self.config_fixture.config(
group='identity',
domain_specific_drivers_enabled=True)
self.config_fixture.config(
group='identity',
domain_configurations_from_database=False)
# Symptom if there is no existing directory
mocked_isdir.return_value = False
self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
# Symptom if there is an invalid filename inside the domain directory
mocked_isdir.return_value = True
mocked_listdir.return_value = ['openstack.domains.conf']
self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
@mock.patch('os.listdir')
@mock.patch('os.path.isdir')
def test_file_based_domain_specific_configs_not_raised(self, mocked_isdir,
mocked_listdir):
# No symptom if both configurations deactivated
self.config_fixture.config(
group='identity',
domain_specific_drivers_enabled=False)
self.config_fixture.config(
group='identity',
domain_configurations_from_database=False)
self.assertFalse(
ldap.symptom_LDAP_file_based_domain_specific_configs())
# No symptom if directory exists with no invalid filenames
self.config_fixture.config(
group='identity',
domain_specific_drivers_enabled=True)
self.config_fixture.config(
group='identity',
domain_configurations_from_database=False)
mocked_isdir.return_value = True
mocked_listdir.return_value = ['keystone.domains.conf']
self.assertFalse(
ldap.symptom_LDAP_file_based_domain_specific_configs())
@mock.patch('os.listdir')
@mock.patch('os.path.isdir')
@mock.patch('keystone.cmd.doctor.ldap.configparser.ConfigParser')
def test_file_based_domain_specific_configs_formatted_correctly_raised(
self, mocked_parser, mocked_isdir, mocked_listdir):
symptom = ('symptom_LDAP_file_based_domain_specific_configs'
'_formatted_correctly')
# Symptom Detected: Ldap domain specific configuration files are not
# formatted correctly
self.config_fixture.config(
group='identity',
domain_specific_drivers_enabled=True)
self.config_fixture.config(
group='identity',
domain_configurations_from_database=False)
mocked_isdir.return_value = True
mocked_listdir.return_value = ['keystone.domains.conf']
mock_instance = mock.MagicMock()
mock_instance.read.side_effect = configparser.Error('No Section')
mocked_parser.return_value = mock_instance
self.assertTrue(getattr(ldap, symptom)())
@mock.patch('os.listdir')
@mock.patch('os.path.isdir')
def test_file_based_domain_specific_configs_formatted_correctly_not_raised(
self, mocked_isdir, mocked_listdir):
symptom = ('symptom_LDAP_file_based_domain_specific_configs'
'_formatted_correctly')
# No Symptom Detected: Domain_specific drivers is not enabled
self.config_fixture.config(
group='identity',
domain_specific_drivers_enabled=False)
self.assertFalse(getattr(ldap, symptom)())
# No Symptom Detected: Domain configuration from database is enabled
self.config_fixture.config(
group='identity',
domain_specific_drivers_enabled=True)
self.assertFalse(getattr(ldap, symptom)())
self.config_fixture.config(
group='identity',
domain_configurations_from_database=True)
self.assertFalse(getattr(ldap, symptom)())
# No Symptom Detected: The directory in domain_config_dir doesn't exist
mocked_isdir.return_value = False
self.assertFalse(getattr(ldap, symptom)())
# No Symptom Detected: domain specific drivers are enabled, domain
# configurations from database are disabled, directory exists, and no
# exceptions found.
self.config_fixture.config(
group='identity',
domain_configurations_from_database=False)
mocked_isdir.return_value = True
# An empty directory should not raise this symptom
self.assertFalse(getattr(ldap, symptom)())
# Test again with a file inside the directory
mocked_listdir.return_value = ['keystone.domains.conf']
self.assertFalse(getattr(ldap, symptom)())
class SecurityComplianceDoctorTests(unit.TestCase):
def test_minimum_password_age_greater_than_password_expires_days(self):
# Symptom Detected: Minimum password age is greater than the password
# expires days. Both values are positive integers greater than zero.
self.config_fixture.config(group='security_compliance',
minimum_password_age=2)
self.config_fixture.config(group='security_compliance',
password_expires_days=1)
self.assertTrue(
security_compliance.
symptom_minimum_password_age_greater_than_expires_days())
def test_minimum_password_age_equal_to_password_expires_days(self):
# Symptom Detected: Minimum password age is equal to the password
# expires days. Both values are positive integers greater than zero.
self.config_fixture.config(group='security_compliance',
minimum_password_age=1)
self.config_fixture.config(group='security_compliance',
password_expires_days=1)
self.assertTrue(
security_compliance.
symptom_minimum_password_age_greater_than_expires_days())
def test_minimum_password_age_less_than_password_expires_days(self):
# No Symptom Detected: Minimum password age is less than password
# expires days. Both values are positive integers greater than zero.
self.config_fixture.config(group='security_compliance',
minimum_password_age=1)
self.config_fixture.config(group='security_compliance',
password_expires_days=2)
self.assertFalse(
security_compliance.
symptom_minimum_password_age_greater_than_expires_days())
def test_minimum_password_age_and_password_expires_days_deactivated(self):
# No Symptom Detected: when minimum_password_age's default value is 0
# and password_expires_days' default value is None
self.assertFalse(
security_compliance.
symptom_minimum_password_age_greater_than_expires_days())
def test_invalid_password_regular_expression(self):
# Symptom Detected: Regular expression is invalid
self.config_fixture.config(
group='security_compliance',
password_regex='^^(??=.*\d)$')
self.assertTrue(
security_compliance.symptom_invalid_password_regular_expression())
def test_valid_password_regular_expression(self):
# No Symptom Detected: Regular expression is valid
self.config_fixture.config(
group='security_compliance',
password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
self.assertFalse(
security_compliance.symptom_invalid_password_regular_expression())
def test_password_regular_expression_deactivated(self):
# No Symptom Detected: Regular expression deactivated to None
self.config_fixture.config(
group='security_compliance',
password_regex=None)
self.assertFalse(
security_compliance.symptom_invalid_password_regular_expression())
def test_password_regular_expression_description_not_set(self):
# Symptom Detected: Regular expression is set but description is not
self.config_fixture.config(
group='security_compliance',
password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
self.config_fixture.config(
group='security_compliance',
password_regex_description=None)
self.assertTrue(
security_compliance.
symptom_password_regular_expression_description_not_set())
def test_password_regular_expression_description_set(self):
# No Symptom Detected: Regular expression and description are set
desc = '1 letter, 1 digit, and a minimum length of 7 is required'
self.config_fixture.config(
group='security_compliance',
password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
self.config_fixture.config(
group='security_compliance',
password_regex_description=desc)
self.assertFalse(
security_compliance.
symptom_password_regular_expression_description_not_set())
def test_password_regular_expression_description_deactivated(self):
# No Symptom Detected: Regular expression and description are
# deactivated to None
self.config_fixture.config(
group='security_compliance', password_regex=None)
self.config_fixture.config(
group='security_compliance', password_regex_description=None)
self.assertFalse(
security_compliance.
symptom_password_regular_expression_description_not_set())
class TokensDoctorTests(unit.TestCase):
def test_unreasonable_max_token_size_raised(self):
# Symptom Detected: the max_token_size for uuid is not 32
self.config_fixture.config(group='token', provider='uuid')
self.config_fixture.config(max_token_size=33)
self.assertTrue(tokens.symptom_unreasonable_max_token_size())
# Symptom Detected: the max_token_size for fernet is greater than 255
self.config_fixture.config(group='token', provider='fernet')
self.config_fixture.config(max_token_size=256)
self.assertTrue(tokens.symptom_unreasonable_max_token_size())
def test_unreasonable_max_token_size_not_raised(self):
# No Symptom Detected: the max_token_size for uuid is 32
self.config_fixture.config(group='token', provider='uuid')
self.config_fixture.config(max_token_size=32)
self.assertFalse(tokens.symptom_unreasonable_max_token_size())
# No Symptom Detected: the max_token_size for fernet is 255 or less
self.config_fixture.config(group='token', provider='fernet')
self.config_fixture.config(max_token_size=255)
self.assertFalse(tokens.symptom_unreasonable_max_token_size())
class TokenFernetDoctorTests(unit.TestCase):
@mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
def test_usability_of_Fernet_key_repository_raised(self, mock_utils):
# Symptom Detected: Fernet key repo is world readable
self.config_fixture.config(group='token', provider='fernet')
mock_utils.TokenUtils().validate_key_repository.return_value = False
self.assertTrue(
tokens_fernet.symptom_usability_of_Fernet_key_repository())
@mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
def test_usability_of_Fernet_key_repository_not_raised(self, mock_utils):
# No Symptom Detected: UUID is used instead of fernet
self.config_fixture.config(group='token', provider='uuid')
mock_utils.TokenUtils().validate_key_repository.return_value = False
self.assertFalse(
tokens_fernet.symptom_usability_of_Fernet_key_repository())
# No Symptom Detected: configs set properly, key repo is not world
# readable but is user readable
self.config_fixture.config(group='token', provider='fernet')
mock_utils.TokenUtils().validate_key_repository.return_value = True
self.assertFalse(
tokens_fernet.symptom_usability_of_Fernet_key_repository())
@mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
def test_keys_in_Fernet_key_repository_raised(self, mock_utils):
# Symptom Detected: Fernet key repository is empty
self.config_fixture.config(group='token', provider='fernet')
mock_utils.TokenUtils().load_keys.return_value = False
self.assertTrue(
tokens_fernet.symptom_keys_in_Fernet_key_repository())
@mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
def test_keys_in_Fernet_key_repository_not_raised(self, mock_utils):
# No Symptom Detected: UUID is used instead of fernet
self.config_fixture.config(group='token', provider='uuid')
mock_utils.TokenUtils().load_keys.return_value = True
self.assertFalse(
tokens_fernet.symptom_usability_of_Fernet_key_repository())
# No Symptom Detected: configs set properly, key repo has been
# populated with keys
self.config_fixture.config(group='token', provider='fernet')
mock_utils.TokenUtils().load_keys.return_value = True
self.assertFalse(
tokens_fernet.symptom_usability_of_Fernet_key_repository())
class TestMappingPurge(unit.SQLDriverOverrides, unit.BaseTestCase):
class FakeConfCommand(object):
def __init__(self, parent):
self.extension = False
self.all = parent.command_all
self.type = parent.command_type
self.domain_name = parent.command_domain_name
self.local_id = parent.command_local_id
self.public_id = parent.command_public_id
def setUp(self):
# Set up preset cli options and a parser
super(TestMappingPurge, self).setUp()
self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
self.config_fixture.register_cli_opt(cli.command_opt)
# For unit tests that should not throw any erorrs,
# Use the argument parser to test that the combinations work
parser_test = argparse.ArgumentParser()
subparsers = parser_test.add_subparsers()
self.parser = cli.MappingPurge.add_argument_parser(subparsers)
def test_mapping_purge_with_no_arguments_fails(self):
# Make sure the logic in main() actually catches no argument error
self.command_type = None
self.command_all = False
self.command_domain_name = None
self.command_local_id = None
self.command_public_id = None
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
self.assertRaises(ValueError, cli.MappingPurge.main)
def test_mapping_purge_with_all_and_other_argument_fails(self):
# Make sure the logic in main() actually catches invalid combinations
self.command_type = 'user'
self.command_all = True
self.command_domain_name = None
self.command_local_id = None
self.command_public_id = None
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
self.assertRaises(ValueError, cli.MappingPurge.main)
def test_mapping_purge_with_only_all_passes(self):
args = (['--all'])
res = self.parser.parse_args(args)
self.assertTrue(vars(res)['all'])
def test_mapping_purge_with_domain_name_argument_succeeds(self):
args = (['--domain-name', uuid.uuid4().hex])
self.parser.parse_args(args)
def test_mapping_purge_with_public_id_argument_succeeds(self):
args = (['--public-id', uuid.uuid4().hex])
self.parser.parse_args(args)
def test_mapping_purge_with_local_id_argument_succeeds(self):
args = (['--local-id', uuid.uuid4().hex])
self.parser.parse_args(args)
def test_mapping_purge_with_type_argument_succeeds(self):
args = (['--type', 'user'])
self.parser.parse_args(args)
args = (['--type', 'group'])
self.parser.parse_args(args)
def test_mapping_purge_with_invalid_argument_fails(self):
args = (['--invalid-option', 'some value'])
self.assertRaises(unit.UnexpectedExit, self.parser.parse_args, args)
def test_mapping_purge_with_all_other_combinations_passes(self):
args = (['--type', 'user', '--local-id', uuid.uuid4().hex])
self.parser.parse_args(args)
args.append('--domain-name')
args.append('test')
self.parser.parse_args(args)
args.append('--public-id')
args.append(uuid.uuid4().hex)
self.parser.parse_args(args)