Fix privilege escalation via spoofed identity headers

The external_oauth2_token middleware did not sanitize incoming
authentication headers before processing OAuth 2.0 tokens. This
allowed an attacker to send forged identity headers (e.g.,
X-Is-Admin-Project, X-Roles, X-User-Id) that would not be cleared
by the middleware, potentially enabling privilege escalation.

This fix adds a call to remove_auth_headers() at the start of
request processing to sanitize all incoming identity headers,
matching the secure behavior of the main auth_token middleware.

Closes-Bug: #2129018
Change-Id: Idd4fe1d17a25b3064b31f454d9830242f345e018
Signed-off-by: Jeremy Stanley <fungi@yuggoth.org>
Signed-off-by: Artem Goncharov <artem.goncharov@gmail.com>
This commit is contained in:
Grzegorz Grasza
2026-01-08 14:46:19 +01:00
committed by Artem Goncharov
parent 44460934f8
commit e15e33fe9b
2 changed files with 81 additions and 2 deletions

View File

@@ -33,6 +33,7 @@ from keystoneauth1.loading import session as session_loading
from keystonemiddleware._common import config
from keystonemiddleware.auth_token import _cache
from keystonemiddleware.auth_token import _request
from keystonemiddleware.exceptions import ConfigurationError
from keystonemiddleware.exceptions import KeystoneMiddlewareException
from keystonemiddleware.i18n import _
@@ -534,7 +535,7 @@ class ExternalAuth2Protocol(object):
**cache_kwargs)
return _cache.TokenCache(self._log, **cache_kwargs)
@webob.dec.wsgify()
@webob.dec.wsgify(RequestClass=_request._AuthTokenRequest)
def __call__(self, req):
"""Handle incoming request."""
self.process_request(req)
@@ -545,8 +546,10 @@ class ExternalAuth2Protocol(object):
"""Process request.
:param request: Incoming request
:type request: _request.AuthTokenRequest
:type request: _request._AuthTokenRequest
"""
request.remove_auth_headers()
access_token = None
if (request.authorization and
request.authorization.authtype == 'Bearer'):

View File

@@ -1823,6 +1823,82 @@ class ExternalOauth2TokenMiddlewareClientSecretBasicTest(
self.assertEqual(resp.headers.get('WWW-Authenticate'),
'Authorization OAuth 2.0 uri="%s"' % self._audience)
def test_spoofed_headers_are_sanitized(self):
"""Test that spoofed identity headers are removed and replaced.
This test verifies the fix for a privilege escalation vulnerability
where an attacker could send spoofed identity headers that would not
be cleared by the middleware, allowing unauthorized access.
"""
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
# Use non-admin roles in the token metadata
non_admin_roles = 'member,reader'
non_admin_metadata = copy.deepcopy(self._default_metadata)
non_admin_metadata['roles'] = non_admin_roles
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=True,
metadata=non_admin_metadata
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
# Attempt to spoof multiple identity headers
spoofed_headers = get_authorization_header(self._token)
spoofed_headers.update({
'X-Identity-Status': 'Confirmed',
'X-Is-Admin-Project': 'true',
'X-User-Id': 'spoofed_admin_user_id',
'X-User-Name': 'spoofed_admin',
'X-Roles': 'admin,superuser',
'X-Project-Id': 'spoofed_project_id',
'X-User-Domain-Id': 'spoofed_domain_id',
'X-User-Domain-Name': 'spoofed_domain',
})
resp = self.call_middleware(
headers=spoofed_headers,
expected_status=200,
method='GET', path='/vnfpkgm/v1/vnf_packages',
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self.assertEqual(FakeApp.SUCCESS, resp.body)
# Verify spoofed headers were replaced with actual token values
env = resp.request.environ
# X-Is-Admin-Project should not be present (not the spoofed 'true')
# because the token has non-admin roles and the middleware only sets
# this header when is_admin is true
self.assertNotIn('HTTP_X_IS_ADMIN_PROJECT', env)
# User info should match the token, not the spoofed values
self.assertEqual(self._user_id, env['HTTP_X_USER_ID'])
self.assertEqual(self._user_name, env['HTTP_X_USER_NAME'])
self.assertEqual(self._user_domain_id, env['HTTP_X_USER_DOMAIN_ID'])
self.assertEqual(
self._user_domain_name,
env['HTTP_X_USER_DOMAIN_NAME']
)
# Roles should be from the token, not spoofed
self.assertEqual(non_admin_roles, env['HTTP_X_ROLES'])
# Project info should match the token
self.assertEqual(self._project_id, env['HTTP_X_PROJECT_ID'])
class ExternalAuth2ProtocolTest(BaseExternalOauth2TokenMiddlewareTest):