Read-only default domain for LDAP (bug 1168726)

NOTE: this patch effectively *removes* a feature new in grizzly
(multi-domain support for LDAP) in favor of fixing our existing 99% use
case (proper read-only support for LDAP for a single domain).

A proper fix to the above was also blocked by bug 1117356, so that's
fixed here as well (updates properly return the resulting entities).

Change-Id: I672b90e67545cc1fe65b05ef7f8af5b42ca6afc3
This commit is contained in:
Dolph Mathews 2013-05-03 15:55:59 -05:00
parent f509179e15
commit 39c4ca1e2f
4 changed files with 182 additions and 168 deletions

View File

@ -339,6 +339,8 @@ class BaseLdap(object):
except ldap.NO_SUCH_OBJECT:
raise self._not_found(id)
return self.get(id)
def delete(self, id):
if not self.allow_delete:
action = _('LDAP %s delete') % self.options_name
@ -572,14 +574,16 @@ class EnabledEmuMixIn(BaseLdap):
if 'enabled' not in self.attribute_ignore and self.enabled_emulation:
data = values.copy()
enabled_value = data.pop('enabled', None)
super(EnabledEmuMixIn, self).update(object_id, data, old_obj)
ref = super(EnabledEmuMixIn, self).update(object_id, data, old_obj)
if enabled_value is not None:
if enabled_value:
self._add_enabled(object_id)
else:
self._remove_enabled(object_id)
return ref
else:
super(EnabledEmuMixIn, self).update(object_id, values, old_obj)
return super(EnabledEmuMixIn, self).update(
object_id, values, old_obj)
def delete(self, object_id):
if self.enabled_emulation:

View File

@ -29,10 +29,16 @@ from keystone import config
from keystone import exception
from keystone import identity
CONF = config.CONF
CONF = config.CONF
LOG = logging.getLogger(__name__)
DEFAULT_DOMAIN = {
'id': CONF.identity.default_domain_id,
'name': 'Default',
'enabled': True
}
class Identity(identity.Driver):
def __init__(self):
@ -46,7 +52,6 @@ class Identity(identity.Driver):
self.project = ProjectApi(CONF)
self.role = RoleApi(CONF)
self.group = GroupApi(CONF)
self.domain = DomainApi(CONF)
def get_connection(self, user=None, password=None):
if self.LDAP_URL.startswith('fake://'):
@ -60,6 +65,36 @@ class Identity(identity.Driver):
conn.simple_bind_s(user, password)
return conn
def _validate_domain(self, ref):
"""Validate that either the default domain or nothing is specified.
Also removes the domain from the ref so that LDAP doesn't have to
persist the attribute.
"""
ref = ref.copy()
domain_id = ref.pop('domain_id', CONF.identity.default_domain_id)
self._validate_domain_id(domain_id)
return ref
def _validate_domain_id(self, domain_id):
"""Validate that the domain ID specified belongs to the default domain.
"""
if domain_id != CONF.identity.default_domain_id:
raise exception.DomainNotFound(domain_id=domain_id)
def _set_default_domain(self, ref):
"""Overrides any domain reference with the default domain."""
if isinstance(ref, dict):
ref = ref.copy()
ref['domain_id'] = CONF.identity.default_domain_id
return ref
elif isinstance(ref, list):
return [self._set_default_domain(x) for x in ref]
else:
raise ValueError(_('Expected dict or list: %s') % type(ref))
# Identity interface
def authenticate(self, user_id=None, tenant_id=None, password=None):
"""Authenticate based on a user, tenant and password.
@ -98,38 +133,38 @@ class Identity(identity.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
return (identity.filter_user(user_ref), tenant_ref, metadata_ref)
user_ref = self._set_default_domain(identity.filter_user(user_ref))
return (user_ref, tenant_ref, metadata_ref)
def get_project(self, tenant_id):
return self.project.get(tenant_id)
return self._set_default_domain(self.project.get(tenant_id))
def list_projects(self):
return self.project.get_all()
return self._set_default_domain(self.project.get_all())
def get_project_by_name(self, tenant_name, domain_id):
# TODO(henry-nash): Use domain_id once domains are implemented
# in LDAP backend
return self.project.get_by_name(tenant_name)
self._validate_domain_id(domain_id)
return self._set_default_domain(self.project.get_by_name(tenant_name))
def _get_user(self, user_id):
return self.user.get(user_id)
def get_user(self, user_id):
return identity.filter_user(self._get_user(user_id))
ref = identity.filter_user(self._get_user(user_id))
return self._set_default_domain(ref)
def list_users(self):
return self.user.get_all()
return self._set_default_domain(self.user.get_all())
def get_user_by_name(self, user_name, domain_id):
# TODO(henry-nash): Use domain_id once domains are implemented
# in LDAP backend
return identity.filter_user(self.user.get_by_name(user_name))
self._validate_domain_id(domain_id)
ref = identity.filter_user(self.user.get_by_name(user_name))
return self._set_default_domain(ref)
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
# FIXME(henry-nash): Use domain_id and group_id once domains
# and groups are implemented in LDAP backend
if domain_id is not None:
raise NotImplemented('Domain metadata not supported by LDAP.')
if not self.get_project(tenant_id) or not self.get_user(user_id):
return {}
@ -150,7 +185,7 @@ class Identity(identity.Driver):
def get_project_users(self, tenant_id):
self.get_project(tenant_id)
return self.project.get_users(tenant_id)
return self._set_default_domain(self.project.get_users(tenant_id))
def get_roles_for_user_and_project(self, user_id, tenant_id):
self.get_user(user_id)
@ -166,27 +201,32 @@ class Identity(identity.Driver):
# CRUD
def create_user(self, user_id, user):
user = self._validate_domain(user)
user['name'] = clean.user_name(user['name'])
return identity.filter_user(self.user.create(user))
user_ref = self.user.create(user)
return self._set_default_domain(identity.filter_user(user_ref))
def update_user(self, user_id, user):
user = self._validate_domain(user)
if 'name' in user:
user['name'] = clean.user_name(user['name'])
return self.user.update(user_id, user)
return self._set_default_domain(self.user.update(user_id, user))
def create_project(self, tenant_id, tenant):
tenant = self._validate_domain(tenant)
tenant['name'] = clean.project_name(tenant['name'])
data = tenant.copy()
if 'id' not in data or data['id'] is None:
data['id'] = str(uuid.uuid4().hex)
if 'description' in data and data['description'] in ['', None]:
data.pop('description')
return self.project.create(data)
return self._set_default_domain(self.project.create(data))
def update_project(self, tenant_id, tenant):
tenant = self._validate_domain(tenant)
if 'name' in tenant:
tenant['name'] = clean.project_name(tenant['name'])
return self.project.update(tenant_id, tenant)
return self._set_default_domain(self.project.update(tenant_id, tenant))
def create_metadata(self, user_id, tenant_id, metadata):
return {}
@ -227,16 +267,18 @@ class Identity(identity.Driver):
self.role.update(role_id, role)
def create_group(self, group_id, group):
group = self._validate_domain(group)
group['name'] = clean.group_name(group['name'])
return self.group.create(group)
return self._set_default_domain(self.group.create(group))
def get_group(self, group_id):
return self.group.get(group_id)
return self._set_default_domain(self.group.get(group_id))
def update_group(self, group_id, group):
group = self._validate_domain(group)
if 'name' in group:
group['name'] = clean.group_name(group['name'])
return self.group.update(group_id, group)
return self._set_default_domain(self.group.update(group_id, group))
def delete_group(self, group_id):
return self.group.delete(group_id)
@ -253,14 +295,14 @@ class Identity(identity.Driver):
def list_groups_for_user(self, user_id):
self.get_user(user_id)
return self.group.list_user_groups(user_id)
return self._set_default_domain(self.group.list_user_groups(user_id))
def list_groups(self):
return self.group.get_all()
return self._set_default_domain(self.group.get_all())
def list_users_in_group(self, group_id):
self.get_group(group_id)
return self.group.list_group_users(group_id)
return self._set_default_domain(self.group.list_group_users(group_id))
def check_user_in_group(self, user_id, group_id):
self.get_user(user_id)
@ -274,28 +316,25 @@ class Identity(identity.Driver):
return found
def create_domain(self, domain_id, domain):
domain['name'] = clean.domain_name(domain['name'])
return self.domain.create(domain)
if domain_id == CONF.identity.default_domain_id:
msg = 'Duplicate ID, %s.' % domain_id
raise exception.Conflict(type='domain', details=msg)
raise exception.Forbidden('Domains are read-only against LDAP')
def get_domain(self, domain_id):
try:
return self.domain.get(domain_id)
except exception.NotFound:
raise exception.DomainNotFound(domain_id=domain_id)
self._validate_domain_id(domain_id)
return DEFAULT_DOMAIN
def update_domain(self, domain_id, domain):
if 'name' in domain:
domain['name'] = clean.domain_name(domain['name'])
return self.domain.update(domain_id, domain)
self._validate_domain_id(domain_id)
raise exception.Forbidden('Domains are read-only against LDAP')
def delete_domain(self, domain_id):
try:
return self.domain.delete(domain_id)
except ldap.NO_SUCH_OBJECT:
raise exception.DomainNotFound(domain_id=domain_id)
self._validate_domain_id(domain_id)
raise exception.Forbidden('Domains are read-only against LDAP')
def list_domains(self):
return self.domain.get_all()
return [DEFAULT_DOMAIN]
# TODO(termie): remove this and move cross-api calls into driver
@ -339,12 +378,6 @@ class ApiShim(object):
self._group = GroupApi(self.conf)
return self._group
@property
def domain(self):
if not self._domain:
self._domain = DomainApi(self.conf)
return self._domain
# TODO(termie): remove this and move cross-api calls into driver
class ApiShimMixin(object):
@ -578,7 +611,7 @@ class ProjectApi(common_ldap.EnabledEmuMixIn, common_ldap.BaseLdap,
if old_obj['name'] != values['name']:
msg = 'Changing Name not supported by LDAP'
raise exception.NotImplemented(message=msg)
super(ProjectApi, self).update(id, values, old_obj)
return super(ProjectApi, self).update(id, values, old_obj)
class UserRoleAssociation(object):
@ -789,7 +822,7 @@ class RoleApi(common_ldap.BaseLdap, ApiShimMixin):
raise exception.Conflict('Cannot duplicate name %s' % old_name)
except exception.NotFound:
pass
super(RoleApi, self).update(role_id, role)
return super(RoleApi, self).update(role_id, role)
def delete(self, id):
conn = self.get_connection()
@ -876,7 +909,7 @@ class GroupApi(common_ldap.BaseLdap, ApiShimMixin):
if old_obj['name'] != values['name']:
msg = _('Changing Name not supported by LDAP')
raise exception.NotImplemented(message=msg)
super(GroupApi, self).update(id, values, old_obj)
return super(GroupApi, self).update(id, values, old_obj)
def add_user(self, user_id, group_id):
conn = self.get_connection()
@ -935,61 +968,3 @@ class GroupApi(common_ldap.BaseLdap, ApiShimMixin):
" from the group. The user will be ignored.") %
dict(user_dn=user_dn, group_dn=group_dn))
return users
class DomainApi(common_ldap.EnabledEmuMixIn, common_ldap.BaseLdap,
ApiShimMixin):
DEFAULT_OU = 'ou=Domains'
DEFAULT_STRUCTURAL_CLASSES = []
DEFAULT_OBJECTCLASS = 'groupOfNames'
DEFAULT_ID_ATTR = 'cn'
DEFAULT_MEMBER_ATTRIBUTE = 'member'
DEFAULT_ATTRIBUTE_IGNORE = []
options_name = 'domain'
attribute_mapping = {'name': 'ou',
'description': 'description',
'domainId': 'cn',
'enabled': 'enabled'}
model = models.Domain
def __init__(self, conf):
super(DomainApi, self).__init__(conf)
self.api = ApiShim(conf)
self.attribute_mapping['name'] = conf.ldap.domain_name_attribute
self.attribute_mapping['description'] = conf.ldap.domain_desc_attribute
self.attribute_mapping['enabled'] = conf.ldap.tenant_enabled_attribute
self.member_attribute = (getattr(conf.ldap, 'domain_member_attribute')
or self.DEFAULT_MEMBER_ATTRIBUTE)
self.attribute_ignore = (getattr(conf.ldap, 'domain_attribute_ignore')
or self.DEFAULT_ATTRIBUTE_IGNORE)
def get(self, id, filter=None):
"""Replaces exception.NotFound with exception.DomainNotFound."""
try:
return super(DomainApi, self).get(id, filter)
except exception.NotFound:
raise exception.DomainNotFound(domain_id=id)
def create(self, values):
self.affirm_unique(values)
data = values.copy()
if data.get('id') is None:
data['id'] = uuid.uuid4().hex
return super(DomainApi, self).create(data)
def delete(self, id):
if self.subtree_delete_enabled:
super(DomainApi, self).deleteTree(id)
else:
self.role_api.roles_delete_subtree_by_type(id, 'Domain')
super(DomainApi, self).delete(id)
def update(self, id, values):
try:
old_obj = self.get(id)
except exception.NotFound:
raise exception.DomainNotFound(domain_id=id)
if old_obj['name'] != values['name']:
msg = _('Changing Name not supported by LDAP')
raise exception.NotImplemented(message=msg)
super(DomainApi, self).update(id, values, old_obj)

View File

@ -32,6 +32,11 @@ TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
class IdentityTests(object):
def _get_domain_fixture(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
return domain
def test_project_add_and_remove_user_role(self):
user_refs = self.identity_api.get_project_users(self.tenant_bar['id'])
self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs])
@ -560,8 +565,7 @@ class IdentityTests(object):
- Check non-existing domain gives DomainNotFound
"""
new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(new_domain['id'], new_domain)
new_domain = self._get_domain_fixture()
new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
@ -1560,10 +1564,14 @@ class IdentityTests(object):
self.assertTrue(x for x in users if x['id'] == test_user['id'])
def test_list_groups(self):
group1 = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
group2 = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
group1 = {
'id': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex}
group2 = {
'id': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, group1['id'], group1)
self.identity_man.create_group({}, group2['id'], group2)
groups = self.identity_api.list_groups()
@ -1678,9 +1686,8 @@ class IdentityTests(object):
self.assertEqual(tenant_ref['enabled'], tenant['enabled'])
def test_add_user_to_group(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
@ -1698,8 +1705,7 @@ class IdentityTests(object):
self.assertTrue(found)
def test_add_user_to_group_404(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
domain = self._get_domain_fixture()
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': domain['id']}
@ -1709,7 +1715,7 @@ class IdentityTests(object):
new_user['id'],
uuid.uuid4().hex)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
self.assertRaises(exception.UserNotFound,
@ -1718,9 +1724,8 @@ class IdentityTests(object):
new_group['id'])
def test_check_user_in_group(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
@ -1732,8 +1737,10 @@ class IdentityTests(object):
self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
def test_check_user_not_in_group(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
new_group = {
'id': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
self.assertRaises(exception.UserNotFound,
self.identity_api.check_user_in_group,
@ -1741,9 +1748,8 @@ class IdentityTests(object):
new_group['id'])
def test_list_users_in_group(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
@ -1760,9 +1766,8 @@ class IdentityTests(object):
self.assertTrue(found)
def test_remove_user_from_group(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
@ -1779,13 +1784,12 @@ class IdentityTests(object):
self.assertFalse(x['id'] == new_group['id'])
def test_remove_user_from_group_404(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
domain = self._get_domain_fixture()
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': domain['id']}
self.identity_man.create_user({}, new_user['id'], new_user)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],
'name': uuid.uuid4().hex}
self.identity_man.create_group({}, new_group['id'], new_group)
self.assertRaises(exception.NotFound,
@ -1911,7 +1915,8 @@ class IdentityTests(object):
domain['id'])
def test_user_crud(self):
user = {'domain_id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,
user = {'domain_id': CONF.identity.default_domain_id,
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex, 'password': 'passw0rd'}
self.identity_api.create_user(user['id'], user)
user_ref = self.identity_api.get_user(user['id'])

View File

@ -34,6 +34,9 @@ CONF = config.CONF
class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
def _get_domain_fixture(self):
"""Domains in LDAP are read-only, so just return the static one."""
return self.identity_api.get_domain(CONF.identity.default_domain_id)
def clear_database(self):
db = fakeldap.FakeShelve().get_instance()
@ -374,8 +377,11 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
# In the tests below, the update is demonstrated by updating description.
# Refer to bug 1136403 for more detail.
def test_group_crud(self):
group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex, 'description': uuid.uuid4().hex}
group = {
'id': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex}
self.identity_api.create_group(group['id'], group)
group_ref = self.identity_api.get_group(group['id'])
self.assertDictEqual(group_ref, group)
@ -392,15 +398,25 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
def test_domain_crud(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True, 'description': uuid.uuid4().hex}
self.identity_api.create_domain(domain['id'], domain)
domain_ref = self.identity_api.get_domain(domain['id'])
self.assertDictEqual(domain_ref, domain)
domain['description'] = uuid.uuid4().hex
self.identity_api.update_domain(domain['id'], domain)
domain_ref = self.identity_api.get_domain(domain['id'])
self.assertDictEqual(domain_ref, domain)
self.identity_api.delete_domain(domain['id'])
with self.assertRaises(exception.Forbidden):
self.identity_api.create_domain(domain['id'], domain)
with self.assertRaises(exception.Conflict):
self.identity_api.create_domain(
CONF.identity.default_domain_id, domain)
with self.assertRaises(exception.DomainNotFound):
domain_ref = self.identity_api.get_domain(domain['id'])
with self.assertRaises(exception.DomainNotFound):
domain['description'] = uuid.uuid4().hex
self.identity_api.update_domain(domain['id'], domain)
with self.assertRaises(exception.Forbidden):
self.identity_api.update_domain(
CONF.identity.default_domain_id, domain)
with self.assertRaises(exception.DomainNotFound):
self.identity_api.get_domain(domain['id'])
with self.assertRaises(exception.DomainNotFound):
self.identity_api.delete_domain(domain['id'])
with self.assertRaises(exception.Forbidden):
self.identity_api.delete_domain(CONF.identity.default_domain_id)
self.assertRaises(exception.DomainNotFound,
self.identity_api.get_domain,
domain['id'])
@ -421,10 +437,10 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_get_and_remove_role_grant_by_group_and_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_get_and_remove_role_grant_by_user_and_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_get_and_remove_correct_role_grant_from_a_mix(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
@ -434,7 +450,7 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
# updating of a project name so this method override
# provides a different update test
project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'description': uuid.uuid4().hex
}
self.identity_api.create_project(project['id'], project)
@ -457,34 +473,34 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
project['id'])
def test_get_and_remove_role_grant_by_group_and_cross_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_get_and_remove_role_grant_by_user_and_cross_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_role_grant_by_group_and_cross_domain_project(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_role_grant_by_user_and_cross_domain_project(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_multi_role_grant_by_user_group_on_project_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_delete_role_with_user_and_group_grants(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_delete_user_with_group_project_domain_links(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_delete_group_with_user_project_domain_links(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_list_user_projects(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
def test_get_project_users(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_create_duplicate_user_name_in_different_domains(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
@ -493,7 +509,8 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_create_duplicate_group_name_in_different_domains(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
raise nose.exc.SkipTest(
'N/A: LDAP does not support multiple domains')
def test_move_user_between_domains(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
@ -502,7 +519,8 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_move_group_between_domains(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
raise nose.exc.SkipTest(
'N/A: LDAP does not support multiple domains')
def test_move_group_between_domains_with_clashing_names_fails(self):
raise nose.exc.SkipTest('Blocked by bug 1101276')
@ -514,7 +532,7 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
raise nose.exc.SkipTest('Blocked by bug 1101276')
def test_get_roles_for_user_and_domain(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
raise nose.exc.SkipTest('N/A: LDAP does not support multiple domains')
def test_list_group_members_missing_entry(self):
"""List group members with deleted user.
@ -551,6 +569,14 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.assertEqual(len(res), 1, "Expected 1 entry (user_1)")
self.assertEqual(res[0]['id'], user_1_id, "Expected user 1 id")
def test_list_domains(self):
domains = self.identity_api.list_domains()
self.assertEquals(
domains,
[{'id': CONF.identity.default_domain_id,
'name': 'Default',
'enabled': True}])
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):
@ -596,10 +622,11 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
# NOTE(topol): LDAPIdentityEnabledEmulation will create an
# enabled key in the project dictionary so this
# method override handles this side-effect
project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': uuid.uuid4().hex,
'description': uuid.uuid4().hex
}
project = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'description': uuid.uuid4().hex}
self.identity_api.create_project(project['id'], project)
project_ref = self.identity_api.get_project(project['id'])
@ -621,8 +648,11 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
project['id'])
def test_user_crud(self):
user = {'domain_id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex, 'password': 'passw0rd'}
user = {
'id': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex}
self.identity_man.create_user({}, user['id'], user)
user['enabled'] = True
user_ref = self.identity_api.get_user(user['id'])