Identity: Migrate 'federation protocol' commands to SDK

Change-Id: I4f1303b7b6c7ea2e67e5770745db060f08ba7761
Signed-off-by: 0weng <oweng@osuosl.org>
This commit is contained in:
0weng
2026-02-06 13:56:46 -08:00
parent 911e643f2c
commit d135994772
2 changed files with 136 additions and 128 deletions

View File

@@ -26,6 +26,15 @@ from openstackclient.i18n import _
LOG = logging.getLogger(__name__)
def _format_protocol(protocol):
columns = ('name', 'idp_id', 'mapping_id')
column_headers = ('id', 'identity_provider', 'mapping')
return (
column_headers,
utils.get_item_properties(protocol, columns),
)
class CreateProtocol(command.ShowOne):
_description = _("Create new federation protocol")
@@ -58,21 +67,15 @@ class CreateProtocol(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
protocol = identity_client.federation.protocols.create(
protocol_id=parsed_args.federation_protocol,
identity_provider=parsed_args.identity_provider,
mapping=parsed_args.mapping,
identity_client = self.app.client_manager.sdk_connection.identity
protocol = identity_client.create_federation_protocol(
name=parsed_args.federation_protocol,
idp_id=parsed_args.identity_provider,
mapping_id=parsed_args.mapping,
)
info = dict(protocol._info)
# NOTE(marek-denis): Identity provider is not included in a response
# from Keystone, however it should be listed to the user. Add it
# manually to the output list, simply reusing value provided by the
# user.
info['identity_provider'] = parsed_args.identity_provider
info['mapping'] = info.pop('mapping_id')
info.pop('links', None)
return zip(*sorted(info.items()))
return _format_protocol(protocol)
class DeleteProtocol(command.Command):
@@ -99,12 +102,15 @@ class DeleteProtocol(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
result = 0
for i in parsed_args.federation_protocol:
try:
identity_client.federation.protocols.delete(
parsed_args.identity_provider, i
identity_client.delete_federation_protocol(
idp_id=parsed_args.identity_provider,
protocol=i,
ignore_missing=False,
)
except Exception as e:
result += 1
@@ -140,9 +146,9 @@ class ListProtocols(command.Lister):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
protocols = identity_client.federation.protocols.list(
protocols = identity_client.federation_protocols(
parsed_args.identity_provider
)
columns = ('id', 'mapping')
@@ -181,21 +187,16 @@ class SetProtocol(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
protocol = identity_client.federation.protocols.update(
parsed_args.identity_provider,
parsed_args.federation_protocol,
parsed_args.mapping,
)
info = dict(protocol._info)
# NOTE(marek-denis): Identity provider is not included in a response
# from Keystone, however it should be listed to the user. Add it
# manually to the output list, simply reusing value provided by the
# user.
info['identity_provider'] = parsed_args.identity_provider
info['mapping'] = info.pop('mapping_id')
return zip(*sorted(info.items()))
kwargs = {'idp_id': parsed_args.identity_provider}
if parsed_args.federation_protocol:
kwargs['name'] = parsed_args.federation_protocol
if parsed_args.mapping:
kwargs['mapping_id'] = parsed_args.mapping
protocol = identity_client.update_federation_protocol(**kwargs)
return _format_protocol(protocol)
class ShowProtocol(command.ShowOne):
@@ -220,12 +221,10 @@ class ShowProtocol(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
protocol = identity_client.federation.protocols.get(
parsed_args.identity_provider, parsed_args.federation_protocol
protocol = identity_client.get_federation_protocol(
idp_id=parsed_args.identity_provider,
protocol=parsed_args.federation_protocol,
)
info = dict(protocol._info)
info['mapping'] = info.pop('mapping_id')
info.pop('links', None)
return zip(*sorted(info.items()))
return _format_protocol(protocol)

View File

@@ -12,202 +12,211 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from openstack.identity.v3 import federation_protocol as _federation_protocol
from openstack.identity.v3 import mapping as _mapping
from openstack.test import fakes as sdk_fakes
from openstackclient.identity.v3 import federation_protocol
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
class TestProtocol(identity_fakes.TestFederatedIdentity):
class TestProtocolCreate(identity_fakes.TestFederatedIdentity):
def setUp(self):
super().setUp()
federation_lib = self.identity_client.federation
self.protocols_mock = federation_lib.protocols
self.protocols_mock.reset_mock()
class TestProtocolCreate(TestProtocol):
def setUp(self):
super().setUp()
proto = copy.deepcopy(identity_fakes.PROTOCOL_OUTPUT)
resource = fakes.FakeResource(None, proto, loaded=True)
self.protocols_mock.create.return_value = resource
self.proto = sdk_fakes.generate_fake_resource(
_federation_protocol.FederationProtocol
)
self.identity_sdk_client.create_federation_protocol.return_value = (
self.proto
)
self.cmd = federation_protocol.CreateProtocol(self.app, None)
def test_create_protocol(self):
argslist = [
identity_fakes.protocol_id,
self.proto.name,
'--identity-provider',
identity_fakes.idp_id,
self.proto.idp_id,
'--mapping',
identity_fakes.mapping_id,
self.proto.mapping_id,
]
verifylist = [
('federation_protocol', identity_fakes.protocol_id),
('identity_provider', identity_fakes.idp_id),
('mapping', identity_fakes.mapping_id),
('federation_protocol', self.proto.name),
('identity_provider', self.proto.idp_id),
('mapping', self.proto.mapping_id),
]
parsed_args = self.check_parser(self.cmd, argslist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.create.assert_called_with(
protocol_id=identity_fakes.protocol_id,
identity_provider=identity_fakes.idp_id,
mapping=identity_fakes.mapping_id,
self.identity_sdk_client.create_federation_protocol.assert_called_with(
name=self.proto.id,
idp_id=self.proto.idp_id,
mapping_id=self.proto.mapping_id,
)
collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns)
datalist = (
identity_fakes.protocol_id,
identity_fakes.idp_id,
identity_fakes.mapping_id,
self.proto.id,
self.proto.idp_id,
self.proto.mapping_id,
)
self.assertEqual(datalist, data)
class TestProtocolDelete(TestProtocol):
class TestProtocolDelete(identity_fakes.TestFederatedIdentity):
def setUp(self):
super().setUp()
# This is the return value for utils.find_resource()
self.protocols_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROTOCOL_OUTPUT),
loaded=True,
self.proto = sdk_fakes.generate_fake_resource(
_federation_protocol.FederationProtocol
)
self.protocols_mock.delete.return_value = None
self.identity_sdk_client.delete_federation_protocol.return_value = None
self.cmd = federation_protocol.DeleteProtocol(self.app, None)
def test_delete_identity_provider(self):
def test_delete_protocol(self):
arglist = [
'--identity-provider',
identity_fakes.idp_id,
identity_fakes.protocol_id,
self.proto.idp_id,
self.proto.name,
]
verifylist = [
('federation_protocol', [identity_fakes.protocol_id]),
('identity_provider', identity_fakes.idp_id),
('federation_protocol', [self.proto.id]),
('identity_provider', self.proto.idp_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.protocols_mock.delete.assert_called_with(
identity_fakes.idp_id, identity_fakes.protocol_id
self.identity_sdk_client.delete_federation_protocol.assert_called_with(
idp_id=self.proto.idp_id,
protocol=self.proto.id,
ignore_missing=False,
)
self.assertIsNone(result)
class TestProtocolList(TestProtocol):
class TestProtocolList(identity_fakes.TestFederatedIdentity):
def setUp(self):
super().setUp()
self.protocols_mock.get.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_ID_MAPPING, loaded=True
self.proto1 = sdk_fakes.generate_fake_resource(
_federation_protocol.FederationProtocol
)
self.protocols_mock.list.return_value = [
fakes.FakeResource(
None, identity_fakes.PROTOCOL_ID_MAPPING, loaded=True
)
self.proto2 = sdk_fakes.generate_fake_resource(
_federation_protocol.FederationProtocol, idp_id=self.proto1
)
self.identity_sdk_client.federation_protocols.return_value = [
self.proto1,
self.proto2,
]
self.cmd = federation_protocol.ListProtocols(self.app, None)
def test_list_protocols(self):
arglist = ['--identity-provider', identity_fakes.idp_id]
verifylist = [('identity_provider', identity_fakes.idp_id)]
arglist = ['--identity-provider', self.proto1.idp_id]
verifylist = [('identity_provider', self.proto1.idp_id)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.list.assert_called_with(identity_fakes.idp_id)
self.identity_sdk_client.federation_protocols.assert_called_with(
self.proto1.idp_id
)
self.assertEqual(columns, ('id', 'mapping'))
datalist = (
(
self.proto1.name,
self.proto1.mapping_id,
),
(
self.proto2.name,
self.proto2.mapping_id,
),
)
self.assertEqual(datalist, tuple(data))
class TestProtocolSet(TestProtocol):
class TestProtocolSet(identity_fakes.TestFederatedIdentity):
def setUp(self):
super().setUp()
self.protocols_mock.get.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_OUTPUT, loaded=True
self.proto = sdk_fakes.generate_fake_resource(
_federation_protocol.FederationProtocol
)
self.protocols_mock.update.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_OUTPUT_UPDATED, loaded=True
self.mapping = sdk_fakes.generate_fake_resource(_mapping.Mapping)
self.identity_sdk_client.update_federation_protocol.return_value = (
self.proto
)
self.cmd = federation_protocol.SetProtocol(self.app, None)
def test_set_new_mapping(self):
arglist = [
identity_fakes.protocol_id,
self.proto.name,
'--identity-provider',
identity_fakes.idp_id,
self.proto.idp_id,
'--mapping',
identity_fakes.mapping_id,
self.mapping.name,
]
verifylist = [
('identity_provider', identity_fakes.idp_id),
('federation_protocol', identity_fakes.protocol_id),
('mapping', identity_fakes.mapping_id),
('identity_provider', self.proto.idp_id),
('federation_protocol', self.proto.name),
('mapping', self.mapping.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.update.assert_called_with(
identity_fakes.idp_id,
identity_fakes.protocol_id,
identity_fakes.mapping_id,
self.identity_sdk_client.update_federation_protocol.assert_called_with(
idp_id=self.proto.idp_id,
name=self.proto.name,
mapping_id=self.mapping.id,
)
collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns)
datalist = (
identity_fakes.protocol_id,
identity_fakes.idp_id,
identity_fakes.mapping_id_updated,
self.proto.name,
self.proto.idp_id,
self.proto.mapping_id,
)
self.assertEqual(datalist, data)
class TestProtocolShow(TestProtocol):
class TestProtocolShow(identity_fakes.TestFederatedIdentity):
def setUp(self):
super().setUp()
self.protocols_mock.get.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_OUTPUT, loaded=False
self.proto = sdk_fakes.generate_fake_resource(
_federation_protocol.FederationProtocol
)
self.identity_sdk_client.get_federation_protocol.return_value = (
self.proto
)
self.cmd = federation_protocol.ShowProtocol(self.app, None)
def test_show_protocol(self):
arglist = [
identity_fakes.protocol_id,
self.proto.name,
'--identity-provider',
identity_fakes.idp_id,
self.proto.idp_id,
]
verifylist = [
('federation_protocol', identity_fakes.protocol_id),
('identity_provider', identity_fakes.idp_id),
('federation_protocol', self.proto.name),
('identity_provider', self.proto.idp_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.get.assert_called_with(
identity_fakes.idp_id, identity_fakes.protocol_id
self.identity_sdk_client.get_federation_protocol.assert_called_with(
idp_id=self.proto.idp_id, protocol=self.proto.name
)
collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns)
datalist = (
identity_fakes.protocol_id,
identity_fakes.idp_id,
identity_fakes.mapping_id,
self.proto.name,
self.proto.idp_id,
self.proto.mapping_id,
)
self.assertEqual(datalist, data)