identity: Remove duplicated _find_sdk_id method

We have a few instances of this. Settle on one.

Change-Id: Id115fea1c59ad75ec8e00d665e587020f7177a55
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-11-14 11:48:01 +00:00
parent a5e4d5f0fa
commit 55fd501657
4 changed files with 130 additions and 141 deletions

View File

@@ -82,32 +82,12 @@ def _add_identity_and_resource_options_to_parser(parser):
common.add_inherited_option_to_parser(parser)
def _find_sdk_id(
find_command, name_or_id, validate_actor_existence=True, **kwargs
):
try:
resource = find_command(
name_or_id=name_or_id, ignore_missing=False, **kwargs
)
# Mimic the behavior of
# openstackclient.identity.common._find_identity_resource()
# and ignore if we don't have permission to find a resource.
except sdk_exc.ForbiddenException:
return name_or_id
except sdk_exc.ResourceNotFound as exc:
if not validate_actor_existence:
return name_or_id
raise exceptions.CommandError from exc
return resource.id
def _process_identity_and_resource_options(
parsed_args, identity_client, validate_actor_existence=True
):
def _find_user():
domain_id = (
_find_sdk_id(
common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.user_domain,
validate_actor_existence=validate_actor_existence,
@@ -115,7 +95,7 @@ def _process_identity_and_resource_options(
if parsed_args.user_domain
else None
)
return _find_sdk_id(
return common._find_sdk_id(
identity_client.find_user,
name_or_id=parsed_args.user,
validate_actor_existence=validate_actor_existence,
@@ -124,7 +104,7 @@ def _process_identity_and_resource_options(
def _find_group():
domain_id = (
_find_sdk_id(
common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.group_domain,
validate_actor_existence=validate_actor_existence,
@@ -132,7 +112,7 @@ def _process_identity_and_resource_options(
if parsed_args.group_domain
else None
)
return _find_sdk_id(
return common._find_sdk_id(
identity_client.find_group,
name_or_id=parsed_args.group,
validate_actor_existence=validate_actor_existence,
@@ -141,7 +121,7 @@ def _process_identity_and_resource_options(
def _find_project():
domain_id = (
_find_sdk_id(
common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.project_domain,
validate_actor_existence=validate_actor_existence,
@@ -149,7 +129,7 @@ def _process_identity_and_resource_options(
if parsed_args.project_domain
else None
)
return _find_sdk_id(
return common._find_sdk_id(
identity_client.find_project,
name_or_id=parsed_args.project,
validate_actor_existence=validate_actor_existence,
@@ -162,7 +142,7 @@ def _process_identity_and_resource_options(
kwargs['system'] = parsed_args.system
elif parsed_args.user and parsed_args.domain:
kwargs['user'] = _find_user()
kwargs['domain'] = _find_sdk_id(
kwargs['domain'] = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.domain,
validate_actor_existence=validate_actor_existence,
@@ -175,7 +155,7 @@ def _process_identity_and_resource_options(
kwargs['system'] = parsed_args.system
elif parsed_args.group and parsed_args.domain:
kwargs['group'] = _find_group()
kwargs['domain'] = _find_sdk_id(
kwargs['domain'] = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.domain,
validate_actor_existence=validate_actor_existence,
@@ -228,10 +208,10 @@ class AddRole(command.Command):
domain_id = None
if parsed_args.role_domain:
domain_id = _find_sdk_id(
domain_id = common._find_sdk_id(
identity_client.find_domain, name_or_id=parsed_args.role_domain
)
role = _find_sdk_id(
role = common._find_sdk_id(
identity_client.find_role,
name_or_id=parsed_args.role,
domain_id=domain_id,
@@ -328,7 +308,7 @@ class CreateRole(command.ShowOne):
create_kwargs = {}
if parsed_args.domain:
create_kwargs['domain_id'] = _find_sdk_id(
create_kwargs['domain_id'] = common._find_sdk_id(
identity_client.find_domain, name_or_id=parsed_args.domain
)
@@ -381,13 +361,13 @@ class DeleteRole(command.Command):
domain_id = None
if parsed_args.domain:
domain_id = _find_sdk_id(
domain_id = common._find_sdk_id(
identity_client.find_domain, parsed_args.domain
)
errors = 0
for role in parsed_args.roles:
try:
role_id = _find_sdk_id(
role_id = common._find_sdk_id(
identity_client.find_role,
name_or_id=role,
domain_id=domain_id,
@@ -482,11 +462,11 @@ class RemoveRole(command.Command):
domain_id = None
if parsed_args.role_domain:
domain_id = _find_sdk_id(
domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.role_domain,
)
role = _find_sdk_id(
role = common._find_sdk_id(
identity_client.find_role,
name_or_id=parsed_args.role,
domain_id=domain_id,
@@ -582,7 +562,7 @@ class SetRole(command.Command):
domain_id = None
if parsed_args.domain:
domain_id = _find_sdk_id(
domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.domain,
)
@@ -591,7 +571,7 @@ class SetRole(command.Command):
if parsed_args.immutable is not None:
update_kwargs["options"] = {"immutable": parsed_args.immutable}
role = _find_sdk_id(
role = common._find_sdk_id(
identity_client.find_role,
name_or_id=parsed_args.role,
domain_id=domain_id,
@@ -623,7 +603,7 @@ class ShowRole(command.ShowOne):
domain_id = None
if parsed_args.domain:
domain_id = _find_sdk_id(
domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.domain,
)

View File

@@ -13,7 +13,6 @@
"""Identity v3 Assignment action implementations"""
from openstack import exceptions as sdk_exceptions
from osc_lib.command import command
from openstackclient.i18n import _
@@ -51,15 +50,6 @@ def _format_role_assignment_(assignment, include_names):
)
def _find_sdk_id(find_command, name_or_id, **kwargs):
try:
return find_command(
name_or_id=name_or_id, ignore_missing=False, **kwargs
).id
except sdk_exceptions.ForbiddenException:
return name_or_id
class ListRoleAssignment(command.Lister):
_description = _("List role assignments")
@@ -135,12 +125,12 @@ class ListRoleAssignment(command.Lister):
role_id = None
role_domain_id = None
if parsed_args.role_domain:
role_domain_id = _find_sdk_id(
role_domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.role_domain,
)
if parsed_args.role:
role_id = _find_sdk_id(
role_id = common._find_sdk_id(
identity_client.find_role,
name_or_id=parsed_args.role,
domain_id=role_domain_id,
@@ -148,21 +138,21 @@ class ListRoleAssignment(command.Lister):
user_domain_id = None
if parsed_args.user_domain:
user_domain_id = _find_sdk_id(
user_domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.user_domain,
)
user_id = None
if parsed_args.user:
user_id = _find_sdk_id(
user_id = common._find_sdk_id(
identity_client.find_user,
name_or_id=parsed_args.user,
domain_id=user_domain_id,
)
elif parsed_args.authuser:
if auth_ref:
user_id = _find_sdk_id(
user_id = common._find_sdk_id(
identity_client.find_user,
name_or_id=auth_ref.user_id,
)
@@ -173,21 +163,21 @@ class ListRoleAssignment(command.Lister):
domain_id = None
if parsed_args.domain:
domain_id = _find_sdk_id(
domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.domain,
)
project_domain_id = None
if parsed_args.project_domain:
project_domain_id = _find_sdk_id(
project_domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.project_domain,
)
project_id = None
if parsed_args.project:
project_id = _find_sdk_id(
project_id = common._find_sdk_id(
identity_client.find_project,
name_or_id=common._get_token_resource(
identity_client, 'project', parsed_args.project
@@ -196,21 +186,21 @@ class ListRoleAssignment(command.Lister):
)
elif parsed_args.authproject:
if auth_ref:
project_id = _find_sdk_id(
project_id = common._find_sdk_id(
identity_client.find_project,
name_or_id=auth_ref.project_id,
)
group_domain_id = None
if parsed_args.group_domain:
group_domain_id = _find_sdk_id(
group_domain_id = common._find_sdk_id(
identity_client.find_domain,
name_or_id=parsed_args.group_domain,
)
group_id = None
if parsed_args.group:
group_id = _find_sdk_id(
group_id = common._find_sdk_id(
identity_client.find_group,
name_or_id=parsed_args.group,
domain_id=group_domain_id,

View File

@@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from openstack import exceptions as sdk_exc
from openstack.identity.v3 import user as _user
from openstack.test import fakes as sdk_fakes
from osc_lib import exceptions
from openstackclient.identity import common
from openstackclient.tests.unit import utils as test_utils
class TestFindSDKId(test_utils.TestCase):
def setUp(self):
super().setUp()
self.user = sdk_fakes.generate_fake_resource(_user.User)
self.identity_sdk_client = mock.Mock()
self.identity_sdk_client.find_user = mock.Mock()
def test_find_sdk_id_validate(self):
self.identity_sdk_client.find_user.side_effect = [self.user]
result = common._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=True,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_no_validate(self):
self.identity_sdk_client.find_user.side_effect = [self.user]
result = common._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=False,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_not_found_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ResourceNotFound,
]
self.assertRaises(
exceptions.CommandError,
common._find_sdk_id,
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=True,
)
def test_find_sdk_id_not_found_no_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ResourceNotFound,
]
result = common._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=False,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_forbidden_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ForbiddenException,
]
result = common._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=True,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_forbidden_no_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ForbiddenException,
]
result = common._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=False,
)
self.assertEqual(self.user.id, result)

View File

@@ -15,8 +15,6 @@
from unittest import mock
from osc_lib import exceptions
from openstack import exceptions as sdk_exc
from openstack.identity.v3 import domain as _domain
from openstack.identity.v3 import group as _group
@@ -25,10 +23,10 @@ from openstack.identity.v3 import role as _role
from openstack.identity.v3 import system as _system
from openstack.identity.v3 import user as _user
from openstack.test import fakes as sdk_fakes
from osc_lib import exceptions
from openstackclient.identity.v3 import role
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit import utils as test_utils
class TestRoleInherited(identity_fakes.TestIdentityv3):
@@ -36,85 +34,6 @@ class TestRoleInherited(identity_fakes.TestIdentityv3):
return True
class TestFindSDKId(test_utils.TestCase):
def setUp(self):
super().setUp()
self.user = sdk_fakes.generate_fake_resource(_user.User)
self.identity_sdk_client = mock.Mock()
self.identity_sdk_client.find_user = mock.Mock()
def test_find_sdk_id_validate(self):
self.identity_sdk_client.find_user.side_effect = [self.user]
result = role._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=True,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_no_validate(self):
self.identity_sdk_client.find_user.side_effect = [self.user]
result = role._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=False,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_not_found_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ResourceNotFound,
]
self.assertRaises(
exceptions.CommandError,
role._find_sdk_id,
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=True,
)
def test_find_sdk_id_not_found_no_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ResourceNotFound,
]
result = role._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=False,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_forbidden_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ForbiddenException,
]
result = role._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=True,
)
self.assertEqual(self.user.id, result)
def test_find_sdk_id_forbidden_no_validate(self):
self.identity_sdk_client.find_user.side_effect = [
sdk_exc.ForbiddenException,
]
result = role._find_sdk_id(
self.identity_sdk_client.find_user,
name_or_id=self.user.id,
validate_actor_existence=False,
)
self.assertEqual(self.user.id, result)
class TestRoleAdd(identity_fakes.TestIdentityv3):
def _is_inheritance_testcase(self):
return False