Merge from trunk

This commit is contained in:
Cerberus
2011-05-10 15:42:08 -05:00
2 changed files with 53 additions and 9 deletions

View File

@@ -305,9 +305,9 @@ class AuthManager(object):
if check_type == 's3':
sign = signer.Signer(user.secret.encode())
expected_signature = sign.s3_authorization(headers, verb, path)
LOG.debug('user.secret: %s', user.secret)
LOG.debug('expected_signature: %s', expected_signature)
LOG.debug('signature: %s', signature)
LOG.debug(_('user.secret: %s'), user.secret)
LOG.debug(_('expected_signature: %s'), expected_signature)
LOG.debug(_('signature: %s'), signature)
if signature != expected_signature:
LOG.audit(_("Invalid signature for user %s"), user.name)
raise exception.InvalidSignature(signature=signature,
@@ -317,10 +317,20 @@ class AuthManager(object):
# secret isn't unicode
expected_signature = signer.Signer(user.secret.encode()).generate(
params, verb, server_string, path)
LOG.debug('user.secret: %s', user.secret)
LOG.debug('expected_signature: %s', expected_signature)
LOG.debug('signature: %s', signature)
LOG.debug(_('user.secret: %s'), user.secret)
LOG.debug(_('expected_signature: %s'), expected_signature)
LOG.debug(_('signature: %s'), signature)
if signature != expected_signature:
(addr_str, port_str) = utils.parse_server_string(server_string)
# If the given server_string contains port num, try without it.
if port_str != '':
host_only_signature = signer.Signer(
user.secret.encode()).generate(params, verb,
addr_str, path)
LOG.debug(_('host_only_signature: %s'),
host_only_signature)
if signature == host_only_signature:
return (user, project)
LOG.audit(_("Invalid signature for user %s"), user.name)
raise exception.InvalidSignature(signature=signature,
user=user)

View File

@@ -101,9 +101,43 @@ class _AuthManagerBaseTestCase(test.TestCase):
self.assertEqual('private-party', u.access)
def test_004_signature_is_valid(self):
#self.assertTrue(self.manager.authenticate(**boto.generate_url ...? ))
pass
#raise NotImplementedError
with user_generator(self.manager, name='admin', secret='admin',
access='admin'):
with project_generator(self.manager, name="admin",
manager_user='admin'):
accesskey = 'admin:admin'
expected_result = (self.manager.get_user('admin'),
self.manager.get_project('admin'))
# captured sig and query string using boto 1.9b/euca2ools 1.2
sig = 'd67Wzd9Bwz8xid9QU+lzWXcF2Y3tRicYABPJgrqfrwM='
auth_params = {'AWSAccessKeyId': 'admin:admin',
'Action': 'DescribeAvailabilityZones',
'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'Timestamp': '2011-04-22T11:29:29',
'Version': '2009-11-30'}
self.assertTrue(expected_result, self.manager.authenticate(
accesskey,
sig,
auth_params,
'GET',
'127.0.0.1:8773',
'/services/Cloud/'))
# captured sig and query string using RightAWS 1.10.0
sig = 'ECYLU6xdFG0ZqRVhQybPJQNJ5W4B9n8fGs6+/fuGD2c='
auth_params = {'AWSAccessKeyId': 'admin:admin',
'Action': 'DescribeAvailabilityZones',
'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'Timestamp': '2011-04-22T11:29:49.000Z',
'Version': '2008-12-01'}
self.assertTrue(expected_result, self.manager.authenticate(
accesskey,
sig,
auth_params,
'GET',
'127.0.0.1',
'/services/Cloud'))
def test_005_can_get_credentials(self):
return