Implement CRUD operations for Identity Providers

Operations for:
    * adding Identity Provider
    * listing Identity Providers
    * showing Identity Provider
    * updating Identity Provider
    * deleting Identity Provider

Change-Id: I4557168309f93e4670116b5c3c0e29252ff0c40f
Implements: bp/add-openstackclient-federation-crud
This commit is contained in:
Marek Denis 2014-04-09 19:05:36 +02:00
parent ee22070473
commit ef9496a4fc
4 changed files with 581 additions and 0 deletions
openstackclient
setup.cfg

@ -0,0 +1,180 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Identity v3 IdentityProvider action implementations"""
import logging
import six
import sys
from cliff import command
from cliff import lister
from cliff import show
from openstackclient.common import utils
class CreateIdentityProvider(show.ShowOne):
"""Create identity_provider command"""
log = logging.getLogger(__name__ + '.CreateIdentityProvider')
def get_parser(self, prog_name):
parser = super(CreateIdentityProvider, self).get_parser(prog_name)
parser.add_argument(
'identity_provider_id',
metavar='<identity_provider_id>',
help='New identity provider ID (must be unique)'
)
parser.add_argument(
'--description',
metavar='<description>',
help='New identity provider description',
)
enable_identity_provider = parser.add_mutually_exclusive_group()
enable_identity_provider.add_argument(
'--enable',
dest='enabled',
action='store_true',
default=True,
help='Enable identity provider',
)
enable_identity_provider.add_argument(
'--disable',
dest='enabled',
action='store_false',
help='Disable the identity provider',
)
return parser
def take_action(self, parsed_args):
self.log.debug('take_action(%s)' % parsed_args)
identity_client = self.app.client_manager.identity
idp = identity_client.identity_providers.create(
parsed_args.identity_provider_id,
description=parsed_args.description,
enabled=parsed_args.enabled)
info = {}
info.update(idp._info)
return zip(*sorted(six.iteritems(info)))
class DeleteIdentityProvider(command.Command):
"""Delete identity provider"""
log = logging.getLogger(__name__ + '.DeleteIdentityProvider')
def get_parser(self, prog_name):
parser = super(DeleteIdentityProvider, self).get_parser(prog_name)
parser.add_argument(
'identity_provider',
metavar='<identity_provider>',
help='ID of the identity provider to be deleted',
)
return parser
def take_action(self, parsed_args):
self.log.debug('take_action(%s)' % parsed_args)
identity_client = self.app.client_manager.identity
identity_client.identity_providers.delete(
parsed_args.identity_provider)
return
class ListIdentityProvider(lister.Lister):
"""List identity providers"""
log = logging.getLogger(__name__ + '.ListIdentityProvider')
def take_action(self, parsed_args):
self.log.debug('take_action(%s)' % parsed_args)
columns = ('ID', 'Enabled', 'Description')
data = self.app.client_manager.identity.identity_providers.list()
return (columns,
(utils.get_item_properties(
s, columns,
formatters={},
) for s in data))
class SetIdentityProvider(command.Command):
"""Set identity provider"""
log = logging.getLogger(__name__ + '.SetIdentityProvider')
def get_parser(self, prog_name):
parser = super(SetIdentityProvider, self).get_parser(prog_name)
parser.add_argument(
'identity_provider',
metavar='<identity_provider>',
help='ID of the identity provider to be changed',
)
enable_identity_provider = parser.add_mutually_exclusive_group()
enable_identity_provider.add_argument(
'--enable',
action='store_true',
help='Enable the identity provider',
)
enable_identity_provider.add_argument(
'--disable',
action='store_true',
help='Disable the identity provider',
)
return parser
def take_action(self, parsed_args):
self.log.debug('take_action(%s)' % parsed_args)
identity_client = self.app.client_manager.identity
if parsed_args.enable is True:
enabled = True
elif parsed_args.disable is True:
enabled = False
else:
sys.stdout.write("Identity Provider not updated, "
"no arguments present")
return (None, None)
identity_provider = identity_client.identity_providers.update(
parsed_args.identity_provider, enabled=enabled)
info = {}
info.update(identity_provider._info)
return zip(*sorted(six.iteritems(info)))
class ShowIdentityProvider(show.ShowOne):
"""Show identity provider"""
log = logging.getLogger(__name__ + '.ShowIdentityProvider')
def get_parser(self, prog_name):
parser = super(ShowIdentityProvider, self).get_parser(prog_name)
parser.add_argument(
'identity_provider',
metavar='<identity_provider>',
help='ID of the identity provider to be displayed',
)
return parser
def take_action(self, parsed_args):
self.log.debug('take_action(%s)' % parsed_args)
identity_client = self.app.client_manager.identity
identity_provider = utils.find_resource(
identity_client.identity_providers,
parsed_args.identity_provider)
info = {}
info.update(identity_provider._info)
return zip(*sorted(six.iteritems(info)))

@ -105,6 +105,15 @@ TOKEN_WITH_DOMAIN_ID = {
'user_id': user_id,
}
idp_id = 'test_idp'
idp_description = 'super exciting IdP description'
IDENTITY_PROVIDER = {
'id': idp_id,
'enabled': True,
'description': idp_description
}
class FakeIdentityv3Client(object):
def __init__(self, **kwargs):
@ -125,6 +134,14 @@ class FakeIdentityv3Client(object):
self.management_url = kwargs['endpoint']
class FakeFederatedClient(FakeIdentityv3Client):
def __init__(self, **kwargs):
super(FakeFederatedClient, self).__init__(**kwargs)
self.identity_providers = mock.Mock()
self.identity_providers.resource_class = fakes.FakeResource(None, {})
class TestIdentityv3(utils.TestCommand):
def setUp(self):
super(TestIdentityv3, self).setUp()
@ -133,3 +150,13 @@ class TestIdentityv3(utils.TestCommand):
endpoint=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
)
class TestFederatedIdentity(utils.TestCommand):
def setUp(self):
super(TestFederatedIdentity, self).setUp()
self.app.client_manager.identity = FakeFederatedClient(
endpoint=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN
)

@ -0,0 +1,368 @@
# Copyright 2014 CERN.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from openstackclient.identity.v3 import identity_provider
from openstackclient.tests import fakes
from openstackclient.tests.identity.v3 import fakes as identity_fakes
class TestIdentityProvider(identity_fakes.TestFederatedIdentity):
def setUp(self):
super(TestIdentityProvider, self).setUp()
self.identity_providers_mock = self.app.client_manager.\
identity.identity_providers
self.identity_providers_mock.reset_mock()
class TestIdentityProviderCreate(TestIdentityProvider):
def setUp(self):
super(TestIdentityProviderCreate, self).setUp()
self.identity_providers_mock.create.return_value = \
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True
)
self.cmd = identity_provider.CreateIdentityProvider(
self.app, None)
def test_create_identity_provider_no_options(self):
arglist = [
identity_fakes.idp_id
]
verifylist = [
('identity_provider_id', identity_fakes.idp_id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
kwargs = {
'enabled': True,
'description': None,
}
self.identity_providers_mock.create.assert_called_with(
identity_fakes.idp_id, **kwargs)
collist = ('description', 'enabled', 'id')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.idp_description,
True,
identity_fakes.idp_id,
)
self.assertEqual(data, datalist)
def test_create_identity_provider_description(self):
arglist = ['--description', identity_fakes.idp_description,
identity_fakes.idp_id]
verifylist = [
('identity_provider_id', identity_fakes.idp_id),
('description', identity_fakes.idp_description)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
kwargs = {
'description': identity_fakes.idp_description,
'enabled': True,
}
self.identity_providers_mock.create.assert_called_with(
identity_fakes.idp_id, **kwargs)
collist = ('description', 'enabled', 'id')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.idp_description, True, identity_fakes.idp_id,
)
self.assertEqual(data, datalist)
def test_create_identity_provider_disabled(self):
# Prepare FakeResource object
IDENTITY_PROVIDER = copy.deepcopy(identity_fakes.IDENTITY_PROVIDER)
IDENTITY_PROVIDER['enabled'] = False
IDENTITY_PROVIDER['description'] = None
self.identity_providers_mock.create.return_value = \
fakes.FakeResource(
None,
IDENTITY_PROVIDER,
loaded=True
)
arglist = ['--disable',
identity_fakes.idp_id]
verifylist = [
('identity_provider_id', identity_fakes.idp_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Set expected values
kwargs = {
'enabled': False,
'description': None
}
self.identity_providers_mock.create.assert_called_with(
identity_fakes.idp_id, **kwargs)
collist = ('description', 'enabled', 'id')
self.assertEqual(columns, collist)
datalist = (
None,
False,
identity_fakes.idp_id,
)
self.assertEqual(data, datalist)
class TestIdentityProviderDelete(TestIdentityProvider):
def setUp(self):
super(TestIdentityProviderDelete, self).setUp()
# This is the return value for utils.find_resource()
self.identity_providers_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True)
self.identity_providers_mock.delete.return_value = None
self.cmd = identity_provider.DeleteIdentityProvider(
self.app, None)
def test_delete_identity_provider(self):
arglist = [
identity_fakes.idp_id
]
verifylist = [
('identity_provider', identity_fakes.idp_id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.identity_providers_mock.delete.assert_called_with(
identity_fakes.idp_id,
)
class TestIdentityProviderList(TestIdentityProvider):
def setUp(self):
super(TestIdentityProviderList, self).setUp()
self.identity_providers_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True,
)
self.identity_providers_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True,
),
]
# Get the command object to test
self.cmd = identity_provider.ListIdentityProvider(self.app, None)
def test_identity_provider_list_no_options(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.identity_providers_mock.list.assert_called_with()
collist = ('ID', 'Enabled', 'Description')
self.assertEqual(columns, collist)
datalist = ((
identity_fakes.idp_id,
True,
identity_fakes.idp_description
), )
self.assertEqual(tuple(data), datalist)
class TestIdentityProviderShow(TestIdentityProvider):
def setUp(self):
super(TestIdentityProviderShow, self).setUp()
self.identity_providers_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True
)
# Get the command object to test
self.cmd = identity_provider.ShowIdentityProvider(self.app, None)
def test_identity_provider_show(self):
arglist = [
identity_fakes.idp_id
]
verifylist = [
('identity_provider', identity_fakes.idp_id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.identity_providers_mock.get.assert_called_with(
identity_fakes.idp_id)
collist = ('description', 'enabled', 'id' )
self.assertEqual(columns, collist)
datalist = (
identity_fakes.idp_description,
True,
identity_fakes.idp_id
)
self.assertEqual(data, datalist)
class TestIdentityProviderSet(TestIdentityProvider):
def setUp(self):
super(TestIdentityProviderSet, self).setUp()
self.cmd = identity_provider.SetIdentityProvider(self.app, None)
def test_identity_provider_disable(self):
"""Disable Identity Provider
Set Identity Provider's ``enabled`` attribute to False.
"""
def prepare(self):
"""Prepare fake return objects before the test is executed"""
updated_idp = copy.deepcopy(identity_fakes.IDENTITY_PROVIDER)
updated_idp['enabled'] = False
resources = fakes.FakeResource(
None,
updated_idp,
loaded=True
)
self.identity_providers_mock.update.return_value = resources
prepare(self)
arglist = [
'--disable', identity_fakes.idp_id
]
verifylist = [
('identity_provider', identity_fakes.idp_id),
('enable', False),
('disable', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.identity_providers_mock.update.assert_called_with(
identity_fakes.idp_id, enabled=False)
collist = ('description', 'enabled', 'id' )
self.assertEqual(columns, collist)
datalist = (
identity_fakes.idp_description,
False,
identity_fakes.idp_id
)
self.assertEqual(datalist, data)
def test_identity_provider_enable(self):
"""Enable Identity Provider.
Set Identity Provider's ``enabled`` attribute to True.
"""
def prepare(self):
"""Prepare fake return objects before the test is executed"""
resources = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True
)
self.identity_providers_mock.update.return_value = resources
prepare(self)
arglist = [
'--enable', identity_fakes.idp_id
]
verifylist = [
('identity_provider', identity_fakes.idp_id),
('enable', True),
('disable', False)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.identity_providers_mock.update.assert_called_with(
identity_fakes.idp_id, enabled=True)
collist = ('description', 'enabled', 'id' )
self.assertEqual(columns, collist)
datalist = (
identity_fakes.idp_description,
True,
identity_fakes.idp_id
)
self.assertEqual(data, datalist)
def test_identity_provider_no_options(self):
def prepare(self):
"""Prepare fake return objects before the test is executed"""
resources = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True
)
self.identity_providers_mock.get.return_value = resources
resources = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.IDENTITY_PROVIDER),
loaded=True
)
self.identity_providers_mock.update.return_value = resources
prepare(self)
arglist = [
identity_fakes.idp_id
]
verifylist = [
('identity_provider', identity_fakes.idp_id),
('enable', False),
('disable', False)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# expect take_action() to return (None, None) as
# neither --enable nor --disable was specified
self.assertEqual(columns, None)
self.assertEqual(data, None)

@ -200,6 +200,12 @@ openstack.identity.v3 =
group_set = openstackclient.identity.v3.group:SetGroup
group_show = openstackclient.identity.v3.group:ShowGroup
identity_provider_create = openstackclient.identity.v3.identity_provider:CreateIdentityProvider
identity_provider_delete = openstackclient.identity.v3.identity_provider:DeleteIdentityProvider
identity_provider_list = openstackclient.identity.v3.identity_provider:ListIdentityProvider
identity_provider_set = openstackclient.identity.v3.identity_provider:SetIdentityProvider
identity_provider_show = openstackclient.identity.v3.identity_provider:ShowIdentityProvider
policy_create = openstackclient.identity.v3.policy:CreatePolicy
policy_delete = openstackclient.identity.v3.policy:DeletePolicy
policy_list = openstackclient.identity.v3.policy:ListPolicy