281 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			281 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright 2012 OpenStack Foundation
 | |
| # Copyright 2010 United States Government as represented by the
 | |
| # Administrator of the National Aeronautics and Space Administration.
 | |
| # Copyright 2011 - 2012 Justin Santa Barbara
 | |
| # All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import base64
 | |
| import hashlib
 | |
| import hmac
 | |
| import re
 | |
| 
 | |
| import six
 | |
| from six.moves import urllib
 | |
| 
 | |
| from keystoneclient.i18n import _
 | |
| 
 | |
| 
 | |
| class Ec2Signer(object):
 | |
|     """Utility class which allows a request to be signed with an AWS style
 | |
|     signature, which can then be used for authentication via the keystone ec2
 | |
|     authentication extension.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, secret_key):
 | |
|         self.secret_key = secret_key.encode()
 | |
|         self.hmac = hmac.new(self.secret_key, digestmod=hashlib.sha1)
 | |
|         if hashlib.sha256:
 | |
|             self.hmac_256 = hmac.new(self.secret_key, digestmod=hashlib.sha256)
 | |
| 
 | |
|     def _v4_creds(self, credentials):
 | |
|         """Detect if the credentials are for a v4 signed request, since AWS
 | |
|         removed the SignatureVersion field from the v4 request spec...
 | |
| 
 | |
|         This expects a dict of the request headers to be passed in the
 | |
|         credentials dict, since the recommended way to pass v4 creds is
 | |
|         via the 'Authorization' header
 | |
|         see http://docs.aws.amazon.com/general/latest/gr/
 | |
|             sigv4-signed-request-examples.html
 | |
| 
 | |
|         Alternatively X-Amz-Algorithm can be specified as a query parameter,
 | |
|         and the authentication data can also passed as query parameters.
 | |
| 
 | |
|         Note a hash of the request body is also required in the credentials
 | |
|         for v4 auth to work in the body_hash key, calculated via:
 | |
|         hashlib.sha256(req.body).hexdigest()
 | |
|         """
 | |
|         try:
 | |
|             auth_str = credentials['headers']['Authorization']
 | |
|             if auth_str.startswith('AWS4-HMAC-SHA256'):
 | |
|                 return True
 | |
|         except KeyError:
 | |
|             # Alternatively the Authorization data can be passed via
 | |
|             # the query params list, check X-Amz-Algorithm=AWS4-HMAC-SHA256
 | |
|             try:
 | |
|                 if (credentials['params']['X-Amz-Algorithm'] ==
 | |
|                         'AWS4-HMAC-SHA256'):
 | |
|                     return True
 | |
|             except KeyError:
 | |
|                 pass
 | |
| 
 | |
|         return False
 | |
| 
 | |
|     def generate(self, credentials):
 | |
|         """Generate auth string according to what SignatureVersion is given."""
 | |
|         signature_version = credentials['params'].get('SignatureVersion')
 | |
|         if signature_version == '0':
 | |
|             return self._calc_signature_0(credentials['params'])
 | |
|         if signature_version == '1':
 | |
|             return self._calc_signature_1(credentials['params'])
 | |
|         if signature_version == '2':
 | |
|             return self._calc_signature_2(credentials['params'],
 | |
|                                           credentials['verb'],
 | |
|                                           credentials['host'],
 | |
|                                           credentials['path'])
 | |
|         if self._v4_creds(credentials):
 | |
|             return self._calc_signature_4(credentials['params'],
 | |
|                                           credentials['verb'],
 | |
|                                           credentials['host'],
 | |
|                                           credentials['path'],
 | |
|                                           credentials['headers'],
 | |
|                                           credentials['body_hash'])
 | |
| 
 | |
|         if signature_version is not None:
 | |
|             raise Exception(_('Unknown signature version: %s') %
 | |
|                             signature_version)
 | |
|         else:
 | |
|             raise Exception(_('Unexpected signature format'))
 | |
| 
 | |
|     @staticmethod
 | |
|     def _get_utf8_value(value):
 | |
|         """Get the UTF8-encoded version of a value."""
 | |
|         if not isinstance(value, (six.binary_type, six.text_type)):
 | |
|             value = str(value)
 | |
|         if isinstance(value, six.text_type):
 | |
|             return value.encode('utf-8')
 | |
|         else:
 | |
|             return value
 | |
| 
 | |
|     def _calc_signature_0(self, params):
 | |
|         """Generate AWS signature version 0 string."""
 | |
|         s = (params['Action'] + params['Timestamp']).encode('utf-8')
 | |
|         self.hmac.update(s)
 | |
|         return base64.b64encode(self.hmac.digest()).decode('utf-8')
 | |
| 
 | |
|     def _calc_signature_1(self, params):
 | |
|         """Generate AWS signature version 1 string."""
 | |
|         keys = list(params)
 | |
|         keys.sort(key=six.text_type.lower)
 | |
|         for key in keys:
 | |
|             self.hmac.update(key.encode('utf-8'))
 | |
|             val = self._get_utf8_value(params[key])
 | |
|             self.hmac.update(val)
 | |
|         return base64.b64encode(self.hmac.digest()).decode('utf-8')
 | |
| 
 | |
|     @staticmethod
 | |
|     def _canonical_qs(params):
 | |
|         """Construct a sorted, correctly encoded query string as required for
 | |
|         _calc_signature_2 and _calc_signature_4.
 | |
|         """
 | |
|         keys = list(params)
 | |
|         keys.sort()
 | |
|         pairs = []
 | |
|         for key in keys:
 | |
|             val = Ec2Signer._get_utf8_value(params[key])
 | |
|             val = urllib.parse.quote(val, safe='-_~')
 | |
|             pairs.append(urllib.parse.quote(key, safe='') + '=' + val)
 | |
|         qs = '&'.join(pairs)
 | |
|         return qs
 | |
| 
 | |
|     def _calc_signature_2(self, params, verb, server_string, path):
 | |
|         """Generate AWS signature version 2 string."""
 | |
|         string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
 | |
|         if self.hmac_256:
 | |
|             current_hmac = self.hmac_256
 | |
|             params['SignatureMethod'] = 'HmacSHA256'
 | |
|         else:
 | |
|             current_hmac = self.hmac
 | |
|             params['SignatureMethod'] = 'HmacSHA1'
 | |
|         string_to_sign += self._canonical_qs(params)
 | |
|         current_hmac.update(string_to_sign.encode('utf-8'))
 | |
|         b64 = base64.b64encode(current_hmac.digest()).decode('utf-8')
 | |
|         return b64
 | |
| 
 | |
|     def _calc_signature_4(self, params, verb, server_string, path, headers,
 | |
|                           body_hash):
 | |
|         """Generate AWS signature version 4 string."""
 | |
| 
 | |
|         def sign(key, msg):
 | |
|             return hmac.new(key, self._get_utf8_value(msg),
 | |
|                             hashlib.sha256).digest()
 | |
| 
 | |
|         def signature_key(datestamp, region_name, service_name):
 | |
|             """Signature key derivation, see
 | |
|             http://docs.aws.amazon.com/general/latest/gr/
 | |
|             signature-v4-examples.html#signature-v4-examples-python
 | |
|             """
 | |
|             k_date = sign(self._get_utf8_value(b"AWS4" + self.secret_key),
 | |
|                           datestamp)
 | |
|             k_region = sign(k_date, region_name)
 | |
|             k_service = sign(k_region, service_name)
 | |
|             k_signing = sign(k_service, "aws4_request")
 | |
|             return k_signing
 | |
| 
 | |
|         def auth_param(param_name):
 | |
|             """Get specified auth parameter.
 | |
| 
 | |
|             Provided via one of:
 | |
|             - the Authorization header
 | |
|             - the X-Amz-* query parameters
 | |
|             """
 | |
|             try:
 | |
|                 auth_str = headers['Authorization']
 | |
|                 param_str = auth_str.partition(
 | |
|                     '%s=' % param_name)[2].split(',')[0]
 | |
|             except KeyError:
 | |
|                 param_str = params.get('X-Amz-%s' % param_name)
 | |
|             return param_str
 | |
| 
 | |
|         def date_param():
 | |
|             """Get the X-Amz-Date' value, which can be either a header
 | |
|             or parameter.
 | |
| 
 | |
|             Note AWS supports parsing the Date header also, but this is not
 | |
|             currently supported here as it will require some format mangling
 | |
|             So the X-Amz-Date value must be YYYYMMDDTHHMMSSZ format, then it
 | |
|             can be used to match against the YYYYMMDD format provided in the
 | |
|             credential scope.
 | |
|             see:
 | |
|             http://docs.aws.amazon.com/general/latest/gr/
 | |
|             sigv4-date-handling.html
 | |
|             """
 | |
|             try:
 | |
|                 return headers['X-Amz-Date']
 | |
|             except KeyError:
 | |
|                 return params.get('X-Amz-Date')
 | |
| 
 | |
|         def canonical_header_str():
 | |
|             # Get the list of headers to include, from either
 | |
|             # - the Authorization header (SignedHeaders key)
 | |
|             # - the X-Amz-SignedHeaders query parameter
 | |
|             headers_lower = dict((k.lower().strip(), v.strip())
 | |
|                                  for (k, v) in six.iteritems(headers))
 | |
| 
 | |
|             # Boto versions < 2.9.3 strip the port component of the host:port
 | |
|             # header, so detect the user-agent via the header and strip the
 | |
|             # port if we detect an old boto version.  FIXME: remove when all
 | |
|             # distros package boto >= 2.9.3, this is a transitional workaround
 | |
|             user_agent = headers_lower.get('user-agent', '')
 | |
|             strip_port = re.match('Boto/2.[0-9].[0-2]', user_agent)
 | |
| 
 | |
|             header_list = []
 | |
|             sh_str = auth_param('SignedHeaders')
 | |
|             for h in sh_str.split(';'):
 | |
|                 if h not in headers_lower:
 | |
|                     continue
 | |
| 
 | |
|                 if h == 'host' and strip_port:
 | |
|                     header_list.append('%s:%s' %
 | |
|                                        (h, headers_lower[h].split(':')[0]))
 | |
|                     continue
 | |
| 
 | |
|                 header_list.append('%s:%s' % (h, headers_lower[h]))
 | |
|             return '\n'.join(header_list) + '\n'
 | |
| 
 | |
|         def canonical_query_str(verb, params):
 | |
|             # POST requests pass parameters in through the request body
 | |
|             canonical_qs = ''
 | |
|             if verb.upper() != 'POST':
 | |
|                 canonical_qs = self._canonical_qs(params)
 | |
|             return canonical_qs
 | |
| 
 | |
|         # Create canonical request:
 | |
|         # http://docs.aws.amazon.com/general/latest/gr/
 | |
|         # sigv4-create-canonical-request.html
 | |
|         # Get parameters and headers in expected string format
 | |
|         cr = "\n".join((verb.upper(), path,
 | |
|                         canonical_query_str(verb, params),
 | |
|                         canonical_header_str(),
 | |
|                         auth_param('SignedHeaders'),
 | |
|                         body_hash))
 | |
| 
 | |
|         # Check the date, reject any request where the X-Amz-Date doesn't
 | |
|         # match the credential scope
 | |
|         credential = auth_param('Credential')
 | |
|         credential_split = credential.split('/')
 | |
|         credential_scope = '/'.join(credential_split[1:])
 | |
|         credential_date = credential_split[1]
 | |
|         param_date = date_param()
 | |
|         if not param_date.startswith(credential_date):
 | |
|             raise Exception(_('Request date mismatch error'))
 | |
| 
 | |
|         # Create the string to sign
 | |
|         # http://docs.aws.amazon.com/general/latest/gr/
 | |
|         # sigv4-create-string-to-sign.html
 | |
|         cr = cr.encode('utf-8')
 | |
|         string_to_sign = '\n'.join(('AWS4-HMAC-SHA256',
 | |
|                                     param_date,
 | |
|                                     credential_scope,
 | |
|                                     hashlib.sha256(cr).hexdigest()))
 | |
| 
 | |
|         # Calculate the derived key, this requires a datestamp, region
 | |
|         # and service, which can be extracted from the credential scope
 | |
|         (req_region, req_service) = credential_split[2:4]
 | |
|         s_key = signature_key(credential_date, req_region, req_service)
 | |
|         # Finally calculate the signature!
 | |
|         signature = hmac.new(s_key, self._get_utf8_value(string_to_sign),
 | |
|                              hashlib.sha256).hexdigest()
 | |
|         return signature
 | 
