Ec2Signer: Initial support for v4 signature verification

Adds initial support for verifying AWS v4 signatures, tested with
the latest boto trunk (which now uses v4 signatures by default)

Change-Id: Id163363e259cf08aa251a7a00ff4293b742cbef6
blueprint: ec2signer-v4signatures
This commit is contained in:
Steven Hardy
2013-04-03 17:14:30 +01:00
parent b7adf5b96b
commit 5c37d85944
2 changed files with 249 additions and 19 deletions

View File

@@ -32,24 +32,69 @@ class Ec2Signer(object):
"""
def __init__(self, secret_key):
secret_key = secret_key.encode()
self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1)
self.secret_key = secret_key.encode()
self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1)
if hashlib.sha256:
self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256)
self.hmac_256 = hmac.new(self.secret_key, digestmod=hashlib.sha256)
def _v4_creds(self, credentials):
"""
Detect if the credentials are for a v4 signed request, since AWS
removed the SignatureVersion field from the v4 request spec...
This expects a dict of the request headers to be passed in the
credentials dict, since the recommended way to pass v4 creds is
via the 'Authorization' header
see http://docs.aws.amazon.com/general/latest/gr/
sigv4-signed-request-examples.html
Alternatively X-Amz-Algorithm can be specified as a query parameter,
and the authentication data can also passed as query parameters.
Note a hash of the request body is also required in the credentials
for v4 auth to work in the body_hash key, calculated via:
hashlib.sha256(req.body).hexdigest()
"""
try:
auth_str = credentials['headers']['Authorization']
if auth_str.startswith('AWS4-HMAC-SHA256'):
return True
except KeyError:
# Alternatively the Authorization data can be passed via
# the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256
try:
if (credentials['params']['X-Amz-Algorithm'] ==
'AWS4-HMAC-SHA256'):
return True
except KeyError:
pass
return False
def generate(self, credentials):
"""Generate auth string according to what SignatureVersion is given."""
if credentials['params']['SignatureVersion'] == '0':
signature_version = credentials['params'].get('SignatureVersion')
if signature_version == '0':
return self._calc_signature_0(credentials['params'])
if credentials['params']['SignatureVersion'] == '1':
if signature_version == '1':
return self._calc_signature_1(credentials['params'])
if credentials['params']['SignatureVersion'] == '2':
if signature_version == '2':
return self._calc_signature_2(credentials['params'],
credentials['verb'],
credentials['host'],
credentials['path'])
raise Exception('Unknown Signature Version: %s' %
credentials['params']['SignatureVersion'])
if self._v4_creds(credentials):
return self._calc_signature_4(credentials['params'],
credentials['verb'],
credentials['host'],
credentials['path'],
credentials['headers'],
credentials['body_hash'])
if signature_version is not None:
raise Exception('Unknown signature version: %s' %
signature_version)
else:
raise Exception('Unexpected signature format')
@staticmethod
def _get_utf8_value(value):
@@ -77,6 +122,22 @@ class Ec2Signer(object):
self.hmac.update(val)
return base64.b64encode(self.hmac.digest())
@staticmethod
def _canonical_qs(params):
"""
Construct a sorted, correctly encoded query string as required for
_calc_signature_2 and _calc_signature_4
"""
keys = params.keys()
keys.sort()
pairs = []
for key in keys:
val = Ec2Signer._get_utf8_value(params[key])
val = urllib.quote(val, safe='-_~')
pairs.append(urllib.quote(key, safe='') + '=' + val)
qs = '&'.join(pairs)
return qs
def _calc_signature_2(self, params, verb, server_string, path):
"""Generate AWS signature version 2 string."""
string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
@@ -86,15 +147,116 @@ class Ec2Signer(object):
else:
current_hmac = self.hmac
params['SignatureMethod'] = 'HmacSHA1'
keys = params.keys()
keys.sort()
pairs = []
for key in keys:
val = self._get_utf8_value(params[key])
val = urllib.quote(val, safe='-_~')
pairs.append(urllib.quote(key, safe='') + '=' + val)
qs = '&'.join(pairs)
string_to_sign += qs
string_to_sign += self._canonical_qs(params)
current_hmac.update(string_to_sign)
b64 = base64.b64encode(current_hmac.digest())
return b64
def _calc_signature_4(self, params, verb, server_string, path, headers,
body_hash):
"""Generate AWS signature version 4 string."""
def sign(key, msg):
return hmac.new(key, self._get_utf8_value(msg),
hashlib.sha256).digest()
def signature_key(datestamp, region_name, service_name):
"""
Signature key derivation, see
http://docs.aws.amazon.com/general/latest/gr/
signature-v4-examples.html#signature-v4-examples-python
"""
k_date = sign(self._get_utf8_value("AWS4" + self.secret_key),
datestamp)
k_region = sign(k_date, region_name)
k_service = sign(k_region, service_name)
k_signing = sign(k_service, "aws4_request")
return k_signing
def auth_param(param_name):
"""
Get specified auth parameter, provided via one of:
- the Authorization header
- the X-Amz-* query parameters
"""
try:
auth_str = headers['Authorization']
param_str = auth_str.partition(
'%s=' % param_name)[2].split(',')[0]
except KeyError:
param_str = params.get('X-Amz-%s' % param_name)
return param_str
def date_param():
"""
Get the X-Amz-Date' value, which can be either a header or paramter
Note AWS supports parsing the Date header also, but this is not
currently supported here as it will require some format mangling
So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it
can be used to match against the YYYYMMDD format provided in the
credential scope.
see:
http://docs.aws.amazon.com/general/latest/gr/
sigv4-date-handling.html
"""
try:
return headers['X-Amz-Date']
except KeyError:
return params.get('X-Amz-Date')
def canonical_header_str():
# Get the list of headers to include, from either
# - the Authorization header (SignedHeaders key)
# - the X-Amz-SignedHeaders query parameter
headers_lower = dict((k.lower().strip(), v.strip())
for (k, v) in headers.iteritems())
header_list = []
sh_str = auth_param('SignedHeaders')
for h in sh_str.split(';'):
if h not in headers_lower:
continue
if h == 'host':
# Note we discard any port suffix
header_list.append('%s:%s' %
(h, headers_lower[h].split(':')[0]))
else:
header_list.append('%s:%s' % (h, headers_lower[h]))
return '\n'.join(header_list) + '\n'
# Create canonical request:
# http://docs.aws.amazon.com/general/latest/gr/
# sigv4-create-canonical-request.html
# Get parameters and headers in expected string format
cr = "\n".join((verb.upper(), path,
self._canonical_qs(params),
canonical_header_str(),
auth_param('SignedHeaders'),
body_hash))
# Check the date, reject any request where the X-Amz-Date doesn't
# match the credential scope
credential = auth_param('Credential')
credential_split = credential.split('/')
credential_scope = '/'.join(credential_split[1:])
credential_date = credential_split[1]
param_date = date_param()
if not param_date.startswith(credential_date):
raise Exception('Request date mismatch error')
# Create the string to sign
# http://docs.aws.amazon.com/general/latest/gr/
# sigv4-create-string-to-sign.html
string_to_sign = '\n'.join(('AWS4-HMAC-SHA256',
param_date,
credential_scope,
hashlib.sha256(cr).hexdigest()))
# Calculate the derived key, this requires a datestamp, region
# and service, which can be extracted from the credential scope
(req_region, req_service) = credential_split[2:4]
s_key = signature_key(credential_date, req_region, req_service)
# Finally calculate the signature!
signature = hmac.new(s_key, self._get_utf8_value(string_to_sign),
hashlib.sha256).hexdigest()
return signature

View File

@@ -27,6 +27,36 @@ class Ec2SignerTest(testtools.TestCase):
self.secret = '89cdf9e94e2643cab35b8b8ac5a51f83'
self.signer = Ec2Signer(self.secret)
def tearDown(self):
super(Ec2SignerTest, self).tearDown()
def test_v4_creds_header(self):
auth_str = 'AWS4-HMAC-SHA256 blah'
credentials = {'host': '127.0.0.1',
'verb': 'GET',
'path': '/v1/',
'params': {},
'headers': {'Authorization': auth_str}}
self.assertTrue(self.signer._v4_creds(credentials))
def test_v4_creds_param(self):
credentials = {'host': '127.0.0.1',
'verb': 'GET',
'path': '/v1/',
'params': {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256'},
'headers': {}}
self.assertTrue(self.signer._v4_creds(credentials))
def test_v4_creds_false(self):
credentials = {'host': '127.0.0.1',
'verb': 'GET',
'path': '/v1/',
'params': {'SignatureVersion': '0',
'AWSAccessKeyId': self.access,
'Timestamp': '2012-11-27T11:47:02Z',
'Action': 'Foo'}}
self.assertFalse(self.signer._v4_creds(credentials))
def test_generate_0(self):
"""Test generate function for v0 signature"""
credentials = {'host': '127.0.0.1',
@@ -40,8 +70,6 @@ class Ec2SignerTest(testtools.TestCase):
expected = 'SmXQEZAUdQw5glv5mX8mmixBtas='
self.assertEqual(signature, expected)
pass
def test_generate_1(self):
"""Test generate function for v1 signature"""
credentials = {'host': '127.0.0.1',
@@ -75,3 +103,43 @@ class Ec2SignerTest(testtools.TestCase):
signature = self.signer.generate(credentials)
expected = 'ZqCxMI4ZtTXWI175743mJ0hy/Gc='
self.assertEqual(signature, expected)
def test_generate_v4(self):
"""
Test v4 generator with data from AWS docs example, see:
http://docs.aws.amazon.com/general/latest/gr/
sigv4-create-canonical-request.html
and
http://docs.aws.amazon.com/general/latest/gr/
sigv4-signed-request-examples.html
"""
# Create a new signer object with the AWS example key
secret = 'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY'
signer = Ec2Signer(secret)
body_hash = ('b6359072c78d70ebee1e81adcbab4f0'
'1bf2c23245fa365ef83fe8f1f955085e2')
auth_str = ('AWS4-HMAC-SHA256 '
'Credential=AKIAIOSFODNN7EXAMPLE/20110909/'
'us-east-1/iam/aws4_request,'
'SignedHeaders=content-type;host;x-amz-date,')
headers = {'Content-type':
'application/x-www-form-urlencoded; charset=utf-8',
'X-Amz-Date': '20110909T233600Z',
'Host': 'iam.amazonaws.com',
'Authorization': auth_str}
# Note the example in the AWS docs is inconsistent, previous
# examples specify no query string, but the final POST example
# does, apparently incorrectly since an empty parameter list
# aligns all steps and the final signature with the examples
params = {}
credentials = {'host': 'iam.amazonaws.com',
'verb': 'POST',
'path': '/',
'params': params,
'headers': headers,
'body_hash': body_hash}
signature = signer.generate(credentials)
expected = ('ced6826de92d2bdeed8f846f0bf508e8'
'559e98e4b0199114b84c54174deb456c')
self.assertEqual(signature, expected)