Merge "Add support for querying role assignments."

This commit is contained in:
Jenkins 2015-12-21 16:15:52 +00:00 committed by Gerrit Code Review
commit 2486cae218
5 changed files with 145 additions and 0 deletions

View File

@ -677,6 +677,11 @@ class RoleDelete(task_manager.Task):
return client.keystone_client.roles.delete(**self.args) return client.keystone_client.roles.delete(**self.args)
class RoleAssignmentList(task_manager.Task):
def main(self, client):
return client.keystone_client.role_assignments.list(**self.args)
class StackList(task_manager.Task): class StackList(task_manager.Task):
def main(self, client): def main(self, client):
return client.heat_client.stacks.list() return client.heat_client.stacks.list()

View File

@ -363,6 +363,56 @@ def normalize_groups(domains):
return meta.obj_list_to_dict(ret) return meta.obj_list_to_dict(ret)
def normalize_role_assignments(assignments):
"""Put role_assignments into a form that works with search/get interface.
Role assignments have the structure::
[
{
"role": {
"id": "--role-id--"
},
"scope": {
"domain": {
"id": "--domain-id--"
}
},
"user": {
"id": "--user-id--"
}
},
]
Which is hard to work with in the rest of our interface. Map this to be::
[
{
"id": "--role-id--",
"domain": "--domain-id--",
"user": "--user-id--",
}
]
Scope can be "domain" or "project" and "user" can also be "group".
:param list assignments: A list of dictionaries of role assignments.
:returns: A list of flattened/normalized role assignment dicts.
"""
new_assignments = []
for assignment in assignments:
new_val = {'id': assignment['role']['id']}
for scope in ('project', 'domain'):
if scope in assignment['scope']:
new_val[scope] = assignment['scope'][scope]['id']
for assignee in ('user', 'group'):
if assignee in assignment:
new_val[assignee] = assignment[assignee]['id']
new_assignments.append(new_val)
return new_assignments
def valid_kwargs(*valid_args): def valid_kwargs(*valid_args):
# This decorator checks if argument passed as **kwargs to a function are # This decorator checks if argument passed as **kwargs to a function are
# present in valid_args. # present in valid_args.

View File

@ -1286,6 +1286,43 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
""" """
return _utils._get_entity(self.search_roles, name_or_id, filters) return _utils._get_entity(self.search_roles, name_or_id, filters)
def list_role_assignments(self, filters=None):
"""List Keystone role assignments
:param dict filters: Dict of filter conditions. Acceptable keys are::
- 'user' (string) - User ID to be used as query filter.
- 'group' (string) - Group ID to be used as query filter.
- 'project' (string) - Project ID to be used as query filter.
- 'domain' (string) - Domain ID to be used as query filter.
- 'role' (string) - Role ID to be used as query filter.
- 'os_inherit_extension_inherited_to' (string) - Return inherited
role assignments for either 'projects' or 'domains'
- 'effective' (boolean) - Return effective role assignments.
- 'include_subtree' (boolean) - Include subtree
'user' and 'group' are mutually exclusive, as are 'domain' and
'project'.
:returns: a list of dicts containing the role assignment description.
Contains the following attributes::
- id: <role id>
- user|group: <user or group id>
- project|domain: <project or domain id>
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
if not filters:
filters = {}
with _utils.shade_exceptions("Failed to list role assignments"):
assignments = self.manager.submitTask(
_tasks.RoleAssignmentList(**filters)
)
return _utils.normalize_role_assignments(assignments)
def create_flavor(self, name, ram, vcpus, disk, flavorid="auto", def create_flavor(self, name, ram, vcpus, disk, flavorid="auto",
ephemeral=0, swap=0, rxtx_factor=1.0, is_public=True): ephemeral=0, swap=0, rxtx_factor=1.0, is_public=True):
"""Create a new flavor. """Create a new flavor.

View File

@ -79,3 +79,13 @@ class TestIdentity(base.TestCase):
role = self.cloud.create_role(role_name) role = self.cloud.create_role(role_name)
self.assertIsNotNone(role) self.assertIsNotNone(role)
self.assertTrue(self.cloud.delete_role(role_name)) self.assertTrue(self.cloud.delete_role(role_name))
# TODO(Shrews): Once we can support assigning roles within shade, we
# need to make this test a little more specific, and add more for testing
# filtering functionality.
def test_list_role_assignments(self):
if self.cloud.cloud_config.get_api_version('identity') in ('2', '2.0'):
self.skipTest("Identity service does not support role assignments")
assignments = self.cloud.list_role_assignments()
self.assertIsInstance(assignments, list)
self.assertTrue(len(assignments) > 0)

View File

@ -12,13 +12,31 @@
# limitations under the License. # limitations under the License.
import mock import mock
import testtools
import shade import shade
from shade import meta from shade import meta
from shade import _utils
from shade.tests.unit import base from shade.tests.unit import base
from shade.tests import fakes from shade.tests import fakes
RAW_ROLE_ASSIGNMENTS = [
{
"links": {"assignment": "http://example"},
"role": {"id": "123456"},
"scope": {"domain": {"id": "161718"}},
"user": {"id": "313233"}
},
{
"links": {"assignment": "http://example"},
"group": {"id": "101112"},
"role": {"id": "123456"},
"scope": {"project": {"id": "456789"}}
}
]
class TestIdentityRoles(base.TestCase): class TestIdentityRoles(base.TestCase):
def setUp(self): def setUp(self):
@ -63,3 +81,28 @@ class TestIdentityRoles(base.TestCase):
mock_get.return_value = meta.obj_to_dict(role_obj) mock_get.return_value = meta.obj_to_dict(role_obj)
self.assertTrue(self.cloud.delete_role('1234')) self.assertTrue(self.cloud.delete_role('1234'))
self.assertTrue(mock_keystone.roles.delete.called) self.assertTrue(mock_keystone.roles.delete.called)
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_list_role_assignments(self, mock_keystone):
mock_keystone.role_assignments.list.return_value = RAW_ROLE_ASSIGNMENTS
ret = self.cloud.list_role_assignments()
mock_keystone.role_assignments.list.assert_called_once_with()
normalized_assignments = _utils.normalize_role_assignments(
RAW_ROLE_ASSIGNMENTS
)
self.assertEqual(normalized_assignments, ret)
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_list_role_assignments_filters(self, mock_keystone):
params = dict(user='123', domain='456', effective=True)
self.cloud.list_role_assignments(filters=params)
mock_keystone.role_assignments.list.assert_called_once_with(**params)
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_list_role_assignments_exception(self, mock_keystone):
mock_keystone.role_assignments.list.side_effect = Exception()
with testtools.ExpectedException(
shade.OpenStackCloudException,
"Failed to list role assignments"
):
self.cloud.list_role_assignments()