From 729dcad7384ba66ee7494154969cdd7ae90d86ee Mon Sep 17 00:00:00 2001 From: Brant Knudson Date: Wed, 21 May 2014 17:30:42 -0500 Subject: [PATCH] LDAP fix for get_roles_for_user_and_project user=group ID When there was a role assigned to a group with the same ID as a user, the LDAP assignment backend would incorrectly return the assignment to the group when requesting roles for the user via the get_roles_for_user_and_project method. With this change, assignments to a group with the same ID are not returned for the user when calling get_roles_for_user_and_project. Functions were added to compare DNs more accurately based on the LDAP RFCs. The fakeldap code was changed to normalize the values when comparing values for checking if the values match the filter. Co-Authored By: Nathan Kinder Co-Authored By: Adam Young Change-Id: Ia6f1ae2e3af1e968f1a393bd4f2f38812a88a5d0 Closes-Bug: #1309228 --- keystone/assignment/backends/ldap.py | 5 +- keystone/common/ldap/core.py | 110 +++++++++++++++ keystone/tests/fakeldap.py | 9 +- keystone/tests/test_backend_ldap.py | 67 ++++------ keystone/tests/unit/common/test_ldap.py | 169 ++++++++++++++++++++++++ 5 files changed, 318 insertions(+), 42 deletions(-) create mode 100644 keystone/tests/unit/common/test_ldap.py diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py index 48058bddb..e5ccf1ff1 100644 --- a/keystone/assignment/backends/ldap.py +++ b/keystone/assignment/backends/ldap.py @@ -89,10 +89,11 @@ class Assignment(assignment.Driver): def _get_roles_for_just_user_and_project(user_id, tenant_id): self.get_project(tenant_id) + user_dn = self.user._id_to_dn(user_id) return [self.role._dn_to_id(a.role_dn) for a in self.role.get_role_assignments (self.project._id_to_dn(tenant_id)) - if self.user._dn_to_id(a.user_dn) == user_id] + if common_ldap.is_dn_equal(a.user_dn, user_dn)] def _get_roles_for_group_and_project(group_id, project_id): self.get_project(project_id) @@ -106,7 +107,7 @@ class Assignment(assignment.Driver): return [self.role._dn_to_id(a.role_dn) for a in self.role.get_role_assignments (self.project._id_to_dn(project_id)) - if a.user_dn.upper() == group_dn.upper()] + if common_ldap.is_dn_equal(a.user_dn, group_dn)] if domain_id is not None: msg = _('Domain metadata not supported by LDAP') diff --git a/keystone/common/ldap/core.py b/keystone/common/ldap/core.py index 625ac2dd0..cdd73468d 100644 --- a/keystone/common/ldap/core.py +++ b/keystone/common/ldap/core.py @@ -14,6 +14,7 @@ import abc import os.path +import re import codecs import ldap @@ -141,6 +142,115 @@ def ldap_scope(scope): 'options': ', '.join(LDAP_SCOPES.keys())}) +def prep_case_insensitive(value): + """Prepare a string for case-insensitive comparison. + + This is defined in RFC4518. For simplicity, all this function does is + lowercase all the characters, strip leading and trailing whitespace, + and compress sequences of spaces to a single space. + """ + value = re.sub(r'\s+', ' ', value.strip().lower()) + return value + + +def is_ava_value_equal(attribute_type, val1, val2): + """Returns True if and only if the AVAs are equal. + + When comparing AVAs, the equality matching rule for the attribute type + should be taken into consideration. For simplicity, this implementation + does a case-insensitive comparison. + + Note that this function uses prep_case_insenstive so the limitations of + that function apply here. + + """ + + return prep_case_insensitive(val1) == prep_case_insensitive(val2) + + +def is_rdn_equal(rdn1, rdn2): + """Returns True if and only if the RDNs are equal. + + * RDNs must have the same number of AVAs. + * Each AVA of the RDNs must be the equal for the same attribute type. The + order isn't significant. Note that an attribute type will only be in one + AVA in an RDN, otherwise the DN wouldn't be valid. + * Attribute types aren't case sensitive. Note that attribute type + comparison is more complicated than implemented. This function only + compares case-insentive. The code should handle multiple names for an + attribute type (e.g., cn, commonName, and 2.5.4.3 are the same). + + Note that this function uses is_ava_value_equal to compare AVAs so the + limitations of that function apply here. + + """ + + if len(rdn1) != len(rdn2): + return False + + for attr_type_1, val1, dummy in rdn1: + found = False + for attr_type_2, val2, dummy in rdn2: + if attr_type_1.lower() != attr_type_2.lower(): + continue + + found = True + if not is_ava_value_equal(attr_type_1, val1, val2): + return False + break + if not found: + return False + + return True + + +def is_dn_equal(dn1, dn2): + """Returns True if and only if the DNs are equal. + + Two DNs are equal if they've got the same number of RDNs and if the RDNs + are the same at each position. See RFC4517. + + Note that this function uses is_rdn_equal to compare RDNs so the + limitations of that function apply here. + + :param dn1: Either a string DN or a DN parsed by ldap.dn.str2dn. + :param dn2: Either a string DN or a DN parsed by ldap.dn.str2dn. + + """ + + if not isinstance(dn1, list): + dn1 = ldap.dn.str2dn(dn1) + if not isinstance(dn2, list): + dn2 = ldap.dn.str2dn(dn2) + + if len(dn1) != len(dn2): + return False + + for rdn1, rdn2 in zip(dn1, dn2): + if not is_rdn_equal(rdn1, rdn2): + return False + return True + + +def dn_startswith(descendant_dn, dn): + """Returns True if and only if the descendant_dn is under the dn. + + :param descendant_dn: Either a string DN or a DN parsed by ldap.dn.str2dn. + :param dn: Either a string DN or a DN parsed by ldap.dn.str2dn. + + """ + + if not isinstance(descendant_dn, list): + descendant_dn = ldap.dn.str2dn(descendant_dn) + if not isinstance(dn, list): + dn = ldap.dn.str2dn(dn) + + if len(descendant_dn) <= len(dn): + return False + + return is_dn_equal(descendant_dn[len(dn):], dn) + + @six.add_metaclass(abc.ABCMeta) class LDAPHandler(object): '''Abstract class which defines methods for a LDAP API provider. diff --git a/keystone/tests/fakeldap.py b/keystone/tests/fakeldap.py index 5118f5ee5..e5b060d6b 100644 --- a/keystone/tests/fakeldap.py +++ b/keystone/tests/fakeldap.py @@ -62,6 +62,11 @@ def _internal_attr(attr_name, value_or_values): if dn == 'cn=Doe\\5c, John,ou=Users,cn=example,cn=com': return 'CN=Doe\\, John,OU=Users,CN=example,CN=com' + # NOTE(blk-u): Another special case for this tested value. When a + # roleOccupant has an escaped comma, it gets converted to \2C. + if dn == 'cn=Doe\\, John,ou=Users,cn=example,cn=com': + return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com' + dn = ldap.dn.str2dn(core.utf8_encode(dn)) norm = [] for part in dn: @@ -133,7 +138,9 @@ def _match(key, value, attrs): str_sids = [six.text_type(x) for x in attrs[key]] return six.text_type(value) in str_sids if key != 'objectclass': - return _internal_attr(key, value)[0] in attrs[key] + check_value = _internal_attr(key, value)[0] + norm_values = list(_internal_attr(key, x)[0] for x in attrs[key]) + return check_value in norm_values # it is an objectclass check, so check subclasses values = _subs(value) for v in values: diff --git a/keystone/tests/test_backend_ldap.py b/keystone/tests/test_backend_ldap.py index 30ddc0dfa..53e77ee2e 100644 --- a/keystone/tests/test_backend_ldap.py +++ b/keystone/tests/test_backend_ldap.py @@ -594,6 +594,34 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.assertThat(ref_list, matchers.Equals([group])) + def test_user_id_comma_grants(self): + """Even if the user has a , in their ID, can get user and group grants. + """ + + # Create a user with a , in their ID + # NOTE(blk-u): the DN for this user is hard-coded in fakeldap! + user_id = u'Doe, John' + user = { + 'id': user_id, + 'name': self.getUniqueString(), + 'password': self.getUniqueString(), + 'domain_id': CONF.identity.default_domain_id, + } + self.identity_api.create_user(user_id, user) + + # Grant the user a role on a project. + + role_id = 'member' + project_id = self.tenant_baz['id'] + + self.assignment_api.create_grant(role_id, user_id=user_id, + project_id=project_id) + + role_ref = self.assignment_api.get_grant(role_id, user_id=user_id, + project_id=project_id) + + self.assertEqual(role_id, role_ref['id']) + class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): @@ -1250,45 +1278,6 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): user1['id'], CONF.identity.default_domain_id) self.assertEqual(0, len(combined_role_list)) - def test_get_roles_for_user_and_project_user_group_same_id(self): - """When a user has the same ID as a group, - get_roles_for_user_and_project returns the roles for the group. - - Overriding this test for LDAP because it works differently. The role - for the group is returned. This is bug 1309228. - """ - - # Setup: create user, group with same ID, role, and project; - # assign the group the role on the project. - - user_group_id = uuid.uuid4().hex - - user1 = {'id': user_group_id, 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, } - self.identity_api.create_user(user_group_id, user1) - - group1 = {'id': user_group_id, 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, } - self.identity_api.create_group(user_group_id, group1) - - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.assignment_api.create_role(role1['id'], role1) - - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, } - self.assignment_api.create_project(project1['id'], project1) - - self.assignment_api.create_grant(role1['id'], - group_id=user_group_id, - project_id=project1['id']) - - # Check the roles, shouldn't be any since the user wasn't granted any. - roles = self.assignment_api.get_roles_for_user_and_project( - user_group_id, project1['id']) - - self.assertEqual([role1['id']], roles, - 'role for group is %s' % role1['id']) - def test_list_projects_for_alternate_domain(self): self.skipTest( 'N/A: LDAP does not support multiple domains') diff --git a/keystone/tests/unit/common/test_ldap.py b/keystone/tests/unit/common/test_ldap.py new file mode 100644 index 000000000..220bf1a5e --- /dev/null +++ b/keystone/tests/unit/common/test_ldap.py @@ -0,0 +1,169 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ldap.dn + +from keystone.common import ldap as ks_ldap +from keystone import tests + + +class DnCompareTest(tests.BaseTestCase): + """Tests for the DN comparison functions in keystone.common.ldap.core.""" + + def test_prep(self): + # prep_case_insensitive returns the string with spaces at the front and + # end if it's already lowercase and no insignificant characters. + value = 'lowercase value' + self.assertEqual(value, ks_ldap.prep_case_insensitive(value)) + + def test_prep_lowercase(self): + # prep_case_insensitive returns the string with spaces at the front and + # end and lowercases the value. + value = 'UPPERCASE VALUE' + exp_value = value.lower() + self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) + + def test_prep_insignificant(self): + # prep_case_insensitive remove insignificant spaces. + value = 'before after' + exp_value = 'before after' + self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) + + def test_prep_insignificant_pre_post(self): + # prep_case_insensitive remove insignificant spaces. + value = ' value ' + exp_value = 'value' + self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) + + def test_ava_equal_same(self): + # is_ava_value_equal returns True if the two values are the same. + value = 'val1' + self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value)) + + def test_ava_equal_complex(self): + # is_ava_value_equal returns True if the two values are the same using + # a value that's got different capitalization and insignificant chars. + val1 = 'before after' + val2 = ' BEFORE afTer ' + self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2)) + + def test_ava_different(self): + # is_ava_value_equal returns False if the values aren't the same. + self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2')) + + def test_rdn_same(self): + # is_rdn_equal returns True if the two values are the same. + rdn = ldap.dn.str2dn('cn=val1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn)) + + def test_rdn_diff_length(self): + # is_rdn_equal returns False if the RDNs have a different number of + # AVAs. + rdn1 = ldap.dn.str2dn('cn=cn1')[0] + rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_multi_ava_same_order(self): + # is_rdn_equal returns True if the RDNs have the same number of AVAs + # and the values are the same. + rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_multi_ava_diff_order(self): + # is_rdn_equal returns True if the RDNs have the same number of AVAs + # and the values are the same, even if in a different order + rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_multi_ava_diff_type(self): + # is_rdn_equal returns False if the RDNs have the same number of AVAs + # and the attribute types are different. + rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0] + self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_attr_type_case_diff(self): + # is_rdn_equal returns True for same RDNs even when attr type case is + # different. + rdn1 = ldap.dn.str2dn('cn=cn1')[0] + rdn2 = ldap.dn.str2dn('CN=cn1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_attr_type_alias(self): + # is_rdn_equal returns False for same RDNs even when attr type alias is + # used. Note that this is a limitation since an LDAP server should + # consider them equal. + rdn1 = ldap.dn.str2dn('cn=cn1')[0] + rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0] + self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_dn_same(self): + # is_dn_equal returns True if the DNs are the same. + dn = 'cn=Babs Jansen,ou=OpenStack' + self.assertTrue(ks_ldap.is_dn_equal(dn, dn)) + + def test_dn_diff_length(self): + # is_dn_equal returns False if the DNs don't have the same number of + # RDNs + dn1 = 'cn=Babs Jansen,ou=OpenStack' + dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com' + self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2)) + + def test_dn_equal_rdns(self): + # is_dn_equal returns True if the DNs have the same number of RDNs + # and each RDN is the same. + dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource' + dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack' + self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2)) + + def test_dn_parsed_dns(self): + # is_dn_equal can also accept parsed DNs. + dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource') + dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack') + self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2)) + + def test_startswith_under_child(self): + # dn_startswith returns True if descendant_dn is a child of dn. + child = 'cn=Babs Jansen,ou=OpenStack' + parent = 'ou=OpenStack' + self.assertTrue(ks_ldap.dn_startswith(child, parent)) + + def test_startswith_parent(self): + # dn_startswith returns False if descendant_dn is a parent of dn. + child = 'cn=Babs Jansen,ou=OpenStack' + parent = 'ou=OpenStack' + self.assertFalse(ks_ldap.dn_startswith(parent, child)) + + def test_startswith_same(self): + # dn_startswith returns False if DNs are the same. + dn = 'cn=Babs Jansen,ou=OpenStack' + self.assertFalse(ks_ldap.dn_startswith(dn, dn)) + + def test_startswith_not_parent(self): + # dn_startswith returns False if descendant_dn is not under the dn + child = 'cn=Babs Jansen,ou=OpenStack' + parent = 'dc=example.com' + self.assertFalse(ks_ldap.dn_startswith(child, parent)) + + def test_startswith_descendant(self): + # dn_startswith returns True if descendant_dn is a descendant of dn. + descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com' + dn = 'ou=OpenStack,dc=example.com' + self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) + + def test_startswith_parsed_dns(self): + # dn_startswith also accepts parsed DNs. + descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack') + dn = ldap.dn.str2dn('ou=OpenStack') + self.assertTrue(ks_ldap.dn_startswith(descendant, dn))