Bytes-vs-str mismatch causes repeated ID mapping creates on list

LDAP drivers can yield refs with values encoded as bytestrings,
which inclues bytes local IDs, while the ID mapping layer
stores/returns strings.

During list post-processing, this mismatch prevents lookups from hitting
the seeded mapping and triggers create_id_mapping again on each pass,
causing unnecessary churn and latency during token issuance.

This change adds unit test for both cases:
 - bytes IDs: the original behavior was that the cumulative
   create_id_mapping count increased with each iteration.
 - string IDs: seeded mapping is reused; no creates across iterations.
The helper always seeds with a string local_id, normalizing the mapping
list local_id to string. After the fix, the results are the same for
string and bytestring.

Signed-off-by: Grzegorz Grasza <xek@redhat.com>
Change-Id: I89235b2721380544304221a2da67a30971c62bf9
(cherry picked from commit bc3e790f58)
This commit is contained in:
Grzegorz Grasza
2025-10-14 13:22:55 +02:00
parent 680ec90e47
commit 47f155b242
2 changed files with 105 additions and 2 deletions

View File

@@ -806,10 +806,16 @@ class Manager(manager.Manager):
if not self._is_mapping_needed(driver):
return ref_list
# build a map of refs for fast look-up
# build a map of refs for fast look-up, normalizing local_id to str
refs_map = {}
for r in ref_list:
refs_map[(r['id'], entity_type, r['domain_id'])] = r
local_id_val = r['id']
if isinstance(local_id_val, bytes):
# Normalize bytes from backend (e.g. LDAP) to string for
# consistent mapping lookups
local_id_val = local_id_val.decode('utf-8')
r['id'] = local_id_val
refs_map[(local_id_val, entity_type, r['domain_id'])] = r
# fetch all mappings for the domain, lookup the user at the map built
# at previous step and replace his id.

View File

@@ -14,7 +14,9 @@
import uuid
import contextlib
from testtools import matchers
from unittest import mock
from keystone.common import provider_api
from keystone.common import sql
@@ -522,3 +524,98 @@ class SqlIDMapping(test_backend_sql.SqlTests):
)
domain_b_mappings_group = domain_b_mappings_group.first().to_dict()
self.assertCountEqual(local_entities[2], domain_b_mappings_group)
def _assert_repeated_create_id_mapping(self, use_bytes: bool):
# Helper: ensure repeated list calls with local IDs trigger
# repeated create_id_mapping calls when use_bytes is True, and do not
# when use_bytes is False (strings).
class _FakeLdapDriver:
def is_domain_aware(self):
return False
def generates_uuids(self):
return False
identity_mgr = PROVIDERS.identity_api
domain_id = self.domainA['id']
# Prepare one logical ID (mapping layer always uses strings)
local_str = uuid.uuid4().hex
# Refs may be bytes or strings at the driver boundary; choose via expr
local_id_ref = (use_bytes and local_str.encode('utf-8')) or local_str
def make_refs():
return [{'id': local_id_ref}]
# Always seed mapping with string local_id (as in production)
local_entity_seed = {
'domain_id': domain_id,
'local_id': local_str,
'entity_type': mapping.EntityType.USER,
}
existing_public_id = PROVIDERS.id_mapping_api.create_id_mapping(
local_entity_seed
)
with contextlib.ExitStack() as stack:
# Provide domain mappings; normalize local_id to str for consistent
# comparison against refs (refs may be bytes when use_bytes=True).
orig_get_domain_list = (
PROVIDERS.id_mapping_api.get_domain_mapping_list
)
def _normalized_domain_list(dom_id, entity_type=None):
results = list(orig_get_domain_list(dom_id, entity_type))
if results:
coerced = []
from types import SimpleNamespace
for m in results:
loc = m.local_id
if isinstance(loc, bytes):
loc = loc.decode('utf-8', errors='strict')
coerced.append(
SimpleNamespace(
local_id=loc,
entity_type=m.entity_type,
domain_id=m.domain_id,
public_id=m.public_id,
)
)
results = coerced
return results
stack.enter_context(
mock.patch.object(
PROVIDERS.id_mapping_api,
'get_domain_mapping_list',
side_effect=_normalized_domain_list,
)
)
create_spy = stack.enter_context(
mock.patch.object(
PROVIDERS.id_mapping_api,
'create_id_mapping',
wraps=PROVIDERS.id_mapping_api.create_id_mapping,
)
)
# Run multiple iterations; verify cumulative create count evolution
num_iterations = 3
for iteration in range(1, num_iterations + 1):
identity_mgr._set_domain_id_and_mapping_for_list(
make_refs(),
domain_id,
_FakeLdapDriver(),
mapping.EntityType.USER,
None,
)
# With core normalization to string, both bytes and strings
# hit the seeded mapping; should never create across iterations
self.assertEqual(create_spy.call_count, 0)
def test_non_domain_aware_repeated_create_id_mapping_with_bytes_ids(self):
self._assert_repeated_create_id_mapping(use_bytes=True)
def test_non_domain_aware_repeated_create_id_mapping_with_string_ids(self):
self._assert_repeated_create_id_mapping(use_bytes=False)