Enable RightAWS style signature checking using server_string without port number, add test cases for authenticate() and a new helper routine, and fix lp753660.
This commit is contained in:
@@ -305,9 +305,9 @@ class AuthManager(object):
|
|||||||
if check_type == 's3':
|
if check_type == 's3':
|
||||||
sign = signer.Signer(user.secret.encode())
|
sign = signer.Signer(user.secret.encode())
|
||||||
expected_signature = sign.s3_authorization(headers, verb, path)
|
expected_signature = sign.s3_authorization(headers, verb, path)
|
||||||
LOG.debug('user.secret: %s', user.secret)
|
LOG.debug(_('user.secret: %s'), user.secret)
|
||||||
LOG.debug('expected_signature: %s', expected_signature)
|
LOG.debug(_('expected_signature: %s'), expected_signature)
|
||||||
LOG.debug('signature: %s', signature)
|
LOG.debug(_('signature: %s'), signature)
|
||||||
if signature != expected_signature:
|
if signature != expected_signature:
|
||||||
LOG.audit(_("Invalid signature for user %s"), user.name)
|
LOG.audit(_("Invalid signature for user %s"), user.name)
|
||||||
raise exception.InvalidSignature(signature=signature,
|
raise exception.InvalidSignature(signature=signature,
|
||||||
@@ -317,10 +317,20 @@ class AuthManager(object):
|
|||||||
# secret isn't unicode
|
# secret isn't unicode
|
||||||
expected_signature = signer.Signer(user.secret.encode()).generate(
|
expected_signature = signer.Signer(user.secret.encode()).generate(
|
||||||
params, verb, server_string, path)
|
params, verb, server_string, path)
|
||||||
LOG.debug('user.secret: %s', user.secret)
|
LOG.debug(_('user.secret: %s'), user.secret)
|
||||||
LOG.debug('expected_signature: %s', expected_signature)
|
LOG.debug(_('expected_signature: %s'), expected_signature)
|
||||||
LOG.debug('signature: %s', signature)
|
LOG.debug(_('signature: %s'), signature)
|
||||||
if signature != expected_signature:
|
if signature != expected_signature:
|
||||||
|
(addr_str, port_str) = utils.parse_server_string(server_string)
|
||||||
|
# If the given server_string contains port num, try without it.
|
||||||
|
if port_str != '':
|
||||||
|
host_only_signature = signer.Signer(
|
||||||
|
user.secret.encode()).generate(params, verb,
|
||||||
|
addr_str, path)
|
||||||
|
LOG.debug(_('host_only_signature: %s'),
|
||||||
|
host_only_signature)
|
||||||
|
if signature == host_only_signature:
|
||||||
|
return (user, project)
|
||||||
LOG.audit(_("Invalid signature for user %s"), user.name)
|
LOG.audit(_("Invalid signature for user %s"), user.name)
|
||||||
raise exception.InvalidSignature(signature=signature,
|
raise exception.InvalidSignature(signature=signature,
|
||||||
user=user)
|
user=user)
|
||||||
|
|||||||
@@ -101,9 +101,43 @@ class _AuthManagerBaseTestCase(test.TestCase):
|
|||||||
self.assertEqual('private-party', u.access)
|
self.assertEqual('private-party', u.access)
|
||||||
|
|
||||||
def test_004_signature_is_valid(self):
|
def test_004_signature_is_valid(self):
|
||||||
#self.assertTrue(self.manager.authenticate(**boto.generate_url ...? ))
|
with user_generator(self.manager, name='admin', secret='admin',
|
||||||
pass
|
access='admin'):
|
||||||
#raise NotImplementedError
|
with project_generator(self.manager, name="admin",
|
||||||
|
manager_user='admin'):
|
||||||
|
accesskey = 'admin:admin'
|
||||||
|
expected_result = (self.manager.get_user('admin'),
|
||||||
|
self.manager.get_project('admin'))
|
||||||
|
# captured sig and query string using boto 1.9b/euca2ools 1.2
|
||||||
|
sig = 'd67Wzd9Bwz8xid9QU+lzWXcF2Y3tRicYABPJgrqfrwM='
|
||||||
|
auth_params = {'AWSAccessKeyId': 'admin:admin',
|
||||||
|
'Action': 'DescribeAvailabilityZones',
|
||||||
|
'SignatureMethod': 'HmacSHA256',
|
||||||
|
'SignatureVersion': '2',
|
||||||
|
'Timestamp': '2011-04-22T11:29:29',
|
||||||
|
'Version': '2009-11-30'}
|
||||||
|
self.assertTrue(expected_result, self.manager.authenticate(
|
||||||
|
accesskey,
|
||||||
|
sig,
|
||||||
|
auth_params,
|
||||||
|
'GET',
|
||||||
|
'127.0.0.1:8773',
|
||||||
|
'/services/Cloud/'))
|
||||||
|
# captured sig and query string using RightAWS 1.10.0
|
||||||
|
sig = 'ECYLU6xdFG0ZqRVhQybPJQNJ5W4B9n8fGs6+/fuGD2c='
|
||||||
|
auth_params = {'AWSAccessKeyId': 'admin:admin',
|
||||||
|
'Action': 'DescribeAvailabilityZones',
|
||||||
|
'SignatureMethod': 'HmacSHA256',
|
||||||
|
'SignatureVersion': '2',
|
||||||
|
'Timestamp': '2011-04-22T11:29:49.000Z',
|
||||||
|
'Version': '2008-12-01'}
|
||||||
|
self.assertTrue(expected_result, self.manager.authenticate(
|
||||||
|
accesskey,
|
||||||
|
sig,
|
||||||
|
auth_params,
|
||||||
|
'GET',
|
||||||
|
'127.0.0.1',
|
||||||
|
'/services/Cloud'))
|
||||||
|
|
||||||
def test_005_can_get_credentials(self):
|
def test_005_can_get_credentials(self):
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user