Add CRUD support for domain specific roles
This patch expands the definition of a role to include an optional domain_id, which is included in the uniqueness contraint along with the role name. The V8 role driver wrapper is updated to ensure that older drivers are oblivious to this change (and an attempt to create a domain specific role with a V8 driver will raise a NotImplemented exception). An additional item of note, is that since the uniqueness constraint contains a nullable value, we actually store a special value (as opposed to None) in the sql attribute so as to still use the sql uniqueness constraint to ensure we do not have race conditions in multi-process keystone configurations. This special value is entirely hidden within the sql driver and is not exposed to the manager layer and above. We do not support domain specific roles in the LDAP backend. Follow-on patches will modify the policy rules for the role APIs as well as modify the implied roles mechanism to support domain specific roles so that they are actually useful. Partially Implements: blueprint domain-specific-roles Change-Id: I74a8403a7bda271342708d282fd3f3f8aae63dc9
This commit is contained in:
parent
7a0874f6f6
commit
407eabde41
|
@ -303,13 +303,21 @@ class RoleV3(controller.V3Controller):
|
|||
ref = self.role_api.create_role(ref['id'], ref, initiator)
|
||||
return RoleV3.wrap_member(context, ref)
|
||||
|
||||
@controller.filterprotected('name')
|
||||
@controller.filterprotected('name', 'domain_id')
|
||||
def list_roles(self, context, filters):
|
||||
hints = RoleV3.build_driver_hints(context, filters)
|
||||
refs = self.role_api.list_roles(
|
||||
hints=hints)
|
||||
return RoleV3.wrap_collection(context, refs, hints=hints)
|
||||
|
||||
def list_roles_wrapper(self, context):
|
||||
# If there is no domain_id filter defined, then we only want to return
|
||||
# global roles, so we set the domain_id filter to None.
|
||||
params = context['query_string']
|
||||
if 'domain_id' not in params:
|
||||
context['query_string']['domain_id'] = None
|
||||
return self.list_roles(context)
|
||||
|
||||
@controller.protected()
|
||||
def get_role(self, context, role_id):
|
||||
ref = self.role_api.get_role(role_id)
|
||||
|
|
|
@ -1436,6 +1436,12 @@ class RoleManager(manager.Manager):
|
|||
return self.driver.list_roles(hints or driver_hints.Hints())
|
||||
|
||||
def update_role(self, role_id, role, initiator=None):
|
||||
original_role = self.driver.get_role(role_id)
|
||||
if ('domain_id' in role and
|
||||
role['domain_id'] != original_role['domain_id']):
|
||||
raise exception.ValidationError(
|
||||
message=_('Update of `domain_id` is not allowed.'))
|
||||
|
||||
ret = self.driver.update_role(role_id, role)
|
||||
notifications.Audit.updated(self._ROLE, role_id, initiator)
|
||||
self.get_role.invalidate(self, role_id)
|
||||
|
@ -1624,6 +1630,16 @@ class V9RoleWrapperForV8Driver(RoleDriverV9):
|
|||
since we do not guarantee to support new
|
||||
functionality with legacy drivers.
|
||||
|
||||
This V8 wrapper contains the following support for newer manager code:
|
||||
|
||||
- The current manager code expects a role entity to have a domain_id
|
||||
attribute, with a non-None value indicating a domain specific role. V8
|
||||
drivers will only understand global roles, hence if a non-None domain_id
|
||||
is passed to this wrapper, it will raise a NotImplemented exception.
|
||||
If a None-valued domain_id is passed in, it will be trimmed off before
|
||||
the underlying driver is called (and a None-valued domain_id attribute
|
||||
is added in for any entities returned to the manager.
|
||||
|
||||
"""
|
||||
|
||||
@versionutils.deprecated(
|
||||
|
@ -1634,20 +1650,49 @@ class V9RoleWrapperForV8Driver(RoleDriverV9):
|
|||
def __init__(self, wrapped_driver):
|
||||
self.driver = wrapped_driver
|
||||
|
||||
def _append_null_domain_id(self, role_or_list):
|
||||
def _append_null_domain_id_to_dict(role):
|
||||
if 'domain_id' not in role:
|
||||
role['domain_id'] = None
|
||||
return role
|
||||
|
||||
if isinstance(role_or_list, list):
|
||||
return [_append_null_domain_id_to_dict(x) for x in role_or_list]
|
||||
else:
|
||||
return _append_null_domain_id_to_dict(role_or_list)
|
||||
|
||||
def _trim_and_assert_null_domain_id(self, role):
|
||||
if 'domain_id' in role:
|
||||
if role['domain_id'] is not None:
|
||||
raise exception.NotImplemented(
|
||||
_('Domain specific roles are not supported in the V8 '
|
||||
'role driver'))
|
||||
else:
|
||||
new_role = role.copy()
|
||||
new_role.pop('domain_id')
|
||||
return new_role
|
||||
else:
|
||||
return role
|
||||
|
||||
def create_role(self, role_id, role):
|
||||
return self.driver.create_role(role_id, role)
|
||||
new_role = self._trim_and_assert_null_domain_id(role)
|
||||
return self._append_null_domain_id(
|
||||
self.driver.create_role(role_id, new_role))
|
||||
|
||||
def list_roles(self, hints):
|
||||
return self.driver.list_roles(hints)
|
||||
return self._append_null_domain_id(self.driver.list_roles(hints))
|
||||
|
||||
def list_roles_from_ids(self, role_ids):
|
||||
return self.driver.list_roles_from_ids(role_ids)
|
||||
return self._append_null_domain_id(
|
||||
self.driver.list_roles_from_ids(role_ids))
|
||||
|
||||
def get_role(self, role_id):
|
||||
return self.driver.get_role(role_id)
|
||||
return self._append_null_domain_id(self.driver.get_role(role_id))
|
||||
|
||||
def update_role(self, role_id, role):
|
||||
return self.driver.update_role(role_id, role)
|
||||
update_role = self._trim_and_assert_null_domain_id(role)
|
||||
return self._append_null_domain_id(
|
||||
self.driver.update_role(role_id, update_role))
|
||||
|
||||
def delete_role(self, role_id):
|
||||
self.driver.delete_role(role_id)
|
||||
|
|
|
@ -16,6 +16,14 @@ from keystone.common import driver_hints
|
|||
from keystone.common import sql
|
||||
from keystone import exception
|
||||
|
||||
# NOTE(henry-nash): From the manager and above perspective, the domain_id
|
||||
# attribute of a role is nullable. However, to ensure uniqueness in
|
||||
# multi-process configurations, it is better to still use a sql uniqueness
|
||||
# constraint. Since the support for a nullable component of a uniqueness
|
||||
# constraint across different sql databases is mixed, we instead store a
|
||||
# special value to represent null, as defined in NULL_DOMAIN_ID beloe.
|
||||
NULL_DOMAIN_ID = '<<null>>'
|
||||
|
||||
|
||||
class Role(assignment.RoleDriverV9):
|
||||
|
||||
|
@ -28,6 +36,16 @@ class Role(assignment.RoleDriverV9):
|
|||
|
||||
@driver_hints.truncated
|
||||
def list_roles(self, hints):
|
||||
# If there is a filter on domain_id and the value is None, then to
|
||||
# ensure that the sql filtering works correctly, we need to patch
|
||||
# the value to be NULL_DOMAIN_ID. This is safe to do here since we
|
||||
# know we are able to satisfy any filter of this type in the call to
|
||||
# filter_limit_query() below, which will remove the filter from the
|
||||
# hints (hence ensuring our substitution is not exposed to the caller).
|
||||
for f in hints.filters:
|
||||
if (f['name'] == 'domain_id' and f['value'] is None):
|
||||
f['value'] = NULL_DOMAIN_ID
|
||||
|
||||
with sql.transaction() as session:
|
||||
query = session.query(RoleTable)
|
||||
refs = sql.filter_limit_query(RoleTable, query, hints)
|
||||
|
@ -153,9 +171,28 @@ class ImpliedRoleTable(sql.ModelBase, sql.DictBase):
|
|||
|
||||
|
||||
class RoleTable(sql.ModelBase, sql.DictBase):
|
||||
|
||||
def to_dict(self, include_extra_dict=False):
|
||||
d = super(RoleTable, self).to_dict(
|
||||
include_extra_dict=include_extra_dict)
|
||||
if d['domain_id'] == NULL_DOMAIN_ID:
|
||||
d['domain_id'] = None
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, role_dict):
|
||||
if 'domain_id' in role_dict and role_dict['domain_id'] is None:
|
||||
new_dict = role_dict.copy()
|
||||
new_dict['domain_id'] = NULL_DOMAIN_ID
|
||||
else:
|
||||
new_dict = role_dict
|
||||
return super(RoleTable, cls).from_dict(new_dict)
|
||||
|
||||
__tablename__ = 'role'
|
||||
attributes = ['id', 'name']
|
||||
attributes = ['id', 'name', 'domain_id']
|
||||
id = sql.Column(sql.String(64), primary_key=True)
|
||||
name = sql.Column(sql.String(255), unique=True, nullable=False)
|
||||
name = sql.Column(sql.String(255), nullable=False)
|
||||
domain_id = sql.Column(sql.String(64), nullable=False,
|
||||
server_default=NULL_DOMAIN_ID)
|
||||
extra = sql.Column(sql.JsonBlob())
|
||||
__table_args__ = (sql.UniqueConstraint('name'), {})
|
||||
__table_args__ = (sql.UniqueConstraint('name', 'domain_id'), {})
|
||||
|
|
|
@ -71,7 +71,8 @@ class Routers(wsgi.RoutersBase):
|
|||
|
||||
routers.append(
|
||||
router.Router(controllers.RoleV3(), 'roles', 'role',
|
||||
resource_descriptions=self.v3_resources))
|
||||
resource_descriptions=self.v3_resources,
|
||||
overrides={'list': 'list_roles_wrapper'}))
|
||||
|
||||
implied_roles_controller = controllers.ImpliedRolesV3()
|
||||
self._add_resource(
|
||||
|
|
|
@ -19,14 +19,22 @@ from keystone.common import wsgi
|
|||
class Router(wsgi.ComposableRouter):
|
||||
def __init__(self, controller, collection_key, key,
|
||||
resource_descriptions=None,
|
||||
is_entity_implemented=True):
|
||||
is_entity_implemented=True,
|
||||
overrides=None):
|
||||
self.controller = controller
|
||||
self.key = key
|
||||
self.collection_key = collection_key
|
||||
self._resource_descriptions = resource_descriptions
|
||||
self._is_entity_implemented = is_entity_implemented
|
||||
self.overrides = overrides
|
||||
|
||||
def add_routes(self, mapper):
|
||||
def _assign_action(action, key, overrides):
|
||||
if overrides is not None and action in overrides:
|
||||
return overrides[action]
|
||||
else:
|
||||
return '%(action)s_%(key)s' % {'action': action, 'key': key}
|
||||
|
||||
collection_path = '/%(collection_key)s' % {
|
||||
'collection_key': self.collection_key}
|
||||
entity_path = '/%(collection_key)s/{%(key)s_id}' % {
|
||||
|
@ -36,27 +44,27 @@ class Router(wsgi.ComposableRouter):
|
|||
mapper.connect(
|
||||
collection_path,
|
||||
controller=self.controller,
|
||||
action='create_%s' % self.key,
|
||||
action=_assign_action('create', self.key, self.overrides),
|
||||
conditions=dict(method=['POST']))
|
||||
mapper.connect(
|
||||
collection_path,
|
||||
controller=self.controller,
|
||||
action='list_%s' % self.collection_key,
|
||||
action=_assign_action('list', self.collection_key, self.overrides),
|
||||
conditions=dict(method=['GET']))
|
||||
mapper.connect(
|
||||
entity_path,
|
||||
controller=self.controller,
|
||||
action='get_%s' % self.key,
|
||||
action=_assign_action('get', self.key, self.overrides),
|
||||
conditions=dict(method=['GET']))
|
||||
mapper.connect(
|
||||
entity_path,
|
||||
controller=self.controller,
|
||||
action='update_%s' % self.key,
|
||||
action=_assign_action('update', self.key, self.overrides),
|
||||
conditions=dict(method=['PATCH']))
|
||||
mapper.connect(
|
||||
entity_path,
|
||||
controller=self.controller,
|
||||
action='delete_%s' % self.key,
|
||||
action=_assign_action('delete', self.key, self.overrides),
|
||||
conditions=dict(method=['DELETE']))
|
||||
|
||||
# Add the collection resource and entity resource to the resource
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import migrate
|
||||
import sqlalchemy as sql
|
||||
|
||||
_ROLE_NAME_OLD_CONSTRAINT = 'ixu_role_name'
|
||||
_ROLE_NAME_NEW_CONSTRAINT = 'ixu_role_name_domain_id'
|
||||
_ROLE_TABLE_NAME = 'role'
|
||||
_DOMAIN_ID_COLUMN_NAME = 'domain_id'
|
||||
_NULL_DOMAIN_ID = '<<null>>'
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
meta = sql.MetaData()
|
||||
meta.bind = migrate_engine
|
||||
|
||||
role_table = sql.Table(_ROLE_TABLE_NAME, meta, autoload=True)
|
||||
domain_id = sql.Column(_DOMAIN_ID_COLUMN_NAME, sql.String(64),
|
||||
nullable=False, server_default=_NULL_DOMAIN_ID)
|
||||
role_table.create_column(domain_id)
|
||||
|
||||
migrate.UniqueConstraint(role_table.c.name,
|
||||
name=_ROLE_NAME_OLD_CONSTRAINT).drop()
|
||||
|
||||
migrate.UniqueConstraint(role_table.c.name,
|
||||
role_table.c.domain_id,
|
||||
name=_ROLE_NAME_NEW_CONSTRAINT).create()
|
|
@ -10,6 +10,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from keystone.common import sql
|
||||
from keystone import exception
|
||||
from keystone.tests import unit
|
||||
|
@ -21,7 +23,8 @@ class SqlRoleModels(core_sql.BaseBackendSqlModels):
|
|||
|
||||
def test_role_model(self):
|
||||
cols = (('id', sql.String, 64),
|
||||
('name', sql.String, 255))
|
||||
('name', sql.String, 255),
|
||||
('domain_id', sql.String, 64))
|
||||
self.assertExpectedSchema('role', cols)
|
||||
|
||||
|
||||
|
@ -36,3 +39,68 @@ class SqlRole(core_sql.BaseBackendSqlTests, core.RoleTests):
|
|||
self.assertRaises(exception.RoleNotFound,
|
||||
self.role_api.get_role,
|
||||
role['id'])
|
||||
|
||||
def test_create_duplicate_role_domain_specific_name_fails(self):
|
||||
domain = unit.new_domain_ref()
|
||||
role1 = unit.new_role_ref(domain_id=domain['id'])
|
||||
self.role_api.create_role(role1['id'], role1)
|
||||
role2 = unit.new_role_ref(name=role1['name'],
|
||||
domain_id=domain['id'])
|
||||
self.assertRaises(exception.Conflict,
|
||||
self.role_api.create_role,
|
||||
role2['id'],
|
||||
role2)
|
||||
|
||||
def test_update_domain_id_of_role_fails(self):
|
||||
# Create a global role
|
||||
role1 = unit.new_role_ref()
|
||||
role1 = self.role_api.create_role(role1['id'], role1)
|
||||
# Try and update it to be domain specific
|
||||
domainA = unit.new_domain_ref()
|
||||
role1['domain_id'] = domainA['id']
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.role_api.update_role,
|
||||
role1['id'],
|
||||
role1)
|
||||
|
||||
# Create a domain specific role from scratch
|
||||
role2 = unit.new_role_ref(domain_id=domainA['id'])
|
||||
self.role_api.create_role(role2['id'], role2)
|
||||
# Try to "move" it to another domain
|
||||
domainB = unit.new_domain_ref()
|
||||
role2['domain_id'] = domainB['id']
|
||||
self.assertRaises(exception.ValidationError,
|
||||
self.role_api.update_role,
|
||||
role2['id'],
|
||||
role2)
|
||||
|
||||
def test_domain_specific_separation(self):
|
||||
domain1 = unit.new_domain_ref()
|
||||
role1 = unit.new_role_ref(domain_id=domain1['id'])
|
||||
role_ref1 = self.role_api.create_role(role1['id'], role1)
|
||||
self.assertDictEqual(role1, role_ref1)
|
||||
# Check we can have the same named role in a different domain
|
||||
domain2 = unit.new_domain_ref()
|
||||
role2 = unit.new_role_ref(name=role1['name'], domain_id=domain2['id'])
|
||||
role_ref2 = self.role_api.create_role(role2['id'], role2)
|
||||
self.assertDictEqual(role2, role_ref2)
|
||||
# ...and in fact that you can have the same named role as a global role
|
||||
role3 = unit.new_role_ref(name=role1['name'])
|
||||
role_ref3 = self.role_api.create_role(role3['id'], role3)
|
||||
self.assertDictEqual(role3, role_ref3)
|
||||
# Check that updating one doesn't change the others
|
||||
role1['name'] = uuid.uuid4().hex
|
||||
self.role_api.update_role(role1['id'], role1)
|
||||
role_ref1 = self.role_api.get_role(role1['id'])
|
||||
self.assertDictEqual(role1, role_ref1)
|
||||
role_ref2 = self.role_api.get_role(role2['id'])
|
||||
self.assertDictEqual(role2, role_ref2)
|
||||
role_ref3 = self.role_api.get_role(role3['id'])
|
||||
self.assertDictEqual(role3, role_ref3)
|
||||
# Check that deleting one of these, doesn't affect the others
|
||||
self.role_api.delete_role(role1['id'])
|
||||
self.assertRaises(exception.RoleNotFound,
|
||||
self.role_api.get_role,
|
||||
role1['id'])
|
||||
self.role_api.get_role(role2['id'])
|
||||
self.role_api.get_role(role3['id'])
|
||||
|
|
|
@ -401,6 +401,7 @@ def new_role_ref(**kwargs):
|
|||
ref = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'domain_id': None
|
||||
}
|
||||
ref.update(kwargs)
|
||||
return ref
|
||||
|
|
|
@ -96,24 +96,31 @@ ROLES = [
|
|||
{
|
||||
'id': 'admin',
|
||||
'name': 'admin',
|
||||
'domain_id': None,
|
||||
}, {
|
||||
'id': 'member',
|
||||
'name': 'Member',
|
||||
'domain_id': None,
|
||||
}, {
|
||||
'id': '9fe2ff9ee4384b1894a90878d3e92bab',
|
||||
'name': '_member_',
|
||||
'domain_id': None,
|
||||
}, {
|
||||
'id': 'other',
|
||||
'name': 'Other',
|
||||
'domain_id': None,
|
||||
}, {
|
||||
'id': 'browser',
|
||||
'name': 'Browser',
|
||||
'domain_id': None,
|
||||
}, {
|
||||
'id': 'writer',
|
||||
'name': 'Writer',
|
||||
'domain_id': None,
|
||||
}, {
|
||||
'id': 'service',
|
||||
'name': 'Service',
|
||||
'domain_id': None,
|
||||
}
|
||||
]
|
||||
|
||||
|
|
|
@ -637,6 +637,50 @@ class SqlUpgradeTests(SqlMigrateBase):
|
|||
constraint_names = [constraint['name'] for constraint in constraints]
|
||||
self.assertIn('duplicate_trust_constraint', constraint_names)
|
||||
|
||||
def test_add_domain_specific_roles(self):
|
||||
"""Check database upgraded successfully for domain specific roles.
|
||||
|
||||
The following items need to be checked:
|
||||
|
||||
- The domain_id column has been added
|
||||
- That it has been added to the uniqueness constraints
|
||||
- Existing roles have their domain_id columns set to the specific
|
||||
string of '<<null>>'
|
||||
|
||||
"""
|
||||
NULL_DOMAIN_ID = '<<null>>'
|
||||
|
||||
self.upgrade(87)
|
||||
session = self.Session()
|
||||
role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
|
||||
# Add a role before we upgrade, so we can check that its new domain_id
|
||||
# attribute is handled correctly
|
||||
role_id = uuid.uuid4().hex
|
||||
self.insert_dict(session, 'role',
|
||||
{'id': role_id, 'name': uuid.uuid4().hex})
|
||||
session.close()
|
||||
|
||||
self.upgrade(88)
|
||||
|
||||
session = self.Session()
|
||||
self.metadata.clear()
|
||||
self.assertTableColumns('role', ['id', 'name', 'domain_id', 'extra'])
|
||||
# Check the domain_id has been added to the uniqueness constraint
|
||||
inspector = reflection.Inspector.from_engine(self.engine)
|
||||
constraints = inspector.get_unique_constraints('role')
|
||||
constraint_columns = [
|
||||
constraint['column_names'] for constraint in constraints
|
||||
if constraint['name'] == 'ixu_role_name_domain_id']
|
||||
self.assertIn('domain_id', constraint_columns[0])
|
||||
|
||||
# Now check our role has its domain_id attribute set correctly
|
||||
role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
|
||||
cols = [role_table.c.domain_id]
|
||||
filter = role_table.c.id == role_id
|
||||
statement = sqlalchemy.select(cols).where(filter)
|
||||
role_entry = session.execute(statement).fetchone()
|
||||
self.assertEqual(NULL_DOMAIN_ID, role_entry[0])
|
||||
|
||||
def populate_user_table(self, with_pass_enab=False,
|
||||
with_pass_enab_domain=False):
|
||||
# Populate the appropriate fields in the user
|
||||
|
|
|
@ -1045,6 +1045,21 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
|
|||
*args,
|
||||
**kwargs)
|
||||
|
||||
def assertRoleInListResponse(self, resp, ref, expected=1):
|
||||
found_count = 0
|
||||
for entity in resp.result.get('roles'):
|
||||
try:
|
||||
self.assertValidRole(entity, ref=ref)
|
||||
except Exception:
|
||||
# It doesn't match, so let's go onto the next one
|
||||
pass
|
||||
else:
|
||||
found_count += 1
|
||||
self.assertEqual(expected, found_count)
|
||||
|
||||
def assertRoleNotInListResponse(self, resp, ref):
|
||||
self.assertRoleInListResponse(resp, ref=ref, expected=0)
|
||||
|
||||
def assertValidRoleResponse(self, resp, *args, **kwargs):
|
||||
return self.assertValidResponse(
|
||||
resp,
|
||||
|
@ -1058,6 +1073,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
|
|||
self.assertIsNotNone(entity.get('name'))
|
||||
if ref:
|
||||
self.assertEqual(ref['name'], entity['name'])
|
||||
self.assertEqual(ref['domain_id'], entity['domain_id'])
|
||||
return entity
|
||||
|
||||
# role assignment validation
|
||||
|
|
|
@ -2577,3 +2577,75 @@ class ImpliedRolesTests(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin,
|
|||
prior = self._create_role()
|
||||
url = '/roles/%s/implies/%s' % (prior['id'], root_role['id'])
|
||||
self.put(url, expected_status=http_client.FORBIDDEN)
|
||||
|
||||
|
||||
class DomainSpecificRoleTests(test_v3.RestfulTestCase, unit.TestCase):
|
||||
def setUp(self):
|
||||
def create_role(domain_id=None):
|
||||
"""Call ``POST /roles``."""
|
||||
ref = unit.new_role_ref(domain_id=domain_id)
|
||||
r = self.post(
|
||||
'/roles',
|
||||
body={'role': ref})
|
||||
return self.assertValidRoleResponse(r, ref)
|
||||
|
||||
super(DomainSpecificRoleTests, self).setUp()
|
||||
self.domainA = unit.new_domain_ref()
|
||||
self.resource_api.create_domain(self.domainA['id'], self.domainA)
|
||||
self.domainB = unit.new_domain_ref()
|
||||
self.resource_api.create_domain(self.domainB['id'], self.domainB)
|
||||
|
||||
self.global_role1 = create_role()
|
||||
self.global_role2 = create_role()
|
||||
# Since there maybe other global roles already created, let's count
|
||||
# them, so we can ensure we can check subsequent list responses
|
||||
# are correct
|
||||
r = self.get('/roles')
|
||||
self.existing_global_roles = len(r.result['roles'])
|
||||
|
||||
# And now create some domain specific roles
|
||||
self.domainA_role1 = create_role(domain_id=self.domainA['id'])
|
||||
self.domainA_role2 = create_role(domain_id=self.domainA['id'])
|
||||
self.domainB_role = create_role(domain_id=self.domainB['id'])
|
||||
|
||||
def test_get_and_list_domain_specific_roles(self):
|
||||
# Check we can get a domain specific role
|
||||
r = self.get('/roles/%s' % self.domainA_role1['id'])
|
||||
self.assertValidRoleResponse(r, self.domainA_role1)
|
||||
|
||||
# If we list without specifying a domain, we should only get global
|
||||
# roles back.
|
||||
r = self.get('/roles')
|
||||
self.assertValidRoleListResponse(
|
||||
r, expected_length=self.existing_global_roles)
|
||||
self.assertRoleInListResponse(r, self.global_role1)
|
||||
self.assertRoleInListResponse(r, self.global_role2)
|
||||
self.assertRoleNotInListResponse(r, self.domainA_role1)
|
||||
self.assertRoleNotInListResponse(r, self.domainA_role2)
|
||||
self.assertRoleNotInListResponse(r, self.domainB_role)
|
||||
|
||||
# Now list those in domainA, making sure that's all we get back
|
||||
r = self.get('/roles?domain_id=%s' % self.domainA['id'])
|
||||
self.assertValidRoleListResponse(r, expected_length=2)
|
||||
self.assertRoleInListResponse(r, self.domainA_role1)
|
||||
self.assertRoleInListResponse(r, self.domainA_role2)
|
||||
|
||||
def test_update_domain_specific_roles(self):
|
||||
self.domainA_role1['name'] = uuid.uuid4().hex
|
||||
self.patch('/roles/%(role_id)s' % {
|
||||
'role_id': self.domainA_role1['id']},
|
||||
body={'role': self.domainA_role1})
|
||||
r = self.get('/roles/%s' % self.domainA_role1['id'])
|
||||
self.assertValidRoleResponse(r, self.domainA_role1)
|
||||
|
||||
def test_delete_domain_specific_roles(self):
|
||||
# Check delete only removes that one domain role
|
||||
self.delete('/roles/%(role_id)s' % {
|
||||
'role_id': self.domainA_role1['id']})
|
||||
|
||||
self.get('/roles/%s' % self.domainA_role1['id'],
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
# Now re-list those in domainA, making sure there's only one left
|
||||
r = self.get('/roles?domain_id=%s' % self.domainA['id'])
|
||||
self.assertValidRoleListResponse(r, expected_length=1)
|
||||
self.assertRoleInListResponse(r, self.domainA_role2)
|
||||
|
|
Loading…
Reference in New Issue