Merge "Identity: Migrate 'federation protocol' commands to SDK"

This commit is contained in:
Zuul
2026-02-13 15:21:45 +00:00
committed by Gerrit Code Review
2 changed files with 136 additions and 128 deletions

View File

@@ -26,6 +26,15 @@ from openstackclient.i18n import _
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _format_protocol(protocol):
columns = ('name', 'idp_id', 'mapping_id')
column_headers = ('id', 'identity_provider', 'mapping')
return (
column_headers,
utils.get_item_properties(protocol, columns),
)
class CreateProtocol(command.ShowOne): class CreateProtocol(command.ShowOne):
_description = _("Create new federation protocol") _description = _("Create new federation protocol")
@@ -58,21 +67,15 @@ class CreateProtocol(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity identity_client = self.app.client_manager.sdk_connection.identity
protocol = identity_client.federation.protocols.create(
protocol_id=parsed_args.federation_protocol, protocol = identity_client.create_federation_protocol(
identity_provider=parsed_args.identity_provider, name=parsed_args.federation_protocol,
mapping=parsed_args.mapping, idp_id=parsed_args.identity_provider,
mapping_id=parsed_args.mapping,
) )
info = dict(protocol._info)
# NOTE(marek-denis): Identity provider is not included in a response return _format_protocol(protocol)
# from Keystone, however it should be listed to the user. Add it
# manually to the output list, simply reusing value provided by the
# user.
info['identity_provider'] = parsed_args.identity_provider
info['mapping'] = info.pop('mapping_id')
info.pop('links', None)
return zip(*sorted(info.items()))
class DeleteProtocol(command.Command): class DeleteProtocol(command.Command):
@@ -99,12 +102,15 @@ class DeleteProtocol(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity identity_client = self.app.client_manager.sdk_connection.identity
result = 0 result = 0
for i in parsed_args.federation_protocol: for i in parsed_args.federation_protocol:
try: try:
identity_client.federation.protocols.delete( identity_client.delete_federation_protocol(
parsed_args.identity_provider, i idp_id=parsed_args.identity_provider,
protocol=i,
ignore_missing=False,
) )
except Exception as e: except Exception as e:
result += 1 result += 1
@@ -140,9 +146,9 @@ class ListProtocols(command.Lister):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity identity_client = self.app.client_manager.sdk_connection.identity
protocols = identity_client.federation.protocols.list( protocols = identity_client.federation_protocols(
parsed_args.identity_provider parsed_args.identity_provider
) )
columns = ('id', 'mapping') columns = ('id', 'mapping')
@@ -181,21 +187,16 @@ class SetProtocol(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity identity_client = self.app.client_manager.sdk_connection.identity
protocol = identity_client.federation.protocols.update( kwargs = {'idp_id': parsed_args.identity_provider}
parsed_args.identity_provider, if parsed_args.federation_protocol:
parsed_args.federation_protocol, kwargs['name'] = parsed_args.federation_protocol
parsed_args.mapping, if parsed_args.mapping:
) kwargs['mapping_id'] = parsed_args.mapping
info = dict(protocol._info)
# NOTE(marek-denis): Identity provider is not included in a response protocol = identity_client.update_federation_protocol(**kwargs)
# from Keystone, however it should be listed to the user. Add it return _format_protocol(protocol)
# manually to the output list, simply reusing value provided by the
# user.
info['identity_provider'] = parsed_args.identity_provider
info['mapping'] = info.pop('mapping_id')
return zip(*sorted(info.items()))
class ShowProtocol(command.ShowOne): class ShowProtocol(command.ShowOne):
@@ -220,12 +221,10 @@ class ShowProtocol(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity identity_client = self.app.client_manager.sdk_connection.identity
protocol = identity_client.federation.protocols.get( protocol = identity_client.get_federation_protocol(
parsed_args.identity_provider, parsed_args.federation_protocol idp_id=parsed_args.identity_provider,
protocol=parsed_args.federation_protocol,
) )
info = dict(protocol._info) return _format_protocol(protocol)
info['mapping'] = info.pop('mapping_id')
info.pop('links', None)
return zip(*sorted(info.items()))

View File

@@ -12,202 +12,211 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy from openstack.identity.v3 import federation_protocol as _federation_protocol
from openstack.identity.v3 import mapping as _mapping
from openstack.test import fakes as sdk_fakes
from openstackclient.identity.v3 import federation_protocol from openstackclient.identity.v3 import federation_protocol
from openstackclient.tests.unit import fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
class TestProtocol(identity_fakes.TestFederatedIdentity): class TestProtocolCreate(identity_fakes.TestFederatedIdentity):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
federation_lib = self.identity_client.federation self.proto = sdk_fakes.generate_fake_resource(
self.protocols_mock = federation_lib.protocols _federation_protocol.FederationProtocol
self.protocols_mock.reset_mock() )
self.identity_sdk_client.create_federation_protocol.return_value = (
self.proto
class TestProtocolCreate(TestProtocol): )
def setUp(self):
super().setUp()
proto = copy.deepcopy(identity_fakes.PROTOCOL_OUTPUT)
resource = fakes.FakeResource(None, proto, loaded=True)
self.protocols_mock.create.return_value = resource
self.cmd = federation_protocol.CreateProtocol(self.app, None) self.cmd = federation_protocol.CreateProtocol(self.app, None)
def test_create_protocol(self): def test_create_protocol(self):
argslist = [ argslist = [
identity_fakes.protocol_id, self.proto.name,
'--identity-provider', '--identity-provider',
identity_fakes.idp_id, self.proto.idp_id,
'--mapping', '--mapping',
identity_fakes.mapping_id, self.proto.mapping_id,
] ]
verifylist = [ verifylist = [
('federation_protocol', identity_fakes.protocol_id), ('federation_protocol', self.proto.name),
('identity_provider', identity_fakes.idp_id), ('identity_provider', self.proto.idp_id),
('mapping', identity_fakes.mapping_id), ('mapping', self.proto.mapping_id),
] ]
parsed_args = self.check_parser(self.cmd, argslist, verifylist) parsed_args = self.check_parser(self.cmd, argslist, verifylist)
columns, data = self.cmd.take_action(parsed_args) columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.create.assert_called_with( self.identity_sdk_client.create_federation_protocol.assert_called_with(
protocol_id=identity_fakes.protocol_id, name=self.proto.id,
identity_provider=identity_fakes.idp_id, idp_id=self.proto.idp_id,
mapping=identity_fakes.mapping_id, mapping_id=self.proto.mapping_id,
) )
collist = ('id', 'identity_provider', 'mapping') collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns) self.assertEqual(collist, columns)
datalist = ( datalist = (
identity_fakes.protocol_id, self.proto.id,
identity_fakes.idp_id, self.proto.idp_id,
identity_fakes.mapping_id, self.proto.mapping_id,
) )
self.assertEqual(datalist, data) self.assertEqual(datalist, data)
class TestProtocolDelete(TestProtocol): class TestProtocolDelete(identity_fakes.TestFederatedIdentity):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
# This is the return value for utils.find_resource() self.proto = sdk_fakes.generate_fake_resource(
self.protocols_mock.get.return_value = fakes.FakeResource( _federation_protocol.FederationProtocol
None,
copy.deepcopy(identity_fakes.PROTOCOL_OUTPUT),
loaded=True,
) )
self.identity_sdk_client.delete_federation_protocol.return_value = None
self.protocols_mock.delete.return_value = None
self.cmd = federation_protocol.DeleteProtocol(self.app, None) self.cmd = federation_protocol.DeleteProtocol(self.app, None)
def test_delete_identity_provider(self): def test_delete_protocol(self):
arglist = [ arglist = [
'--identity-provider', '--identity-provider',
identity_fakes.idp_id, self.proto.idp_id,
identity_fakes.protocol_id, self.proto.name,
] ]
verifylist = [ verifylist = [
('federation_protocol', [identity_fakes.protocol_id]), ('federation_protocol', [self.proto.id]),
('identity_provider', identity_fakes.idp_id), ('identity_provider', self.proto.idp_id),
] ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.protocols_mock.delete.assert_called_with( self.identity_sdk_client.delete_federation_protocol.assert_called_with(
identity_fakes.idp_id, identity_fakes.protocol_id idp_id=self.proto.idp_id,
protocol=self.proto.id,
ignore_missing=False,
) )
self.assertIsNone(result) self.assertIsNone(result)
class TestProtocolList(TestProtocol): class TestProtocolList(identity_fakes.TestFederatedIdentity):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.protocols_mock.get.return_value = fakes.FakeResource( self.proto1 = sdk_fakes.generate_fake_resource(
None, identity_fakes.PROTOCOL_ID_MAPPING, loaded=True _federation_protocol.FederationProtocol
) )
self.proto2 = sdk_fakes.generate_fake_resource(
self.protocols_mock.list.return_value = [ _federation_protocol.FederationProtocol, idp_id=self.proto1
fakes.FakeResource( )
None, identity_fakes.PROTOCOL_ID_MAPPING, loaded=True self.identity_sdk_client.federation_protocols.return_value = [
) self.proto1,
self.proto2,
] ]
self.cmd = federation_protocol.ListProtocols(self.app, None) self.cmd = federation_protocol.ListProtocols(self.app, None)
def test_list_protocols(self): def test_list_protocols(self):
arglist = ['--identity-provider', identity_fakes.idp_id] arglist = ['--identity-provider', self.proto1.idp_id]
verifylist = [('identity_provider', identity_fakes.idp_id)] verifylist = [('identity_provider', self.proto1.idp_id)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args) columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.list.assert_called_with(identity_fakes.idp_id) self.identity_sdk_client.federation_protocols.assert_called_with(
self.proto1.idp_id
)
self.assertEqual(columns, ('id', 'mapping'))
datalist = (
(
self.proto1.name,
self.proto1.mapping_id,
),
(
self.proto2.name,
self.proto2.mapping_id,
),
)
self.assertEqual(datalist, tuple(data))
class TestProtocolSet(TestProtocol): class TestProtocolSet(identity_fakes.TestFederatedIdentity):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.protocols_mock.get.return_value = fakes.FakeResource( self.proto = sdk_fakes.generate_fake_resource(
None, identity_fakes.PROTOCOL_OUTPUT, loaded=True _federation_protocol.FederationProtocol
) )
self.protocols_mock.update.return_value = fakes.FakeResource( self.mapping = sdk_fakes.generate_fake_resource(_mapping.Mapping)
None, identity_fakes.PROTOCOL_OUTPUT_UPDATED, loaded=True self.identity_sdk_client.update_federation_protocol.return_value = (
self.proto
) )
self.cmd = federation_protocol.SetProtocol(self.app, None) self.cmd = federation_protocol.SetProtocol(self.app, None)
def test_set_new_mapping(self): def test_set_new_mapping(self):
arglist = [ arglist = [
identity_fakes.protocol_id, self.proto.name,
'--identity-provider', '--identity-provider',
identity_fakes.idp_id, self.proto.idp_id,
'--mapping', '--mapping',
identity_fakes.mapping_id, self.mapping.name,
] ]
verifylist = [ verifylist = [
('identity_provider', identity_fakes.idp_id), ('identity_provider', self.proto.idp_id),
('federation_protocol', identity_fakes.protocol_id), ('federation_protocol', self.proto.name),
('mapping', identity_fakes.mapping_id), ('mapping', self.mapping.name),
] ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args) columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.update.assert_called_with( self.identity_sdk_client.update_federation_protocol.assert_called_with(
identity_fakes.idp_id, idp_id=self.proto.idp_id,
identity_fakes.protocol_id, name=self.proto.name,
identity_fakes.mapping_id, mapping_id=self.mapping.id,
) )
collist = ('id', 'identity_provider', 'mapping') collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns) self.assertEqual(collist, columns)
datalist = ( datalist = (
identity_fakes.protocol_id, self.proto.name,
identity_fakes.idp_id, self.proto.idp_id,
identity_fakes.mapping_id_updated, self.proto.mapping_id,
) )
self.assertEqual(datalist, data) self.assertEqual(datalist, data)
class TestProtocolShow(TestProtocol): class TestProtocolShow(identity_fakes.TestFederatedIdentity):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.protocols_mock.get.return_value = fakes.FakeResource( self.proto = sdk_fakes.generate_fake_resource(
None, identity_fakes.PROTOCOL_OUTPUT, loaded=False _federation_protocol.FederationProtocol
)
self.identity_sdk_client.get_federation_protocol.return_value = (
self.proto
) )
self.cmd = federation_protocol.ShowProtocol(self.app, None) self.cmd = federation_protocol.ShowProtocol(self.app, None)
def test_show_protocol(self): def test_show_protocol(self):
arglist = [ arglist = [
identity_fakes.protocol_id, self.proto.name,
'--identity-provider', '--identity-provider',
identity_fakes.idp_id, self.proto.idp_id,
] ]
verifylist = [ verifylist = [
('federation_protocol', identity_fakes.protocol_id), ('federation_protocol', self.proto.name),
('identity_provider', identity_fakes.idp_id), ('identity_provider', self.proto.idp_id),
] ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args) columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.get.assert_called_with( self.identity_sdk_client.get_federation_protocol.assert_called_with(
identity_fakes.idp_id, identity_fakes.protocol_id idp_id=self.proto.idp_id, protocol=self.proto.name
) )
collist = ('id', 'identity_provider', 'mapping') collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns) self.assertEqual(collist, columns)
datalist = ( datalist = (
identity_fakes.protocol_id, self.proto.name,
identity_fakes.idp_id, self.proto.idp_id,
identity_fakes.mapping_id, self.proto.mapping_id,
) )
self.assertEqual(datalist, data) self.assertEqual(datalist, data)