diff --git a/keystone/cmd/cli.py b/keystone/cmd/cli.py index 5178962838..2e78ad0e24 100644 --- a/keystone/cmd/cli.py +++ b/keystone/cmd/cli.py @@ -970,7 +970,7 @@ class MappingPurge(BaseApp): if CONF.command.local_id is not None: mapping['local_id'] = CONF.command.local_id if CONF.command.type is not None: - mapping['type'] = CONF.command.type + mapping['entity_type'] = CONF.command.type mapping_manager.purge_mappings(mapping) diff --git a/keystone/tests/unit/test_cli.py b/keystone/tests/unit/test_cli.py index 247df99f87..7b296ab02b 100644 --- a/keystone/tests/unit/test_cli.py +++ b/keystone/tests/unit/test_cli.py @@ -1386,6 +1386,209 @@ class TestMappingPurge(unit.SQLDriverOverrides, unit.BaseTestCase): args.append(uuid.uuid4().hex) self.parser.parse_args(args) + @mock.patch.object(keystone.identity.MappingManager, 'purge_mappings') + def test_mapping_purge_type_user(self, purge_mock): + # Make sure the logic in main() actually catches no argument error + self.command_type = 'user' + self.command_all = False + self.command_domain_name = None + self.command_local_id = uuid.uuid4().hex + self.command_public_id = uuid.uuid4().hex + self.useFixture(fixtures.MockPatchObject( + CONF, 'command', self.FakeConfCommand(self))) + + def fake_load_backends(): + return dict( + id_mapping_api=keystone.identity.core.MappingManager, + resource_api=None) + + self.useFixture(fixtures.MockPatch( + 'keystone.server.backends.load_backends', + side_effect=fake_load_backends)) + + cli.MappingPurge.main() + purge_mock.assert_called_with({'entity_type': 'user', + 'local_id': self.command_local_id, + 'public_id': self.command_public_id}) + + +class TestUserMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase): + + def setUp(self): + sqldb = self.useFixture(database.Database()) + super(TestUserMappingPurgeFunctional, self).setUp() + self.ldapdb = self.useFixture(ldapdb.LDAPDatabase()) + self.ldapdb.clear() + + self.load_backends() + + sqldb.recreate() + self.load_fixtures(default_fixtures) + + def config_files(self): + self.config_fixture.register_cli_opt(cli.command_opt) + config_files = super( + TestUserMappingPurgeFunctional, self + ).config_files() + config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf')) + return config_files + + def config_overrides(self): + super(TestUserMappingPurgeFunctional, self).config_overrides() + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='identity_mapping', + backward_compatible_ids=False) + + def config(self, config_files): + CONF(args=['mapping_purge', '--type', 'user'], + project='keystone', + default_config_files=config_files) + + def test_purge_by_user_type(self): + # Grab the list of the users from the backend directly to avoid + # populating the public_ids for each user. We do this so we can grab + # the local_id of a user before it's overwritten by the public_id. + hints = None + users = PROVIDERS.identity_api.driver.list_users(hints) + + # Create a new group in the backend directly. We do this so that we + # have control over the local_id, which is `id` here. After creating + # the group, let's list them so the id_mapping_api creates the public + # id appropriately. + group_ref = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': CONF.identity.default_domain_id + } + PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref) + PROVIDERS.identity_api.list_groups() + + # Make sure all users and groups have public ids by querying the + # id_mapping_api. + for user in users: + local_entity = { + 'domain_id': CONF.identity.default_domain_id, + 'local_id': user['id'], + 'entity_type': identity_mapping.EntityType.USER} + self.assertIsNotNone( + PROVIDERS.id_mapping_api.get_public_id(local_entity)) + + group_entity = { + 'domain_id': CONF.identity.default_domain_id, + 'local_id': group_ref['id'], + 'entity_type': identity_mapping.EntityType.GROUP} + self.assertIsNotNone( + PROVIDERS.id_mapping_api.get_public_id(group_entity) + ) + + # Purge all users mappings + provider_api.ProviderAPIs._clear_registry_instances() + cli.MappingPurge.main() + + # Check that all the user mappings were purged + for user in users: + local_entity = { + 'domain_id': CONF.identity.default_domain_id, + 'local_id': user['id'], + 'entity_type': identity_mapping.EntityType.USER} + self.assertIsNone( + PROVIDERS.id_mapping_api.get_public_id(local_entity) + ) + + # Make sure the group mapping still exists + self.assertIsNotNone( + PROVIDERS.id_mapping_api.get_public_id(group_entity) + ) + + +class TestGroupMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase): + + def setUp(self): + sqldb = self.useFixture(database.Database()) + super(TestGroupMappingPurgeFunctional, self).setUp() + self.ldapdb = self.useFixture(ldapdb.LDAPDatabase()) + self.ldapdb.clear() + + self.load_backends() + + sqldb.recreate() + self.load_fixtures(default_fixtures) + + def config_files(self): + self.config_fixture.register_cli_opt(cli.command_opt) + config_files = super( + TestGroupMappingPurgeFunctional, self + ).config_files() + config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf')) + return config_files + + def config_overrides(self): + super(TestGroupMappingPurgeFunctional, self).config_overrides() + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='identity_mapping', + backward_compatible_ids=False) + + def config(self, config_files): + CONF(args=['mapping_purge', '--type', 'group'], + project='keystone', + default_config_files=config_files) + + def test_purge_by_group_type(self): + # Grab the list of the users from the backend directly to avoid + # populating the public_ids for each user. We do this so we can grab + # the local_id of a user before it's overwritten by the public_id. + hints = None + users = PROVIDERS.identity_api.driver.list_users(hints) + + # Create a new group in the backend directly. We do this so that we + # have control over the local_id, which is `id` here. After creating + # the group, let's list them so the id_mapping_api creates the public + # id appropriately. + group_ref = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': CONF.identity.default_domain_id + } + PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref) + PROVIDERS.identity_api.list_groups() + + # Make sure all users and groups have public ids by querying the + # id_mapping_api. + for user in users: + local_entity = { + 'domain_id': CONF.identity.default_domain_id, + 'local_id': user['id'], + 'entity_type': identity_mapping.EntityType.USER} + self.assertIsNotNone( + PROVIDERS.id_mapping_api.get_public_id(local_entity)) + + group_entity = { + 'domain_id': CONF.identity.default_domain_id, + 'local_id': group_ref['id'], + 'entity_type': identity_mapping.EntityType.GROUP} + self.assertIsNotNone( + PROVIDERS.id_mapping_api.get_public_id(group_entity) + ) + + # Purge group mappings + provider_api.ProviderAPIs._clear_registry_instances() + cli.MappingPurge.main() + + # Make sure the group mapping was purged + self.assertIsNone( + PROVIDERS.id_mapping_api.get_public_id(group_entity) + ) + + # Check that all the user mappings still exist + for user in users: + local_entity = { + 'domain_id': CONF.identity.default_domain_id, + 'local_id': user['id'], + 'entity_type': identity_mapping.EntityType.USER} + self.assertIsNotNone( + PROVIDERS.id_mapping_api.get_public_id(local_entity) + ) + class TestTokenFlush(unit.TestCase): diff --git a/releasenotes/notes/bug-1757022-664d0b0db1242bf8.yaml b/releasenotes/notes/bug-1757022-664d0b0db1242bf8.yaml new file mode 100644 index 0000000000..745d7e43c8 --- /dev/null +++ b/releasenotes/notes/bug-1757022-664d0b0db1242bf8.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + [`bug 1757022 `_] + In previous releases, ``keystone-manage mapping_purge --type {user,group}`` + command would purge all mapping incorrectly instead of only purging the + specified type mappings. ``keystone-manage mapping_purge --type {user,group}`` + now purges only specified type mappings as expected.