CRUD operations for federated protocols

Openstackclient needs to have a capability to manage federated protocols
(like saml2, openid connect, abfab). This patch allows users to
administrate such operations from the commandline.

Change-Id: I59eef2acdda60c7ec795d1bfe31e8e960b4478a1
Implements: bp/add-openstackclient-federation-crud
This commit is contained in:
Marek Denis 2014-10-02 09:36:13 +02:00 committed by Steve Martinelli
parent d405b1b12d
commit 14c61a0ace
4 changed files with 397 additions and 0 deletions

View File

@ -0,0 +1,182 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Identity v3 Protocols actions implementations"""
import logging
import six
from cliff import command
from cliff import lister
from cliff import show
from openstackclient.common import utils
class CreateProtocol(show.ShowOne):
"""Create new Federation Protocol tied to an Identity Provider"""
log = logging.getLogger(__name__ + 'CreateProtocol')
def get_parser(self, prog_name):
parser = super(CreateProtocol, self).get_parser(prog_name)
parser.add_argument(
'federation_protocol',
metavar='<name>',
help='Protocol (must be unique per Identity Provider')
parser.add_argument(
'--identity-provider',
metavar='<identity-provider>',
help=('Identity Provider you want to add the Protocol to '
'(must already exist)'), required=True)
parser.add_argument(
'--mapping',
metavar='<mapping>', required=True,
help='Mapping you want to be used (must already exist)')
return parser
def take_action(self, parsed_args):
self.log.debug('take_action(%s)', parsed_args)
identity_client = self.app.client_manager.identity
protocol = identity_client.federation.protocols.create(
protocol_id=parsed_args.federation_protocol,
identity_provider=parsed_args.identity_provider,
mapping=parsed_args.mapping)
info = dict(protocol._info)
# NOTE(marek-denis): Identity provider is not included in a response
# from Keystone, however it should be listed to the user. Add it
# manually to the output list, simply reusing value provided by the
# user.
info['identity_provider'] = parsed_args.identity_provider
info['mapping'] = info.pop('mapping_id')
return zip(*sorted(six.iteritems(info)))
class DeleteProtocol(command.Command):
"""Delete Federation Protocol tied to a Identity Provider"""
log = logging.getLogger(__name__ + '.DeleteProtocol')
def get_parser(self, prog_name):
parser = super(DeleteProtocol, self).get_parser(prog_name)
parser.add_argument(
'federation_protocol',
metavar='<name>',
help='Protocol (must be unique per Identity Provider')
parser.add_argument(
'--identity-provider',
metavar='<identity-provider>', required=True,
help='Identity Provider the Protocol is tied to')
return parser
def take_action(self, parsed_args):
self.log.debug('take_action(%s)', parsed_args)
identity_client = self.app.client_manager.identity
identity_client.federation.protocols.delete(
parsed_args.identity_provider, parsed_args.federation_protocol)
return
class ListProtocols(lister.Lister):
"""List Protocols tied to an Identity Provider"""
log = logging.getLogger(__name__ + '.ListProtocols')
def get_parser(self, prog_name):
parser = super(ListProtocols, self).get_parser(prog_name)
parser.add_argument(
'--identity-provider',
metavar='<identity-provider>', required=True,
help='Identity Provider the Protocol is tied to')
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
protocols = identity_client.federation.protocols.list(
parsed_args.identity_provider)
columns = ('id', 'mapping')
response_attributes = ('id', 'mapping_id')
items = [utils.get_item_properties(s, response_attributes)
for s in protocols]
return (columns, items)
class SetProtocol(command.Command):
"""Set Protocol tied to an Identity Provider"""
log = logging.getLogger(__name__ + '.SetProtocol')
def get_parser(self, prog_name):
parser = super(SetProtocol, self).get_parser(prog_name)
parser.add_argument(
'federation_protocol',
metavar='<name>',
help='Protocol (must be unique per Identity Provider')
parser.add_argument(
'--identity-provider',
metavar='<identity-provider>', required=True,
help=('Identity Provider you want to add the Protocol to '
'(must already exist)'))
parser.add_argument(
'--mapping',
metavar='<mapping>', required=True,
help='Mapping you want to be used (must already exist)')
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
protocol = identity_client.federation.protocols.update(
parsed_args.identity_provider, parsed_args.federation_protocol,
parsed_args.mapping)
info = dict(protocol._info)
# NOTE(marek-denis): Identity provider is not included in a response
# from Keystone, however it should be listed to the user. Add it
# manually to the output list, simply reusing value provided by the
# user.
info['identity_provider'] = parsed_args.identity_provider
info['mapping'] = info.pop('mapping_id')
return zip(*sorted(six.iteritems(info)))
class ShowProtocol(show.ShowOne):
"""Show Protocol tied to an Identity Provider"""
log = logging.getLogger(__name__ + '.ShowProtocol')
def get_parser(self, prog_name):
parser = super(ShowProtocol, self).get_parser(prog_name)
parser.add_argument(
'federation_protocol',
metavar='<name>',
help='Protocol (must be unique per Identity Provider')
parser.add_argument(
'--identity-provider',
metavar='<identity-provider>', required=True,
help=('Identity Provider you want to add the Protocol to '
'(must already exist)'))
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
protocol = identity_client.federation.protocols.get(
parsed_args.identity_provider, parsed_args.federation_protocol)
info = dict(protocol._info)
info['mapping'] = info.pop('mapping_id')
return zip(*sorted(six.iteritems(info)))

View File

@ -190,6 +190,28 @@ IDENTITY_PROVIDER = {
'description': idp_description
}
protocol_id = 'protocol'
mapping_id = 'test_mapping'
mapping_id_updated = 'prod_mapping'
PROTOCOL_ID_MAPPING = {
'id': protocol_id,
'mapping': mapping_id
}
PROTOCOL_OUTPUT = {
'id': protocol_id,
'mapping_id': mapping_id,
'identity_provider': idp_id
}
PROTOCOL_OUTPUT_UPDATED = {
'id': protocol_id,
'mapping_id': mapping_id_updated,
'identity_provider': idp_id
}
# Assignments
ASSIGNMENT_WITH_PROJECT_ID_AND_USER_ID = {
@ -285,6 +307,8 @@ class FakeFederationManager(object):
self.identity_providers.resource_class = fakes.FakeResource(None, {})
self.mappings = mock.Mock()
self.mappings.resource_class = fakes.FakeResource(None, {})
self.protocols = mock.Mock()
self.protocols.resource_class = fakes.FakeResource(None, {})
class FakeFederatedClient(FakeIdentityv3Client):

View File

@ -0,0 +1,185 @@
# Copyright 2014 CERN.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from openstackclient.identity.v3 import federation_protocol
from openstackclient.tests import fakes
from openstackclient.tests.identity.v3 import fakes as identity_fakes
class TestProtocol(identity_fakes.TestFederatedIdentity):
def setUp(self):
super(TestProtocol, self).setUp()
federation_lib = self.app.client_manager.identity.federation
self.protocols_mock = federation_lib.protocols
self.protocols_mock.reset_mock()
class TestProtocolCreate(TestProtocol):
def setUp(self):
super(TestProtocolCreate, self).setUp()
proto = copy.deepcopy(identity_fakes.PROTOCOL_OUTPUT)
resource = fakes.FakeResource(None, proto, loaded=True)
self.protocols_mock.create.return_value = resource
self.cmd = federation_protocol.CreateProtocol(self.app, None)
def test_create_protocol(self):
argslist = [
identity_fakes.protocol_id,
'--identity-provider', identity_fakes.idp_id,
'--mapping', identity_fakes.mapping_id
]
verifylist = [
('federation_protocol', identity_fakes.protocol_id),
('identity_provider', identity_fakes.idp_id),
('mapping', identity_fakes.mapping_id)
]
parsed_args = self.check_parser(self.cmd, argslist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.create.assert_called_with(
protocol_id=identity_fakes.protocol_id,
identity_provider=identity_fakes.idp_id,
mapping=identity_fakes.mapping_id)
collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns)
datalist = (identity_fakes.protocol_id,
identity_fakes.idp_id,
identity_fakes.mapping_id)
self.assertEqual(datalist, data)
class TestProtocolDelete(TestProtocol):
def setUp(self):
super(TestProtocolDelete, self).setUp()
# This is the return value for utils.find_resource()
self.protocols_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROTOCOL_OUTPUT),
loaded=True,
)
self.protocols_mock.delete.return_value = None
self.cmd = federation_protocol.DeleteProtocol(self.app, None)
def test_delete_identity_provider(self):
arglist = [
'--identity-provider', identity_fakes.idp_id,
identity_fakes.protocol_id
]
verifylist = [
('federation_protocol', identity_fakes.protocol_id),
('identity_provider', identity_fakes.idp_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.protocols_mock.delete.assert_called_with(
identity_fakes.idp_id, identity_fakes.protocol_id)
class TestProtocolList(TestProtocol):
def setUp(self):
super(TestProtocolList, self).setUp()
self.protocols_mock.get.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_ID_MAPPING, loaded=True)
self.protocols_mock.list.return_value = [fakes.FakeResource(
None, identity_fakes.PROTOCOL_ID_MAPPING, loaded=True)]
self.cmd = federation_protocol.ListProtocols(self.app, None)
def test_list_protocols(self):
arglist = ['--identity-provider', identity_fakes.idp_id]
verifylist = [('identity_provider', identity_fakes.idp_id)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.list.assert_called_with(identity_fakes.idp_id)
class TestProtocolSet(TestProtocol):
def setUp(self):
super(TestProtocolSet, self).setUp()
self.protocols_mock.get.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_OUTPUT, loaded=True)
self.protocols_mock.update.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_OUTPUT_UPDATED, loaded=True)
self.cmd = federation_protocol.SetProtocol(self.app, None)
def test_set_new_mapping(self):
arglist = [
identity_fakes.protocol_id,
'--identity-provider', identity_fakes.idp_id,
'--mapping', identity_fakes.mapping_id
]
verifylist = [('identity_provider', identity_fakes.idp_id),
('federation_protocol', identity_fakes.protocol_id),
('mapping', identity_fakes.mapping_id)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.update.assert_called_with(
identity_fakes.idp_id, identity_fakes.protocol_id,
identity_fakes.mapping_id)
collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns)
datalist = (identity_fakes.protocol_id, identity_fakes.idp_id,
identity_fakes.mapping_id_updated)
self.assertEqual(datalist, data)
class TestProtocolShow(TestProtocol):
def setUp(self):
super(TestProtocolShow, self).setUp()
self.protocols_mock.get.return_value = fakes.FakeResource(
None, identity_fakes.PROTOCOL_OUTPUT, loaded=False)
self.cmd = federation_protocol.ShowProtocol(self.app, None)
def test_show_protocol(self):
arglist = [identity_fakes.protocol_id, '--identity-provider',
identity_fakes.idp_id]
verifylist = [('federation_protocol', identity_fakes.protocol_id),
('identity_provider', identity_fakes.idp_id)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.protocols_mock.get.assert_called_with(identity_fakes.idp_id,
identity_fakes.protocol_id)
collist = ('id', 'identity_provider', 'mapping')
self.assertEqual(collist, columns)
datalist = (identity_fakes.protocol_id,
identity_fakes.idp_id,
identity_fakes.mapping_id)
self.assertEqual(datalist, data)

View File

@ -230,6 +230,12 @@ openstack.identity.v3 =
project_set = openstackclient.identity.v3.project:SetProject
project_show = openstackclient.identity.v3.project:ShowProject
federation_protocol_create = openstackclient.identity.v3.federation_protocol:CreateProtocol
federation_protocol_delete = openstackclient.identity.v3.federation_protocol:DeleteProtocol
federation_protocol_list = openstackclient.identity.v3.federation_protocol:ListProtocols
federation_protocol_set = openstackclient.identity.v3.federation_protocol:SetProtocol
federation_protocol_show = openstackclient.identity.v3.federation_protocol:ShowProtocol
request_token_authorize = openstackclient.identity.v3.token:AuthorizeRequestToken
request_token_create = openstackclient.identity.v3.token:CreateRequestToken