s3request: refactor to introduce SigChecker classes

Previously the SigV4Mixin would override S3Request signature checking
methods. This patch refactors the signature checking into sigv2 and
sigv4 helper classes i.e. moves towards composition rather than
inheritance.

No behavioural changes are intended with this patch.

Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Thibault Person <thibault.person@ovhcloud.com>
Change-Id: Icaf86181c3bf7804c5176db48d2de5e2fc6f24d2
This commit is contained in:
Alistair Coles
2025-04-15 14:36:29 +01:00
parent c7f93f0f63
commit a93e420d32
3 changed files with 137 additions and 109 deletions

View File

@@ -171,22 +171,119 @@ class HashingInput(InputProxy):
return chunk
class BaseSigChecker:
def __init__(self, req):
self.req = req
self.signature = req.signature
self.string_to_sign = self._string_to_sign()
self._secret = None
def _string_to_sign(self):
raise NotImplementedError
def _derive_secret(self, secret):
return utf8encode(secret)
def _check_signature(self):
raise NotImplementedError
def check_signature(self, secret):
self._secret = self._derive_secret(secret)
return self._check_signature()
class SigCheckerV2(BaseSigChecker):
def _string_to_sign(self):
"""
Create 'StringToSign' value in Amazon terminology for v2.
"""
buf = [swob.wsgi_to_bytes(wsgi_str) for wsgi_str in [
self.req.method,
_header_strip(self.req.headers.get('Content-MD5')) or '',
_header_strip(self.req.headers.get('Content-Type')) or '']]
if 'headers_raw' in self.req.environ: # eventlet >= 0.19.0
# See https://github.com/eventlet/eventlet/commit/67ec999
amz_headers = defaultdict(list)
for key, value in self.req.environ['headers_raw']:
key = key.lower()
if not key.startswith('x-amz-'):
continue
amz_headers[key.strip()].append(value.strip())
amz_headers = dict((key, ','.join(value))
for key, value in amz_headers.items())
else: # mostly-functional fallback
amz_headers = dict((key.lower(), value)
for key, value in self.req.headers.items()
if key.lower().startswith('x-amz-'))
if self.req._is_header_auth:
if 'x-amz-date' in amz_headers:
buf.append(b'')
elif 'Date' in self.req.headers:
buf.append(swob.wsgi_to_bytes(self.req.headers['Date']))
elif self.req._is_query_auth:
buf.append(swob.wsgi_to_bytes(self.req.params['Expires']))
else:
# Should have already raised NotS3Request in _parse_auth_info,
# but as a sanity check...
raise AccessDenied(reason='not_s3')
for key, value in sorted(amz_headers.items()):
buf.append(swob.wsgi_to_bytes("%s:%s" % (key, value)))
path = self.req._canonical_uri()
if self.req.query_string:
path += '?' + self.req.query_string
params = []
if '?' in path:
path, args = path.split('?', 1)
for key, value in sorted(self.req.params.items()):
if key in ALLOWED_SUB_RESOURCES:
params.append('%s=%s' % (key, value) if value else key)
if params:
buf.append(swob.wsgi_to_bytes('%s?%s' % (path, '&'.join(params))))
else:
buf.append(swob.wsgi_to_bytes(path))
return b'\n'.join(buf)
def _check_signature(self):
valid_signature = base64.b64encode(hmac.new(
self._secret, self.string_to_sign, sha1
).digest()).strip().decode('ascii')
return streq_const_time(self.signature, valid_signature)
class SigCheckerV4(BaseSigChecker):
def __init__(self, req):
super().__init__(req)
self._all_chunk_signatures_valid = True
def _string_to_sign(self):
return b'\n'.join([
b'AWS4-HMAC-SHA256',
self.req.timestamp.amz_date_format.encode('ascii'),
'/'.join(self.req.scope.values()).encode('utf8'),
sha256(self.req._canonical_request()).hexdigest().encode('ascii')])
def _derive_secret(self, secret):
derived_secret = b'AWS4' + super()._derive_secret(secret)
for scope_piece in self.req.scope.values():
derived_secret = hmac.new(
derived_secret, scope_piece.encode('utf8'), sha256).digest()
return derived_secret
def _check_signature(self):
valid_signature = hmac.new(
self._secret, self.string_to_sign, sha256).hexdigest()
return streq_const_time(self.signature, valid_signature)
class SigV4Mixin(object):
"""
A request class mixin to provide S3 signature v4 functionality
"""
def check_signature(self, secret):
secret = utf8encode(secret)
user_signature = self.signature
derived_secret = b'AWS4' + secret
for scope_piece in self.scope.values():
derived_secret = hmac.new(
derived_secret, scope_piece.encode('utf8'), sha256).digest()
valid_signature = hmac.new(
derived_secret, self.string_to_sign, sha256).hexdigest()
return streq_const_time(user_signature, valid_signature)
@property
def _is_query_auth(self):
return 'X-Amz-Credential' in self.params
@@ -506,16 +603,6 @@ class SigV4Mixin(object):
('terminal', 'aws4_request'),
])
def _string_to_sign(self):
"""
Create 'StringToSign' value in Amazon terminology for v4.
"""
return b'\n'.join([
b'AWS4-HMAC-SHA256',
self.timestamp.amz_date_format.encode('ascii'),
'/'.join(self.scope.values()).encode('utf8'),
sha256(self._canonical_request()).hexdigest().encode('ascii')])
def signature_does_not_match_kwargs(self):
kwargs = super(SigV4Mixin, self).signature_does_not_match_kwargs()
cr = self._canonical_request()
@@ -565,13 +652,18 @@ class S3Request(swob.Request):
self.bucket_in_host = self._parse_host()
self.container_name, self.object_name = self._parse_uri()
self._validate_headers()
# Lock in string-to-sign now, before we start messing with query params
self.string_to_sign = self._string_to_sign()
if isinstance(self, SigV4Mixin):
# this is a deliberate but only partial shift away from the
# 'inherit and override from mixin' pattern towards a 'compose
# adapters' pattern.
self.sig_checker = SigCheckerV4(self)
else:
self.sig_checker = SigCheckerV2(self)
self.environ['s3api.auth_details'] = {
'access_key': self.access_key,
'signature': self.signature,
'string_to_sign': self.string_to_sign,
'check_signature': self.check_signature,
'string_to_sign': self.sig_checker.string_to_sign,
'check_signature': self.sig_checker.check_signature,
}
self.account = None
self.user_id = None
@@ -632,14 +724,6 @@ class S3Request(swob.Request):
return part_number
def check_signature(self, secret):
secret = utf8encode(secret)
user_signature = self.signature
valid_signature = base64.b64encode(hmac.new(
secret, self.string_to_sign, sha1
).digest()).strip().decode('ascii')
return streq_const_time(user_signature, valid_signature)
@property
def timestamp(self):
"""
@@ -1045,70 +1129,14 @@ class S3Request(swob.Request):
raw_path_info = '/' + self.bucket_in_host + raw_path_info
return raw_path_info
def _string_to_sign(self):
"""
Create 'StringToSign' value in Amazon terminology for v2.
"""
amz_headers = {}
buf = [swob.wsgi_to_bytes(wsgi_str) for wsgi_str in [
self.method,
_header_strip(self.headers.get('Content-MD5')) or '',
_header_strip(self.headers.get('Content-Type')) or '']]
if 'headers_raw' in self.environ: # eventlet >= 0.19.0
# See https://github.com/eventlet/eventlet/commit/67ec999
amz_headers = defaultdict(list)
for key, value in self.environ['headers_raw']:
key = key.lower()
if not key.startswith('x-amz-'):
continue
amz_headers[key.strip()].append(value.strip())
amz_headers = dict((key, ','.join(value))
for key, value in amz_headers.items())
else: # mostly-functional fallback
amz_headers = dict((key.lower(), value)
for key, value in self.headers.items()
if key.lower().startswith('x-amz-'))
if self._is_header_auth:
if 'x-amz-date' in amz_headers:
buf.append(b'')
elif 'Date' in self.headers:
buf.append(swob.wsgi_to_bytes(self.headers['Date']))
elif self._is_query_auth:
buf.append(swob.wsgi_to_bytes(self.params['Expires']))
else:
# Should have already raised NotS3Request in _parse_auth_info,
# but as a sanity check...
raise AccessDenied(reason='not_s3')
for key, value in sorted(amz_headers.items()):
buf.append(swob.wsgi_to_bytes("%s:%s" % (key, value)))
path = self._canonical_uri()
if self.query_string:
path += '?' + self.query_string
params = []
if '?' in path:
path, args = path.split('?', 1)
for key, value in sorted(self.params.items()):
if key in ALLOWED_SUB_RESOURCES:
params.append('%s=%s' % (key, value) if value else key)
if params:
buf.append(swob.wsgi_to_bytes('%s?%s' % (path, '&'.join(params))))
else:
buf.append(swob.wsgi_to_bytes(path))
return b'\n'.join(buf)
def signature_does_not_match_kwargs(self):
return {
'a_w_s_access_key_id': self.access_key,
'string_to_sign': self.string_to_sign,
'string_to_sign': self.sig_checker.string_to_sign,
'signature_provided': self.signature,
'string_to_sign_bytes': ' '.join(
format(ord(c), '02x')
for c in self.string_to_sign.decode('latin1')),
for c in self.sig_checker.string_to_sign.decode('latin1')),
}
@property

View File

@@ -706,7 +706,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
date_header = self.get_date_header()
req.headers['Date'] = date_header
with mock.patch('swift.common.middleware.s3api.s3request.'
'S3Request.check_signature') as mock_cs:
'SigCheckerV2.check_signature') as mock_cs:
status, headers, body = self.call_s3api(req)
self.assertIn('swift.backend_path', req.environ)
self.assertEqual(
@@ -737,7 +737,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
date_header = self.get_date_header()
req.headers['Date'] = date_header
with mock.patch('swift.common.middleware.s3api.s3request.'
'S3Request.check_signature') as mock_cs:
'SigCheckerV2.check_signature') as mock_cs:
status, headers, body = self.call_s3api(req)
self.assertIn('swift.backend_path', req.environ)
self.assertEqual(
@@ -1300,9 +1300,9 @@ class TestS3ApiMiddleware(S3ApiTestCase):
patch.object(swift.common.middleware.s3api.s3request,
'SERVICE', 'host'):
req = _get_req(path, environ)
hash_in_sts = req._string_to_sign().split(b'\n')[3]
hash_in_sts = req.sig_checker._string_to_sign().split(b'\n')[3]
self.assertEqual(hash_val, hash_in_sts.decode('ascii'))
self.assertTrue(req.check_signature(
self.assertTrue(req.sig_checker.check_signature(
'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY'))
# all next data got from aws4_testsuite from Amazon

View File

@@ -791,8 +791,8 @@ class TestRequest(S3ApiTestCase):
b'Tue, 27 Mar 2007 19:36:42 +0000',
b'/johnsmith/photos/puppy.jpg',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(secret))
self.assertEqual(expected_sts, sigv2_req.sig_checker.string_to_sign)
self.assertTrue(sigv2_req.sig_checker.check_signature(secret))
req = Request.blank('/photos/puppy.jpg', method='PUT', headers={
'Content-Type': 'image/jpeg',
@@ -811,8 +811,8 @@ class TestRequest(S3ApiTestCase):
b'Tue, 27 Mar 2007 21:15:45 +0000',
b'/johnsmith/photos/puppy.jpg',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(secret))
self.assertEqual(expected_sts, sigv2_req.sig_checker.string_to_sign)
self.assertTrue(sigv2_req.sig_checker.check_signature(secret))
req = Request.blank(
'/?prefix=photos&max-keys=50&marker=puppy',
@@ -832,12 +832,12 @@ class TestRequest(S3ApiTestCase):
b'Tue, 27 Mar 2007 19:42:41 +0000',
b'/johnsmith/',
])
self.assertEqual(expected_sts, sigv2_req._string_to_sign())
self.assertTrue(sigv2_req.check_signature(secret))
self.assertEqual(expected_sts, sigv2_req.sig_checker.string_to_sign)
self.assertTrue(sigv2_req.sig_checker.check_signature(secret))
with patch('swift.common.middleware.s3api.s3request.streq_const_time',
return_value=True) as mock_eq:
self.assertTrue(sigv2_req.check_signature(secret))
self.assertTrue(sigv2_req.sig_checker.check_signature(secret))
mock_eq.assert_called_once()
def test_check_signature_sigv2(self):
@@ -861,7 +861,7 @@ class TestRequest(S3ApiTestCase):
'storage_domains': ['s3.amazonaws.com']}))
# This is a failure case with utf-8 non-ascii multi-bytes charactor
# but we expect to return just False instead of exceptions
self.assertFalse(sigv2_req.check_signature(
self.assertFalse(sigv2_req.sig_checker.check_signature(
u'\u30c9\u30e9\u30b4\u30f3'))
# Test v4 check_signature with multi bytes invalid secret
@@ -877,12 +877,12 @@ class TestRequest(S3ApiTestCase):
})
sigv4_req = SigV4Request(
req.environ, Config({'storage_domains': ['s3.amazonaws.com']}))
self.assertFalse(sigv4_req.check_signature(
self.assertFalse(sigv4_req.sig_checker.check_signature(
u'\u30c9\u30e9\u30b4\u30f3'))
with patch('swift.common.middleware.s3api.s3request.streq_const_time',
return_value=False) as mock_eq:
self.assertFalse(sigv4_req.check_signature(
self.assertFalse(sigv4_req.sig_checker.check_signature(
u'\u30c9\u30e9\u30b4\u30f3'))
mock_eq.assert_called_once()
@@ -908,7 +908,7 @@ class TestRequest(S3ApiTestCase):
sigv4_req = SigV4Request(req.environ)
self.assertTrue(
sigv4_req._canonical_request().endswith(b'UNSIGNED-PAYLOAD'))
self.assertTrue(sigv4_req.check_signature('secret'))
self.assertTrue(sigv4_req.sig_checker.check_signature('secret'))
@patch.object(S3Request, '_validate_dates', lambda *a: None)
def test_check_signature_sigv4_url_encode(self):
@@ -935,7 +935,7 @@ class TestRequest(S3ApiTestCase):
canonical_req = sigv4_req._canonical_request()
self.assertIn(b'PUT\n/test/~/file%2C1_1%3A1-1\n', canonical_req)
self.assertTrue(canonical_req.endswith(b'UNSIGNED-PAYLOAD'))
self.assertTrue(sigv4_req.check_signature('secret'))
self.assertTrue(sigv4_req.sig_checker.check_signature('secret'))
@patch.object(S3Request, '_validate_dates', lambda *a: None)
def test_check_sigv4_req_zero_content_length_sha256(self):
@@ -979,7 +979,7 @@ class TestRequest(S3ApiTestCase):
sigv4_req = SigV4Request(req.environ)
self.assertTrue(
sigv4_req._canonical_request().endswith(sha256_of_nothing))
self.assertTrue(sigv4_req.check_signature('secret'))
self.assertTrue(sigv4_req.sig_checker.check_signature('secret'))
# uppercase sha256 -- signature changes, but content's valid
headers = {
@@ -998,7 +998,7 @@ class TestRequest(S3ApiTestCase):
sigv4_req = SigV4Request(req.environ)
self.assertTrue(
sigv4_req._canonical_request().endswith(sha256_of_nothing.upper()))
self.assertTrue(sigv4_req.check_signature('secret'))
self.assertTrue(sigv4_req.sig_checker.check_signature('secret'))
@patch.object(S3Request, '_validate_dates', lambda *a: None)
def test_v4_req_xmz_content_sha256_mismatch(self):