Browse Source

Add S3 signature v4 checking

Keystone can check signature v1 for s3,
but many new tools uses new v4 signature protocol.
This patchset adds checking of v4 signature.
Architecture of implementation is the same as
v1 implementated.

Change-Id: I14121b4df2cae1407102335671c3f6878d46fc35
Closes-Bug: #1473042
changes/81/215481/10
Andrey Pavlov 6 years ago
parent
commit
f11d396546
2 changed files with 101 additions and 8 deletions
  1. +51
    -6
      keystone/contrib/s3/core.py
  2. +50
    -2
      keystone/tests/unit/test_contrib_s3_core.py

+ 51
- 6
keystone/contrib/s3/core.py View File

@ -33,6 +33,7 @@ from keystone.common import utils
from keystone.common import wsgi
from keystone.contrib.ec2 import controllers
from keystone import exception
from keystone.i18n import _
EXTENSION_DATA = {
@ -67,16 +68,60 @@ class S3Extension(wsgi.V3ExtensionRouter):
class S3Controller(controllers.Ec2Controller):
def check_signature(self, creds_ref, credentials):
msg = base64.urlsafe_b64decode(str(credentials['token']))
key = str(creds_ref['secret']).encode('utf-8')
string_to_sign = base64.urlsafe_b64decode(str(credentials['token']))
if string_to_sign[0:4] != b'AWS4':
signature = self._calculate_signature_v1(string_to_sign,
creds_ref['secret'])
else:
signature = self._calculate_signature_v4(string_to_sign,
creds_ref['secret'])
if not utils.auth_str_equal(credentials['signature'], signature):
raise exception.Unauthorized(
message=_('Credential signature mismatch'))
def _calculate_signature_v1(self, string_to_sign, secret_key):
"""Calculates a v1 signature.
:param bytes string_to_sign: String that contains request params and
is used for calculate signature of request
:param text secret_key: Second auth key of EC2 account that is used to
sign requests
"""
key = str(secret_key).encode('utf-8')
if six.PY2:
b64_encode = base64.encodestring
else:
b64_encode = base64.encodebytes
signed = b64_encode(hmac.new(key, string_to_sign, hashlib.sha1)
.digest()).decode('utf-8').strip()
return signed
def _calculate_signature_v4(self, string_to_sign, secret_key):
"""Calculates a v4 signature.
:param bytes string_to_sign: String that contains request params and
is used for calculate signature of request
:param text secret_key: Second auth key of EC2 account that is used to
sign requests
"""
parts = string_to_sign.split(b'\n')
if len(parts) != 4 or parts[0] != b'AWS4-HMAC-SHA256':
raise exception.Unauthorized(
message=_('Invalid EC2 signature.'))
scope = parts[2].split(b'/')
if len(scope) != 4 or scope[2] != b's3' or scope[3] != b'aws4_request':
raise exception.Unauthorized(
message=_('Invalid EC2 signature.'))
def _sign(key, msg):
return hmac.new(key, msg, hashlib.sha256).digest()
signed = b64_encode(
hmac.new(key, msg, hashlib.sha1).digest()).decode('utf-8').strip()
signed = _sign(six.b('AWS4' + secret_key), scope[0])
signed = _sign(signed, scope[1])
signed = _sign(signed, scope[2])
signed = _sign(signed, b'aws4_request')
if not utils.auth_str_equal(credentials['signature'], signed):
raise exception.Unauthorized('Credential signature mismatch')
signature = hmac.new(signed, string_to_sign, hashlib.sha256)
return signature.hexdigest()

+ 50
- 2
keystone/tests/unit/test_contrib_s3_core.py View File

@ -27,7 +27,7 @@ class S3ContribCore(unit.TestCase):
self.controller = s3.S3Controller()
def test_good_signature(self):
def test_good_signature_v1(self):
creds_ref = {'secret':
'b121dd41cdcc42fe9f70e572e84295aa'}
credentials = {'token':
@ -40,7 +40,7 @@ class S3ContribCore(unit.TestCase):
self.assertIsNone(self.controller.check_signature(creds_ref,
credentials))
def test_bad_signature(self):
def test_bad_signature_v1(self):
creds_ref = {'secret':
'b121dd41cdcc42fe9f70e572e84295aa'}
credentials = {'token':
@ -53,3 +53,51 @@ class S3ContribCore(unit.TestCase):
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
creds_ref, credentials)
def test_good_signature_v4(self):
creds_ref = {'secret':
'e7a7a2240136494986991a6598d9fb9f'}
credentials = {'token':
'QVdTNC1ITUFDLVNIQTI1NgoyMDE1MDgyNFQxMTIwNDFaCjIw'
'MTUwODI0L1JlZ2lvbk9uZS9zMy9hd3M0X3JlcXVlc3QKZjIy'
'MTU1ODBlZWI5YTE2NzM1MWJkOTNlODZjM2I2ZjA0YTkyOGY1'
'YzU1MjBhMzkzNWE0NTM1NDBhMDk1NjRiNQ==',
'signature':
'730ba8f58df6ffeadd78f402e990b2910d60'
'bc5c2aec63619734f096a4dd77be'}
self.assertIsNone(self.controller.check_signature(creds_ref,
credentials))
def test_bad_signature_v4(self):
creds_ref = {'secret':
'e7a7a2240136494986991a6598d9fb9f'}
credentials = {'token':
'QVdTNC1ITUFDLVNIQTI1NgoyMDE1MDgyNFQxMTIwNDFaCjIw'
'MTUwODI0L1JlZ2lvbk9uZS9zMy9hd3M0X3JlcXVlc3QKZjIy'
'MTU1ODBlZWI5YTE2NzM1MWJkOTNlODZjM2I2ZjA0YTkyOGY1'
'YzU1MjBhMzkzNWE0NTM1NDBhMDk1NjRiNQ==',
'signature': uuid.uuid4().hex}
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
creds_ref, credentials)
def test_bad_token_v4(self):
creds_ref = {'secret':
'e7a7a2240136494986991a6598d9fb9f'}
# token has invalid format of first part
credentials = {'token':
'QVdTNC1BQUEKWApYClg=',
'signature': ''}
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
creds_ref, credentials)
# token has invalid format of scope
credentials = {'token':
'QVdTNC1ITUFDLVNIQTI1NgpYCi8vczMvYXdzTl9yZXF1ZXN0Clg=',
'signature': ''}
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
creds_ref, credentials)

Loading…
Cancel
Save