Add service user authentication to ec2 and s3 endpoints

Add a policy to enforce authentication with a user in the service
group. This maintains AWS compatibility with the added security
layer.

Conflicts:
    keystone/tests/unit/test_contrib_ec2_core.py
    keystone/tests/unit/test_contrib_s3_core.py
    keystone/tests/unit/test_v3_credential.py

Closes-Bug: 2119646
Change-Id: Ic84b84247e05f29874e2c5636a033aaedd4de83c
Signed-off-by: Grzegorz Grasza <xek@redhat.com>
Signed-off-by: Jeremy Stanley <fungi@yuggoth.org>
Signed-off-by: Artem Goncharov <artem.goncharov@gmail.com>
Signed-off-by: Brian Haley <haleyb.dev@gmail.com>
(cherry picked from commit 68c1817e1cf1ed284d8420a6e1261749648bccd8)
(cherry picked from commit aea59e3117)
This commit is contained in:
Grzegorz Grasza
2025-09-19 14:02:18 +02:00
committed by Brian Haley
parent 4219df0751
commit 4f51cdcc5c
9 changed files with 153 additions and 17 deletions

View File

@@ -245,6 +245,8 @@ identity:delete_application_credential DELETE /v3/users/{use
identity:get_access_rule GET /v3/users/{user_id}/access_rules/{access_rule_id}
identity:list_access_rules GET /v3/users/{user_id}/access_rules
identity:delete_access_rule DELETE /v3/users/{user_id}/access_rules/{access_rule_id}
identity:s3tokens_validate POST /v3/s3tokens
identity:ec2tokens_validate POST /v3/es2tokens
========================================================= ===

View File

@@ -21,6 +21,7 @@ from oslo_serialization import jsonutils
from keystone.api._shared import EC2_S3_Resource
from keystone.api._shared import json_home_relations
from keystone.common import rbac_enforcer
from keystone.common import render_token
from keystone.common import utils
from keystone import exception
@@ -31,6 +32,9 @@ from keystone.server import flask as ks_flask
CRED_TYPE_EC2 = 'ec2'
ENFORCER = rbac_enforcer.RBACEnforcer
class EC2TokensResource(EC2_S3_Resource.ResourceBase):
@staticmethod
def _check_signature(creds_ref, credentials):
@@ -60,12 +64,14 @@ class EC2TokensResource(EC2_S3_Resource.ResourceBase):
raise exception.Unauthorized(
_('EC2 signature not supplied.'))
@ks_flask.unenforced_api
def post(self):
"""Authenticate ec2 token.
POST /v3/ec2tokens
"""
# Enforce RBAC in the same way as S3 tokens
ENFORCER.enforce_call(action='identity:ec2tokens_validate')
token = self.handle_authenticate()
token_reference = render_token.render_token_response_from_model(token)
resp_body = jsonutils.dumps(token_reference)

View File

@@ -22,12 +22,15 @@ from oslo_serialization import jsonutils
from keystone.api._shared import EC2_S3_Resource
from keystone.api._shared import json_home_relations
from keystone.common import rbac_enforcer
from keystone.common import render_token
from keystone.common import utils
from keystone import exception
from keystone.i18n import _
from keystone.server import flask as ks_flask
ENFORCER = rbac_enforcer.RBACEnforcer
def _calculate_signature_v1(string_to_sign, secret_key):
"""Calculate a v1 signature.
@@ -90,12 +93,14 @@ class S3Resource(EC2_S3_Resource.ResourceBase):
raise exception.Unauthorized(
message=_('Credential signature mismatch'))
@ks_flask.unenforced_api
def post(self):
"""Authenticate s3token.
POST /v3/s3tokens
"""
# Use standard Keystone policy enforcement for s3tokens access
ENFORCER.enforce_call(action='identity:s3tokens_validate')
token = self.handle_authenticate()
token_reference = render_token.render_token_response_from_model(token)
resp_body = jsonutils.dumps(token_reference)

View File

@@ -22,6 +22,7 @@ from keystone.common.policies import credential
from keystone.common.policies import domain
from keystone.common.policies import domain_config
from keystone.common.policies import ec2_credential
from keystone.common.policies import ec2tokens
from keystone.common.policies import endpoint
from keystone.common.policies import endpoint_group
from keystone.common.policies import grant
@@ -40,6 +41,7 @@ from keystone.common.policies import registered_limit
from keystone.common.policies import revoke_event
from keystone.common.policies import role
from keystone.common.policies import role_assignment
from keystone.common.policies import s3tokens
from keystone.common.policies import service
from keystone.common.policies import service_provider
from keystone.common.policies import token
@@ -78,6 +80,8 @@ def list_rules():
revoke_event.list_rules(),
role.list_rules(),
role_assignment.list_rules(),
s3tokens.list_rules(),
ec2tokens.list_rules(),
service.list_rules(),
service_provider.list_rules(),
token_revocation.list_rules(),

View File

@@ -0,0 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from keystone.common.policies import base
# Align EC2 tokens API with S3 tokens: require admin or service users
ADMIN_OR_SERVICE = 'rule:service_or_admin'
ec2tokens_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'ec2tokens_validate',
check_str=ADMIN_OR_SERVICE,
scope_types=['system', 'domain', 'project'],
description='Validate EC2 credentials and create a Keystone token. '
'Restricted to service users or administrators.',
operations=[{'path': '/v3/ec2tokens', 'method': 'POST'}],
)
]
def list_rules():
return ec2tokens_policies

View File

@@ -0,0 +1,36 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from keystone.common.policies import base
# S3 tokens API requires service authentication to prevent presigned URL
# exploitation.
# This policy restricts access to service users or administrators only
ADMIN_OR_SERVICE = 'rule:service_or_admin'
s3tokens_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 's3tokens_validate',
check_str=ADMIN_OR_SERVICE,
scope_types=['system', 'domain', 'project'],
description='Validate S3 credentials and create a Keystone token. '
'Restricted to service users or administrators to prevent '
'exploitation via presigned URLs.',
operations=[{'path': '/v3/s3tokens', 'method': 'POST'}],
)
]
def list_rules():
return s3tokens_policies

View File

@@ -44,7 +44,7 @@ class EC2ContribCoreV3(test_v3.RestfulTestCase):
self.assertEqual(http.client.METHOD_NOT_ALLOWED,
resp.status_code)
def test_valid_authentication_response_with_proper_secret(self):
def _test_valid_authentication_response_with_proper_secret(self, **kwargs):
signer = ec2_utils.Ec2Signer(self.cred_blob['secret'])
timestamp = utils.isotime(timeutils.utcnow())
credentials = {
@@ -60,11 +60,35 @@ class EC2ContribCoreV3(test_v3.RestfulTestCase):
},
}
credentials['signature'] = signer.generate(credentials)
# Authenticate as system admin by default unless overridden via kwargs
token = None
if 'noauth' in kwargs and kwargs['noauth']:
token = None
else:
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.role_id
)
token = self.get_system_scoped_token()
expected_status = kwargs.get('expected_status', http.client.OK)
resp = self.post(
'/ec2tokens',
body={'credentials': credentials},
expected_status=http.client.OK)
self.assertValidProjectScopedTokenResponse(resp, self.user)
expected_status=expected_status,
token=token,
noauth=kwargs.get('noauth'),
)
if expected_status == http.client.OK:
self.assertValidProjectScopedTokenResponse(resp, self.user)
def test_valid_authentication_response_with_proper_secret(self):
self._test_valid_authentication_response_with_proper_secret()
def test_valid_authentication_response_with_proper_secret_noauth(self):
# ec2 endpoint now enforces RBAC; unauthenticated should be denied
self._test_valid_authentication_response_with_proper_secret(
expected_status=http.client.UNAUTHORIZED, noauth=True
)
def test_valid_authentication_response_with_signature_v4(self):
signer = ec2_utils.Ec2Signer(self.cred_blob['secret'])

View File

@@ -46,20 +46,39 @@ class S3ContribCore(test_v3.RestfulTestCase):
self.assertEqual(http.client.METHOD_NOT_ALLOWED,
resp.status_code)
def test_good_response(self):
def _test_good_response(self, expected_status=http.client.OK, **kwargs):
sts = 'string to sign' # opaque string from swift3
sig = hmac.new(self.cred_blob['secret'].encode('ascii'),
sts.encode('ascii'), hashlib.sha1).digest()
sig = hmac.new(
self.cred_blob['secret'].encode('ascii'),
sts.encode('ascii'),
hashlib.sha1,
).digest()
resp = self.post(
'/s3tokens',
body={'credentials': {
'access': self.cred_blob['access'],
'signature': base64.b64encode(sig).strip(),
'token': base64.b64encode(sts.encode('ascii')).strip(),
}},
expected_status=http.client.OK)
self.assertValidProjectScopedTokenResponse(resp, self.user,
forbid_token_id=True)
body={
'credentials': {
'access': self.cred_blob['access'],
'signature': base64.b64encode(sig).strip(),
'token': base64.b64encode(sts.encode('ascii')).strip(),
}
},
expected_status=expected_status,
**kwargs,
)
if expected_status == http.client.OK:
self.assertValidProjectScopedTokenResponse(
resp, self.user, forbid_token_id=True
)
else:
self.assertValidErrorResponse(resp)
def test_good_response(self):
self._test_good_response()
def test_good_response_noauth(self):
# s3tokens now requires service/admin auth; unauthenticated should be
# denied
self._test_good_response(http.client.UNAUTHORIZED, noauth=True)
def test_bad_request(self):
self.post(

View File

@@ -85,10 +85,16 @@ class CredentialBaseTestCase(test_v3.RestfulTestCase):
'verb': 'GET',
'path': '/bar',
'params': params}
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.role_id
)
token = self.get_system_scoped_token()
r = self.post(
'/ec2tokens',
body={'ec2Credentials': sig_ref},
expected_status=http.client.OK)
expected_status=http.client.OK,
token=token,
)
self.assertValidTokenResponse(r)
return r.result['token']