Add Identity v2 role and service tests

* Add current auth info (auth_ref) to ClientManager
* Fix identity.v2_0.role.ListUserRole to get default user/project
  from ClientManager.auth_ref
* Fix identity.v2_0.role.AddRole call to roles.add_user_role()

Change-Id: Ie8bf41c491d97b0292a2b86bdc9b7580989a7f97
This commit is contained in:
Dean Troyer 2013-08-27 16:57:30 -05:00 committed by Gerrit Code Review
parent eb405a88c4
commit 44c97cc099
6 changed files with 724 additions and 24 deletions
openstackclient

@ -1,4 +1,4 @@
# Copyright 2012-2013 OpenStack, LLC. # Copyright 2012-2013 OpenStack Foundation
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -62,8 +62,11 @@ class ClientManager(object):
self._api_version = api_version self._api_version = api_version
self._service_catalog = None self._service_catalog = None
self.auth_ref = None
if not self._url: if not self._url:
# Populate other password flow attributes # Populate other password flow attributes
self.auth_ref = self.identity.auth_ref
self._token = self.identity.auth_token self._token = self.identity.auth_token
self._service_catalog = self.identity.service_catalog self._service_catalog = self.identity.service_catalog

@ -1,4 +1,4 @@
# Copyright 2012-2013 OpenStack, LLC. # Copyright 2012-2013 OpenStack Foundation
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -48,6 +48,7 @@ def make_client(instance):
tenant_id=instance._project_id, tenant_id=instance._project_id,
auth_url=instance._auth_url, auth_url=instance._auth_url,
region_name=instance._region_name) region_name=instance._region_name)
instance.auth_ref = client.auth_ref
return client return client

@ -22,6 +22,7 @@ from cliff import command
from cliff import lister from cliff import lister
from cliff import show from cliff import show
from openstackclient.common import exceptions
from openstackclient.common import utils from openstackclient.common import utils
@ -59,9 +60,10 @@ class AddRole(show.ShowOne):
) )
user = utils.find_resource(identity_client.users, parsed_args.user) user = utils.find_resource(identity_client.users, parsed_args.user)
role = identity_client.roles.add_user_role( role = identity_client.roles.add_user_role(
user, user.id,
role, role.id,
project) project.id,
)
info = {} info = {}
info.update(role._info) info.update(role._info)
@ -150,14 +152,23 @@ class ListUserRole(lister.Lister):
def take_action(self, parsed_args): def take_action(self, parsed_args):
self.log.debug('take_action(%s)' % parsed_args) self.log.debug('take_action(%s)' % parsed_args)
identity_client = self.app.client_manager.identity identity_client = self.app.client_manager.identity
auth_ref = self.app.client_manager.auth_ref
# user-only roles are not supported in KSL so we are # Project and user are required, if not included in command args
# required to have a user and project; default to the # default to the values used for authentication. For token-flow
# values used for authentication if not specified # authentication they must be included on the command line.
if not parsed_args.project: if not parsed_args.project:
parsed_args.project = identity_client.auth_tenant_id if self.app.client_manager.auth_ref:
parsed_args.project = auth_ref.project_id
else:
msg = "Project must be specified"
raise exceptions.CommandError(msg)
if not parsed_args.user: if not parsed_args.user:
parsed_args.user = identity_client.auth_user_id if self.app.client_manager.auth_ref:
parsed_args.user = auth_ref.user_id
else:
msg = "User must be specified"
raise exceptions.CommandError(msg)
project = utils.find_resource( project = utils.find_resource(
identity_client.tenants, identity_client.tenants,

@ -17,24 +17,10 @@ import mock
from openstackclient.tests import fakes from openstackclient.tests import fakes
user_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
user_name = 'paul'
user_description = 'Sir Paul'
user_email = 'paul@applecorps.com'
project_id = '8-9-64' project_id = '8-9-64'
project_name = 'beatles' project_name = 'beatles'
project_description = 'Fab Four' project_description = 'Fab Four'
USER = {
'id': user_id,
'name': user_name,
'tenantId': project_id,
'email': user_email,
'enabled': True,
}
PROJECT = { PROJECT = {
'id': project_id, 'id': project_id,
'name': project_name, 'name': project_name,
@ -49,9 +35,46 @@ PROJECT_2 = {
'enabled': True, 'enabled': True,
} }
role_id = '1'
role_name = 'boss'
ROLE = {
'id': role_id,
'name': role_name,
}
service_id = '1925-10-11'
service_name = 'elmore'
service_description = 'Leonard, Elmore, rip'
service_type = 'author'
SERVICE = {
'id': service_id,
'name': service_name,
'description': service_description,
'type': service_type,
}
user_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
user_name = 'paul'
user_description = 'Sir Paul'
user_email = 'paul@applecorps.com'
USER = {
'id': user_id,
'name': user_name,
'tenantId': project_id,
'email': user_email,
'enabled': True,
}
class FakeIdentityv2Client(object): class FakeIdentityv2Client(object):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.roles = mock.Mock()
self.roles.resource_class = fakes.FakeResource(None, {})
self.services = mock.Mock()
self.services.resource_class = fakes.FakeResource(None, {})
self.tenants = mock.Mock() self.tenants = mock.Mock()
self.tenants.resource_class = fakes.FakeResource(None, {}) self.tenants.resource_class = fakes.FakeResource(None, {})
self.users = mock.Mock() self.users = mock.Mock()

@ -0,0 +1,419 @@
# Copyright 2013 Nebula Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
from openstackclient.common import exceptions
from openstackclient.identity.v2_0 import role
from openstackclient.tests import fakes
from openstackclient.tests.identity import fakes as identity_fakes
from openstackclient.tests.identity import test_identity
class TestRole(test_identity.TestIdentity):
def setUp(self):
super(TestRole, self).setUp()
# Get a shortcut to the TenantManager Mock
self.projects_mock = self.app.client_manager.identity.tenants
self.projects_mock.reset_mock()
# Get a shortcut to the UserManager Mock
self.users_mock = self.app.client_manager.identity.users
self.users_mock.reset_mock()
# Get a shortcut to the RoleManager Mock
self.roles_mock = self.app.client_manager.identity.roles
self.roles_mock.reset_mock()
class TestRoleAdd(TestRole):
def setUp(self):
super(TestRoleAdd, self).setUp()
self.projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
loaded=True,
)
self.users_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
)
self.roles_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
self.roles_mock.add_user_role.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
# Get the command object to test
self.cmd = role.AddRole(self.app, None)
def test_role_add(self):
arglist = [
'--project', identity_fakes.project_name,
'--user', identity_fakes.user_name,
identity_fakes.role_name,
]
verifylist = [
('role', identity_fakes.role_name),
('project', identity_fakes.project_name),
('user', identity_fakes.user_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# RoleManager.add_user_role(user, role, tenant=None)
self.roles_mock.add_user_role.assert_called_with(
identity_fakes.user_id,
identity_fakes.role_id,
identity_fakes.project_id,
)
collist = ('id', 'name')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.role_id,
identity_fakes.role_name,
)
self.assertEqual(data, datalist)
class TestRoleCreate(TestRole):
def setUp(self):
super(TestRoleCreate, self).setUp()
self.roles_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
# Get the command object to test
self.cmd = role.CreateRole(self.app, None)
def test_role_create_no_options(self):
arglist = [
identity_fakes.role_name,
]
verifylist = [
('role_name', identity_fakes.role_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# RoleManager.create(name)
self.roles_mock.create.assert_called_with(
identity_fakes.role_name,
)
collist = ('id', 'name')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.role_id,
identity_fakes.role_name,
)
self.assertEqual(data, datalist)
class TestRoleDelete(TestRole):
def setUp(self):
super(TestRoleDelete, self).setUp()
self.roles_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
self.roles_mock.delete.return_value = None
# Get the command object to test
self.cmd = role.DeleteRole(self.app, None)
def test_role_delete_no_options(self):
arglist = [
identity_fakes.role_name,
]
verifylist = [
('role', identity_fakes.role_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
self.cmd.take_action(parsed_args)
self.roles_mock.delete.assert_called_with(
identity_fakes.role_id,
)
class TestRoleList(TestRole):
def setUp(self):
super(TestRoleList, self).setUp()
self.roles_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
),
]
# Get the command object to test
self.cmd = role.ListRole(self.app, None)
def test_role_list_no_options(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.roles_mock.list.assert_called_with()
collist = ('ID', 'Name')
self.assertEqual(columns, collist)
datalist = ((
identity_fakes.role_id,
identity_fakes.role_name,
), )
self.assertEqual(tuple(data), datalist)
class TestUserRoleList(TestRole):
def setUp(self):
super(TestUserRoleList, self).setUp()
self.projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
loaded=True,
)
self.users_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
)
self.roles_mock.roles_for_user.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
),
]
# Get the command object to test
self.cmd = role.ListUserRole(self.app, None)
def test_user_role_list_no_options(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# This argument combination should raise a CommandError
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args,
)
def test_user_role_list_no_options_def_creds(self):
auth_ref = self.app.client_manager.auth_ref = mock.MagicMock()
auth_ref.project_id.return_value = identity_fakes.project_id
auth_ref.user_id.return_value = identity_fakes.user_id
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.roles_mock.roles_for_user.assert_called_with(
identity_fakes.user_id,
identity_fakes.project_id,
)
collist = ('ID', 'Name', 'Project', 'User')
self.assertEqual(columns, collist)
datalist = ((
identity_fakes.role_id,
identity_fakes.role_name,
identity_fakes.project_name,
identity_fakes.user_name,
), )
self.assertEqual(tuple(data), datalist)
def test_user_role_list_project(self):
self.projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT_2),
loaded=True,
)
arglist = [
'--project', identity_fakes.PROJECT_2['name'],
]
verifylist = [
('project', identity_fakes.PROJECT_2['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# This argument combination should raise a CommandError
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args,
)
def test_user_role_list_project_def_creds(self):
auth_ref = self.app.client_manager.auth_ref = mock.MagicMock()
auth_ref.project_id.return_value = identity_fakes.project_id
auth_ref.user_id.return_value = identity_fakes.user_id
self.projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT_2),
loaded=True,
)
arglist = [
'--project', identity_fakes.PROJECT_2['name'],
]
verifylist = [
('project', identity_fakes.PROJECT_2['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.roles_mock.roles_for_user.assert_called_with(
identity_fakes.user_id,
identity_fakes.PROJECT_2['id'],
)
collist = ('ID', 'Name', 'Project', 'User')
self.assertEqual(columns, collist)
datalist = ((
identity_fakes.role_id,
identity_fakes.role_name,
identity_fakes.PROJECT_2['name'],
identity_fakes.user_name,
), )
self.assertEqual(tuple(data), datalist)
class TestRoleRemove(TestRole):
def setUp(self):
super(TestRoleRemove, self).setUp()
self.projects_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.PROJECT),
loaded=True,
)
self.users_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.USER),
loaded=True,
)
self.roles_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
self.roles_mock.remove_user_role.return_value = None
# Get the command object to test
self.cmd = role.RemoveRole(self.app, None)
def test_role_remove(self):
arglist = [
'--project', identity_fakes.project_name,
'--user', identity_fakes.user_name,
identity_fakes.role_name,
]
verifylist = [
('role', identity_fakes.role_name),
('project', identity_fakes.project_name),
('user', identity_fakes.user_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
self.cmd.take_action(parsed_args)
# RoleManager.remove_user_role(user, role, tenant=None)
self.roles_mock.remove_user_role.assert_called_with(
identity_fakes.user_id,
identity_fakes.role_id,
identity_fakes.project_id,
)
class TestRoleShow(TestRole):
def setUp(self):
super(TestRoleShow, self).setUp()
self.roles_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.ROLE),
loaded=True,
)
# Get the command object to test
self.cmd = role.ShowRole(self.app, None)
def test_service_show(self):
arglist = [
identity_fakes.role_name,
]
verifylist = [
('role', identity_fakes.role_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# RoleManager.get(role)
self.roles_mock.get.assert_called_with(
identity_fakes.role_name,
)
collist = ('id', 'name')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.role_id,
identity_fakes.role_name,
)
self.assertEqual(data, datalist)

@ -0,0 +1,243 @@
# Copyright 2013 Nebula Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from openstackclient.identity.v2_0 import service
from openstackclient.tests import fakes
from openstackclient.tests.identity import fakes as identity_fakes
from openstackclient.tests.identity import test_identity
class TestService(test_identity.TestIdentity):
def setUp(self):
super(TestService, self).setUp()
# Get a shortcut to the ServiceManager Mock
self.services_mock = self.app.client_manager.identity.services
self.services_mock.reset_mock()
class TestServiceCreate(TestService):
def setUp(self):
super(TestServiceCreate, self).setUp()
self.services_mock.create.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.SERVICE),
loaded=True,
)
# Get the command object to test
self.cmd = service.CreateService(self.app, None)
def test_service_create_minimum_options(self):
arglist = [
'--type', identity_fakes.service_type,
identity_fakes.service_name,
]
verifylist = [
('type', identity_fakes.service_type),
('name', identity_fakes.service_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# ServiceManager.create(name, service_type, description)
self.services_mock.create.assert_called_with(
identity_fakes.service_name,
identity_fakes.service_type,
None,
)
collist = ('description', 'id', 'name', 'type')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.service_description,
identity_fakes.service_id,
identity_fakes.service_name,
identity_fakes.service_type,
)
self.assertEqual(data, datalist)
def test_service_create_description(self):
arglist = [
'--type', identity_fakes.service_type,
'--description', identity_fakes.service_description,
identity_fakes.service_name,
]
verifylist = [
('type', identity_fakes.service_type),
('description', identity_fakes.service_description),
('name', identity_fakes.service_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# ServiceManager.create(name, service_type, description)
self.services_mock.create.assert_called_with(
identity_fakes.service_name,
identity_fakes.service_type,
identity_fakes.service_description,
)
collist = ('description', 'id', 'name', 'type')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.service_description,
identity_fakes.service_id,
identity_fakes.service_name,
identity_fakes.service_type,
)
self.assertEqual(data, datalist)
class TestServiceDelete(TestService):
def setUp(self):
super(TestServiceDelete, self).setUp()
self.services_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.SERVICE),
loaded=True,
)
self.services_mock.delete.return_value = None
# Get the command object to test
self.cmd = service.DeleteService(self.app, None)
def test_service_delete_no_options(self):
arglist = [
identity_fakes.service_name,
]
verifylist = [
('service', identity_fakes.service_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
self.cmd.take_action(parsed_args)
self.services_mock.delete.assert_called_with(
identity_fakes.service_name,
)
class TestServiceList(TestService):
def setUp(self):
super(TestServiceList, self).setUp()
self.services_mock.list.return_value = [
fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.SERVICE),
loaded=True,
),
]
# Get the command object to test
self.cmd = service.ListService(self.app, None)
def test_service_list_no_options(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.services_mock.list.assert_called_with()
collist = ('ID', 'Name')
self.assertEqual(columns, collist)
datalist = ((
identity_fakes.service_id,
identity_fakes.service_name,
), )
self.assertEqual(tuple(data), datalist)
def test_service_list_long(self):
arglist = [
'--long',
]
verifylist = [
('long', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
self.services_mock.list.assert_called_with()
collist = ('ID', 'Name', 'Type', 'Description')
self.assertEqual(columns, collist)
datalist = ((
identity_fakes.service_id,
identity_fakes.service_name,
identity_fakes.service_type,
identity_fakes.service_description,
), )
self.assertEqual(tuple(data), datalist)
class TestServiceShow(TestService):
def setUp(self):
super(TestServiceShow, self).setUp()
self.services_mock.get.return_value = fakes.FakeResource(
None,
copy.deepcopy(identity_fakes.SERVICE),
loaded=True,
)
# Get the command object to test
self.cmd = service.ShowService(self.app, None)
def test_service_show(self):
arglist = [
identity_fakes.service_name,
]
verifylist = [
('service', identity_fakes.service_name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
# DisplayCommandBase.take_action() returns two tuples
columns, data = self.cmd.take_action(parsed_args)
# ServiceManager.get(id)
self.services_mock.get.assert_called_with(
identity_fakes.service_name,
)
collist = ('description', 'id', 'name', 'type')
self.assertEqual(columns, collist)
datalist = (
identity_fakes.service_description,
identity_fakes.service_id,
identity_fakes.service_name,
identity_fakes.service_type,
)
self.assertEqual(data, datalist)