Merge "identity: Migrate 'trust' commands to SDK"

This commit is contained in:
Zuul 2024-12-19 21:33:28 +00:00 committed by Gerrit Code Review
commit 6ce1f7730c
3 changed files with 257 additions and 228 deletions

View File

@ -14,9 +14,10 @@
"""Identity v3 Trust action implementations"""
import datetime
import itertools
import logging
from keystoneclient import exceptions as identity_exc
from openstack import exceptions as sdk_exceptions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
@ -28,6 +29,25 @@ from openstackclient.identity import common
LOG = logging.getLogger(__name__)
def _format_trust(trust):
columns = (
'expires_at',
'id',
'is_impersonation',
'project_id',
'redelegated_trust_id',
'redelegation_count',
'remaining_uses',
'roles',
'trustee_user_id',
'trustor_user_id',
)
return (
columns,
utils.get_item_properties(trust, columns),
)
class CreateTrust(command.ShowOne):
_description = _("Create new trust")
@ -52,6 +72,7 @@ class CreateTrust(command.ShowOne):
parser.add_argument(
'--role',
metavar='<role>',
dest='roles',
action='append',
default=[],
help=_(
@ -62,7 +83,7 @@ class CreateTrust(command.ShowOne):
)
parser.add_argument(
'--impersonate',
dest='impersonate',
dest='is_impersonation',
action='store_true',
default=False,
help=_(
@ -92,58 +113,60 @@ class CreateTrust(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
kwargs = {}
# NOTE(stevemar): Find the two users, project and roles that
# are necessary for making a trust usable, the API dictates that
# trustee, project and role are optional, but that makes the trust
# pointless, and trusts are immutable, so let's enforce it at the
# client level.
trustor_id = common.find_user(
identity_client, parsed_args.trustor, parsed_args.trustor_domain
).id
trustee_id = common.find_user(
identity_client, parsed_args.trustee, parsed_args.trustee_domain
).id
project_id = common.find_project(
identity_client, parsed_args.project, parsed_args.project_domain
).id
try:
trustor_id = identity_client.find_user(
parsed_args.trustor, parsed_args.trustor_domain
).id
kwargs['trustor_id'] = trustor_id
except sdk_exceptions.ForbiddenException:
kwargs['trustor_id'] = parsed_args.trustor
try:
trustee_id = identity_client.find_user(
parsed_args.trustee, parsed_args.trustee_domain
).id
kwargs['trustee_id'] = trustee_id
except sdk_exceptions.ForbiddenException:
kwargs['trustee_id'] = parsed_args.trustee
try:
project_id = identity_client.find_project(
parsed_args.project, parsed_args.project_domain
).id
kwargs['project_id'] = project_id
except sdk_exceptions.ForbiddenException:
kwargs['project_id'] = parsed_args.project
role_ids = []
for role in parsed_args.role:
for role in parsed_args.roles:
try:
role_id = utils.find_resource(
identity_client.roles,
role,
).id
except identity_exc.Forbidden:
role_id = identity_client.find_role(role).id
except sdk_exceptions.ForbiddenException:
role_id = role
role_ids.append(role_id)
kwargs['roles'] = role_ids
expires_at = None
if parsed_args.expiration:
expires_at = datetime.datetime.strptime(
parsed_args.expiration, '%Y-%m-%dT%H:%M:%S'
)
kwargs['expires_at'] = expires_at
trust = identity_client.trusts.create(
trustee_id,
trustor_id,
impersonation=parsed_args.impersonate,
project=project_id,
role_ids=role_ids,
expires_at=expires_at,
)
if parsed_args.is_impersonation:
kwargs['is_impersonation'] = parsed_args.is_impersonation
trust._info.pop('roles_links', None)
trust._info.pop('links', None)
trust = identity_client.create_trust(**kwargs)
# Format roles into something sensible
roles = trust._info.pop('roles')
msg = ' '.join(r['name'] for r in roles)
trust._info['roles'] = msg
return zip(*sorted(trust._info.items()))
return _format_trust(trust)
class DeleteTrust(command.Command):
@ -160,13 +183,15 @@ class DeleteTrust(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
errors = 0
for trust in parsed_args.trust:
try:
trust_obj = utils.find_resource(identity_client.trusts, trust)
identity_client.trusts.delete(trust_obj.id)
trust_obj = identity_client.find_trust(
trust, ignore_missing=False
)
identity_client.delete_trust(trust_obj.id)
except Exception as e:
errors += 1
LOG.error(
@ -220,7 +245,7 @@ class ListTrust(command.Lister):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
auth_ref = self.app.client_manager.auth_ref
if parsed_args.authuser and any(
@ -243,38 +268,50 @@ class ListTrust(command.Lister):
raise exceptions.CommandError(msg)
if parsed_args.authuser:
if auth_ref:
user = common.find_user(identity_client, auth_ref.user_id)
# We need two calls here as we want trusts with
# either the trustor or the trustee set to current user
# using a single call would give us trusts with both
# trustee and trustor set to current user
data1 = identity_client.trusts.list(trustor_user=user)
data2 = identity_client.trusts.list(trustee_user=user)
data = set(data1 + data2)
# We need two calls here as we want trusts with
# either the trustor or the trustee set to current user
# using a single call would give us trusts with both
# trustee and trustor set to current user
data = list(
{
x.id: x
for x in itertools.chain(
identity_client.trusts(
trustor_user_id=auth_ref.user_id
),
identity_client.trusts(
trustee_user_id=auth_ref.user_id
),
)
}.values()
)
else:
trustor = None
if parsed_args.trustor:
trustor = common.find_user(
identity_client,
parsed_args.trustor,
parsed_args.trustor_domain,
)
try:
trustor_id = identity_client.find_user(
parsed_args.trustor, parsed_args.trustor_domain
).id
trustor = trustor_id
except sdk_exceptions.ForbiddenException:
trustor = parsed_args.trustor
trustee = None
if parsed_args.trustee:
trustee = common.find_user(
identity_client,
parsed_args.trustor,
parsed_args.trustor_domain,
)
try:
trustee_id = identity_client.find_user(
parsed_args.trustee, parsed_args.trustee_domain
).id
trustee = trustee_id
except sdk_exceptions.ForbiddenException:
trustee = parsed_args.trustee
data = self.app.client_manager.identity.trusts.list(
trustor_user=trustor,
trustee_user=trustee,
data = identity_client.trusts(
trustor_user_id=trustor,
trustee_user_id=trustee,
)
columns = (
column_headers = (
'ID',
'Expires At',
'Impersonation',
@ -282,9 +319,17 @@ class ListTrust(command.Lister):
'Trustee User ID',
'Trustor User ID',
)
columns = (
'id',
'expires_at',
'is_impersonation',
'project_id',
'trustee_user_id',
'trustor_user_id',
)
return (
columns,
column_headers,
(
utils.get_item_properties(
s,
@ -309,15 +354,9 @@ class ShowTrust(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
trust = utils.find_resource(identity_client.trusts, parsed_args.trust)
identity_client = self.app.client_manager.sdk_connection.identity
trust = identity_client.find_trust(
parsed_args.trust, ignore_missing=False
)
trust._info.pop('roles_links', None)
trust._info.pop('links', None)
# Format roles into something sensible
roles = trust._info.pop('roles')
msg = ' '.join(r['name'] for r in roles)
trust._info['roles'] = msg
return zip(*sorted(trust._info.items()))
return _format_trust(trust)

View File

@ -11,58 +11,36 @@
# under the License.
#
import copy
from unittest import mock
from osc_lib import exceptions
from osc_lib import utils
from openstack import exceptions as sdk_exceptions
from openstack.identity.v3 import project as _project
from openstack.identity.v3 import role as _role
from openstack.identity.v3 import trust as _trust
from openstack.identity.v3 import user as _user
from openstack.test import fakes as sdk_fakes
from openstackclient.identity.v3 import trust
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
class TestTrust(identity_fakes.TestIdentityv3):
class TestTrustCreate(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
self.trusts_mock = self.identity_client.trusts
self.trusts_mock.reset_mock()
self.projects_mock = self.identity_client.projects
self.projects_mock.reset_mock()
self.users_mock = self.identity_client.users
self.users_mock.reset_mock()
self.roles_mock = self.identity_client.roles
self.roles_mock.reset_mock()
self.trust = sdk_fakes.generate_fake_resource(_trust.Trust)
self.identity_sdk_client.create_trust.return_value = self.trust
self.project = sdk_fakes.generate_fake_resource(_project.Project)
self.identity_sdk_client.find_project.return_value = self.project
class TestTrustCreate(TestTrust):
def setUp(self):
super().setUp()
self.user = sdk_fakes.generate_fake_resource(_user.User)
self.identity_sdk_client.find_user.return_value = self.user
self.projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
loaded=True,
)
self.users_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
)
self.roles_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
self.trusts_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.TRUST),
loaded=True,
)
self.role = sdk_fakes.generate_fake_resource(_role.Role)
self.identity_sdk_client.find_role.return_value = self.role
# Get the command object to test
self.cmd = trust.CreateTrust(self.app, None)
@ -70,18 +48,17 @@ class TestTrustCreate(TestTrust):
def test_trust_create_basic(self):
arglist = [
'--project',
identity_fakes.project_id,
self.project.id,
'--role',
identity_fakes.role_id,
identity_fakes.user_id,
identity_fakes.user_id,
self.role.id,
self.user.id,
self.user.id,
]
verifylist = [
('project', identity_fakes.project_id),
('impersonate', False),
('role', [identity_fakes.role_id]),
('trustor', identity_fakes.user_id),
('trustee', identity_fakes.user_id),
('project', self.project.id),
('roles', [self.role.id]),
('trustor', self.user.id),
('trustee', self.user.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -92,76 +69,71 @@ class TestTrustCreate(TestTrust):
# Set expected values
kwargs = {
'impersonation': False,
'project': identity_fakes.project_id,
'role_ids': [identity_fakes.role_id],
'expires_at': None,
'project_id': self.project.id,
'roles': [self.role.id],
}
# TrustManager.create(trustee_id, trustor_id, impersonation=,
# project=, role_names=, expires_at=)
self.trusts_mock.create.assert_called_with(
identity_fakes.user_id, identity_fakes.user_id, **kwargs
self.identity_sdk_client.create_trust.assert_called_with(
trustor_id=self.user.id, trustee_id=self.user.id, **kwargs
)
collist = (
'expires_at',
'id',
'impersonation',
'is_impersonation',
'project_id',
'redelegated_trust_id',
'redelegation_count',
'remaining_uses',
'roles',
'trustee_user_id',
'trustor_user_id',
)
self.assertEqual(collist, columns)
datalist = (
identity_fakes.trust_expires,
identity_fakes.trust_id,
identity_fakes.trust_impersonation,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.user_id,
identity_fakes.user_id,
self.trust.expires_at,
self.trust.id,
self.trust.is_impersonation,
self.trust.project_id,
self.trust.redelegated_trust_id,
self.trust.redelegation_count,
self.trust.remaining_uses,
self.trust.roles,
self.trust.trustee_user_id,
self.trust.trustor_user_id,
)
self.assertEqual(datalist, data)
class TestTrustDelete(TestTrust):
class TestTrustDelete(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
# This is the return value for utils.find_resource()
self.trusts_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.TRUST),
loaded=True,
)
self.trusts_mock.delete.return_value = None
self.trust = sdk_fakes.generate_fake_resource(_trust.Trust)
self.identity_sdk_client.delete_trust.return_value = None
self.identity_sdk_client.find_trust.return_value = self.trust
# Get the command object to test
self.cmd = trust.DeleteTrust(self.app, None)
def test_trust_delete(self):
arglist = [
identity_fakes.trust_id,
self.trust.id,
]
verifylist = [('trust', [identity_fakes.trust_id])]
verifylist = [('trust', [self.trust.id])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.trusts_mock.delete.assert_called_with(
identity_fakes.trust_id,
self.identity_sdk_client.delete_trust.assert_called_with(
self.trust.id,
)
self.assertIsNone(result)
@mock.patch.object(utils, 'find_resource')
def test_delete_multi_trusts_with_exception(self, find_mock):
find_mock.side_effect = [
self.trusts_mock.get.return_value,
exceptions.CommandError,
]
def test_delete_multi_trusts_with_exception(self):
arglist = [
identity_fakes.trust_id,
self.trust.id,
'unexist_trust',
]
verifylist = [
@ -169,32 +141,37 @@ class TestTrustDelete(TestTrust):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.identity_sdk_client.find_trust.side_effect = [
self.trust,
sdk_exceptions.ResourceNotFound,
]
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 trusts failed to delete.', str(e))
find_mock.assert_any_call(self.trusts_mock, identity_fakes.trust_id)
find_mock.assert_any_call(self.trusts_mock, 'unexist_trust')
self.assertEqual(2, find_mock.call_count)
self.trusts_mock.delete.assert_called_once_with(
identity_fakes.trust_id
self.identity_sdk_client.find_trust.assert_has_calls(
[
mock.call(self.trust.id, ignore_missing=False),
mock.call('unexist_trust', ignore_missing=False),
]
)
self.identity_sdk_client.delete_trust.assert_called_once_with(
self.trust.id
)
class TestTrustList(TestTrust):
class TestTrustList(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
self.trusts_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.TRUST),
loaded=True,
),
]
self.trust = sdk_fakes.generate_fake_resource(_trust.Trust)
self.identity_sdk_client.trusts.return_value = [self.trust]
self.user = sdk_fakes.generate_fake_resource(_user.User)
self.identity_sdk_client.find_user.return_value = self.user
# Get the command object to test
self.cmd = trust.ListTrust(self.app, None)
@ -209,9 +186,9 @@ class TestTrustList(TestTrust):
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
self.trusts_mock.list.assert_called_with(
trustor_user=None,
trustee_user=None,
self.identity_sdk_client.trusts.assert_called_with(
trustor_user_id=None,
trustee_user_id=None,
)
collist = (
@ -225,12 +202,12 @@ class TestTrustList(TestTrust):
self.assertEqual(collist, columns)
datalist = (
(
identity_fakes.trust_id,
identity_fakes.trust_expires,
identity_fakes.trust_impersonation,
identity_fakes.project_id,
identity_fakes.user_id,
identity_fakes.user_id,
self.trust.id,
self.trust.expires_at,
self.trust.is_impersonation,
self.trust.project_id,
self.trust.trustee_user_id,
self.trust.trustor_user_id,
),
)
self.assertEqual(datalist, tuple(data))
@ -238,7 +215,6 @@ class TestTrustList(TestTrust):
def test_trust_list_auth_user(self):
self.app.client_manager.auth_ref = mock.Mock()
auth_ref = self.app.client_manager.auth_ref
auth_ref.user_id.return_value = identity_fakes.user_id
arglist = ['--auth-user']
verifylist = [
@ -253,11 +229,11 @@ class TestTrustList(TestTrust):
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
self.trusts_mock.list.assert_any_call(
trustor_user=self.users_mock.get()
)
self.trusts_mock.list.assert_any_call(
trustee_user=self.users_mock.get()
self.identity_sdk_client.trusts.assert_has_calls(
[
mock.call(trustor_user_id=auth_ref.user_id),
mock.call(trustee_user_id=auth_ref.user_id),
]
)
collist = (
@ -271,21 +247,21 @@ class TestTrustList(TestTrust):
self.assertEqual(collist, columns)
datalist = (
(
identity_fakes.trust_id,
identity_fakes.trust_expires,
identity_fakes.trust_impersonation,
identity_fakes.project_id,
identity_fakes.user_id,
identity_fakes.user_id,
self.trust.id,
self.trust.expires_at,
self.trust.is_impersonation,
self.trust.project_id,
self.trust.trustee_user_id,
self.trust.trustor_user_id,
),
)
self.assertEqual(datalist, tuple(data))
def test_trust_list_trustee(self):
arglist = ['--trustee', identity_fakes.user_name]
arglist = ['--trustee', self.user.name]
verifylist = [
('trustor', None),
('trustee', identity_fakes.user_name),
('trustee', self.user.name),
('authuser', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -295,9 +271,9 @@ class TestTrustList(TestTrust):
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
self.trusts_mock.list.assert_any_call(
trustee_user=self.users_mock.get(),
trustor_user=None,
self.identity_sdk_client.trusts.assert_called_with(
trustee_user_id=self.user.id,
trustor_user_id=None,
)
collist = (
@ -311,21 +287,21 @@ class TestTrustList(TestTrust):
self.assertEqual(collist, columns)
datalist = (
(
identity_fakes.trust_id,
identity_fakes.trust_expires,
identity_fakes.trust_impersonation,
identity_fakes.project_id,
identity_fakes.user_id,
identity_fakes.user_id,
self.trust.id,
self.trust.expires_at,
self.trust.is_impersonation,
self.trust.project_id,
self.trust.trustee_user_id,
self.trust.trustor_user_id,
),
)
self.assertEqual(datalist, tuple(data))
def test_trust_list_trustor(self):
arglist = ['--trustor', identity_fakes.user_name]
arglist = ['--trustor', self.user.name]
verifylist = [
('trustee', None),
('trustor', identity_fakes.user_name),
('trustor', self.user.name),
('authuser', False),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -335,9 +311,9 @@ class TestTrustList(TestTrust):
# containing the data to be listed.
columns, data = self.cmd.take_action(parsed_args)
self.trusts_mock.list.assert_any_call(
trustor_user=self.users_mock.get(),
trustee_user=None,
self.identity_sdk_client.trusts.assert_called_once_with(
trustor_user_id=self.user.id,
trustee_user_id=None,
)
collist = (
@ -351,36 +327,33 @@ class TestTrustList(TestTrust):
self.assertEqual(collist, columns)
datalist = (
(
identity_fakes.trust_id,
identity_fakes.trust_expires,
identity_fakes.trust_impersonation,
identity_fakes.project_id,
identity_fakes.user_id,
identity_fakes.user_id,
self.trust.id,
self.trust.expires_at,
self.trust.is_impersonation,
self.trust.project_id,
self.trust.trustee_user_id,
self.trust.trustor_user_id,
),
)
self.assertEqual(datalist, tuple(data))
class TestTrustShow(TestTrust):
class TestTrustShow(identity_fakes.TestIdentityv3):
def setUp(self):
super().setUp()
self.trusts_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.TRUST),
loaded=True,
)
self.trust = sdk_fakes.generate_fake_resource(_trust.Trust)
self.identity_sdk_client.find_trust.return_value = self.trust
# Get the command object to test
self.cmd = trust.ShowTrust(self.app, None)
def test_trust_show(self):
arglist = [
identity_fakes.trust_id,
self.trust.id,
]
verifylist = [
('trust', identity_fakes.trust_id),
('trust', self.trust.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -389,25 +362,33 @@ class TestTrustShow(TestTrust):
# data to be shown.
columns, data = self.cmd.take_action(parsed_args)
self.trusts_mock.get.assert_called_with(identity_fakes.trust_id)
self.identity_sdk_client.find_trust.assert_called_with(
self.trust.id, ignore_missing=False
)
collist = (
'expires_at',
'id',
'impersonation',
'is_impersonation',
'project_id',
'redelegated_trust_id',
'redelegation_count',
'remaining_uses',
'roles',
'trustee_user_id',
'trustor_user_id',
)
self.assertEqual(collist, columns)
datalist = (
identity_fakes.trust_expires,
identity_fakes.trust_id,
identity_fakes.trust_impersonation,
identity_fakes.project_id,
identity_fakes.role_name,
identity_fakes.user_id,
identity_fakes.user_id,
self.trust.expires_at,
self.trust.id,
self.trust.is_impersonation,
self.trust.project_id,
self.trust.redelegated_trust_id,
self.trust.redelegation_count,
self.trust.remaining_uses,
self.trust.roles,
self.trust.trustee_user_id,
self.trust.trustor_user_id,
)
self.assertEqual(datalist, data)

View File

@ -0,0 +1,9 @@
---
upgrade:
- |
The following commands have been migrated to SDK:
- ``trust create``
- ``trust list``
- ``trust delete``
- ``trust show``