Fix keystone-manage mapping_purge with --type option

"keystone-manage mapping_purge" has "--type" option in order to select
mappings which have to be deleted. However, the option doesn't work
and all mappings are purged when specifying this option.
The command passes a value of the option to purge_mappings API with
a key name "type", but the API expects "entity_type".

Change-Id: Ib222d70d4cf6bb61847cef049ec1ded5c2ff2c8b
Closes-Bug: #1757022
This commit is contained in:
DaiHanada 2018-02-07 14:21:38 +09:00 committed by Lance Bragstad
parent 56237b709e
commit 74684cf53b
3 changed files with 212 additions and 1 deletions

View File

@ -970,7 +970,7 @@ class MappingPurge(BaseApp):
if CONF.command.local_id is not None: if CONF.command.local_id is not None:
mapping['local_id'] = CONF.command.local_id mapping['local_id'] = CONF.command.local_id
if CONF.command.type is not None: if CONF.command.type is not None:
mapping['type'] = CONF.command.type mapping['entity_type'] = CONF.command.type
mapping_manager.purge_mappings(mapping) mapping_manager.purge_mappings(mapping)

View File

@ -1386,6 +1386,209 @@ class TestMappingPurge(unit.SQLDriverOverrides, unit.BaseTestCase):
args.append(uuid.uuid4().hex) args.append(uuid.uuid4().hex)
self.parser.parse_args(args) self.parser.parse_args(args)
@mock.patch.object(keystone.identity.MappingManager, 'purge_mappings')
def test_mapping_purge_type_user(self, purge_mock):
# Make sure the logic in main() actually catches no argument error
self.command_type = 'user'
self.command_all = False
self.command_domain_name = None
self.command_local_id = uuid.uuid4().hex
self.command_public_id = uuid.uuid4().hex
self.useFixture(fixtures.MockPatchObject(
CONF, 'command', self.FakeConfCommand(self)))
def fake_load_backends():
return dict(
id_mapping_api=keystone.identity.core.MappingManager,
resource_api=None)
self.useFixture(fixtures.MockPatch(
'keystone.server.backends.load_backends',
side_effect=fake_load_backends))
cli.MappingPurge.main()
purge_mock.assert_called_with({'entity_type': 'user',
'local_id': self.command_local_id,
'public_id': self.command_public_id})
class TestUserMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
sqldb = self.useFixture(database.Database())
super(TestUserMappingPurgeFunctional, self).setUp()
self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
self.ldapdb.clear()
self.load_backends()
sqldb.recreate()
self.load_fixtures(default_fixtures)
def config_files(self):
self.config_fixture.register_cli_opt(cli.command_opt)
config_files = super(
TestUserMappingPurgeFunctional, self
).config_files()
config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
return config_files
def config_overrides(self):
super(TestUserMappingPurgeFunctional, self).config_overrides()
self.config_fixture.config(group='identity', driver='ldap')
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
def config(self, config_files):
CONF(args=['mapping_purge', '--type', 'user'],
project='keystone',
default_config_files=config_files)
def test_purge_by_user_type(self):
# Grab the list of the users from the backend directly to avoid
# populating the public_ids for each user. We do this so we can grab
# the local_id of a user before it's overwritten by the public_id.
hints = None
users = PROVIDERS.identity_api.driver.list_users(hints)
# Create a new group in the backend directly. We do this so that we
# have control over the local_id, which is `id` here. After creating
# the group, let's list them so the id_mapping_api creates the public
# id appropriately.
group_ref = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id
}
PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref)
PROVIDERS.identity_api.list_groups()
# Make sure all users and groups have public ids by querying the
# id_mapping_api.
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity))
group_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': group_ref['id'],
'entity_type': identity_mapping.EntityType.GROUP}
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(group_entity)
)
# Purge all users mappings
provider_api.ProviderAPIs._clear_registry_instances()
cli.MappingPurge.main()
# Check that all the user mappings were purged
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity)
)
# Make sure the group mapping still exists
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(group_entity)
)
class TestGroupMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
sqldb = self.useFixture(database.Database())
super(TestGroupMappingPurgeFunctional, self).setUp()
self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
self.ldapdb.clear()
self.load_backends()
sqldb.recreate()
self.load_fixtures(default_fixtures)
def config_files(self):
self.config_fixture.register_cli_opt(cli.command_opt)
config_files = super(
TestGroupMappingPurgeFunctional, self
).config_files()
config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
return config_files
def config_overrides(self):
super(TestGroupMappingPurgeFunctional, self).config_overrides()
self.config_fixture.config(group='identity', driver='ldap')
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
def config(self, config_files):
CONF(args=['mapping_purge', '--type', 'group'],
project='keystone',
default_config_files=config_files)
def test_purge_by_group_type(self):
# Grab the list of the users from the backend directly to avoid
# populating the public_ids for each user. We do this so we can grab
# the local_id of a user before it's overwritten by the public_id.
hints = None
users = PROVIDERS.identity_api.driver.list_users(hints)
# Create a new group in the backend directly. We do this so that we
# have control over the local_id, which is `id` here. After creating
# the group, let's list them so the id_mapping_api creates the public
# id appropriately.
group_ref = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id
}
PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref)
PROVIDERS.identity_api.list_groups()
# Make sure all users and groups have public ids by querying the
# id_mapping_api.
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity))
group_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': group_ref['id'],
'entity_type': identity_mapping.EntityType.GROUP}
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(group_entity)
)
# Purge group mappings
provider_api.ProviderAPIs._clear_registry_instances()
cli.MappingPurge.main()
# Make sure the group mapping was purged
self.assertIsNone(
PROVIDERS.id_mapping_api.get_public_id(group_entity)
)
# Check that all the user mappings still exist
for user in users:
local_entity = {
'domain_id': CONF.identity.default_domain_id,
'local_id': user['id'],
'entity_type': identity_mapping.EntityType.USER}
self.assertIsNotNone(
PROVIDERS.id_mapping_api.get_public_id(local_entity)
)
class TestTokenFlush(unit.TestCase): class TestTokenFlush(unit.TestCase):

View File

@ -0,0 +1,8 @@
---
fixes:
- |
[`bug 1757022 <https://bugs.launchpad.net/keystone/+bug/1757022>`_]
In previous releases, ``keystone-manage mapping_purge --type {user,group}``
command would purge all mapping incorrectly instead of only purging the
specified type mappings. ``keystone-manage mapping_purge --type {user,group}``
now purges only specified type mappings as expected.