Merge "Use swift3's check_signature function"

This commit is contained in:
Jenkins 2017-10-10 00:39:43 +00:00 committed by Gerrit Code Review
commit b06b88debe
2 changed files with 53 additions and 24 deletions

View File

@ -177,8 +177,6 @@ from __future__ import print_function
from time import time from time import time
from traceback import format_exc from traceback import format_exc
from uuid import uuid4 from uuid import uuid4
from hashlib import sha1
import hmac
import base64 import base64
from eventlet import Timeout from eventlet import Timeout
@ -437,20 +435,21 @@ class TempAuth(object):
s3_auth_details = env.get('swift3.auth_details') s3_auth_details = env.get('swift3.auth_details')
if s3_auth_details: if s3_auth_details:
if 'check_signature' not in s3_auth_details:
self.logger.warning(
'Swift3 did not provide a check_signature function; '
'upgrade Swift3 if you want to use it with tempauth')
return None
account_user = s3_auth_details['access_key'] account_user = s3_auth_details['access_key']
signature_from_user = s3_auth_details['signature']
if account_user not in self.users: if account_user not in self.users:
return None return None
account, user = account_user.split(':', 1) user = self.users[account_user]
account_id = self.users[account_user]['url'].rsplit('/', 1)[-1] account = account_user.split(':', 1)[0]
path = env['PATH_INFO'] account_id = user['url'].rsplit('/', 1)[-1]
env['PATH_INFO'] = path.replace(account_user, account_id, 1) if not s3_auth_details['check_signature'](user['key']):
valid_signature = base64.encodestring(hmac.new(
self.users[account_user]['key'],
s3_auth_details['string_to_sign'],
sha1).digest()).strip()
if signature_from_user != valid_signature:
return None return None
env['PATH_INFO'] = env['PATH_INFO'].replace(
account_user, account_id, 1)
groups = self._get_user_groups(account, account_user, account_id) groups = self._get_user_groups(account, account_user, account_id)
return groups return groups

View File

@ -19,7 +19,6 @@ import unittest
from contextlib import contextmanager from contextlib import contextmanager
from base64 import b64encode from base64 import b64encode
from time import time from time import time
import mock
from swift.common.middleware import tempauth as auth from swift.common.middleware import tempauth as auth
from swift.common.middleware.acl import format_acl from swift.common.middleware.acl import format_acl
@ -265,27 +264,58 @@ class TestAuth(unittest.TestCase):
self.assertEqual(req.environ['swift.authorize'], self.assertEqual(req.environ['swift.authorize'],
local_auth.denied_response) local_auth.denied_response)
def test_auth_with_s3_authorization(self): def test_auth_with_s3_authorization_good(self):
local_app = FakeApp() local_app = FakeApp()
local_auth = auth.filter_factory( local_auth = auth.filter_factory(
{'user_s3_s3': 'secret .admin'})(local_app) {'user_s3_s3': 'secret .admin'})(local_app)
req = self._make_request('/v1/AUTH_s3', environ={ req = self._make_request('/v1/s3:s3', environ={
'swift3.auth_details': {
'access_key': 's3:s3',
'signature': b64encode('sig'),
'string_to_sign': 't',
'check_signature': lambda secret: True}})
resp = req.get_response(local_auth)
self.assertEqual(resp.status_int, 404)
self.assertEqual(local_app.calls, 1)
self.assertEqual(req.environ['PATH_INFO'], '/v1/AUTH_s3')
self.assertEqual(req.environ['swift.authorize'],
local_auth.authorize)
def test_auth_with_s3_authorization_invalid(self):
local_app = FakeApp()
local_auth = auth.filter_factory(
{'user_s3_s3': 'secret .admin'})(local_app)
req = self._make_request('/v1/s3:s3', environ={
'swift3.auth_details': {
'access_key': 's3:s3',
'signature': b64encode('sig'),
'string_to_sign': 't',
'check_signature': lambda secret: False}})
resp = req.get_response(local_auth)
self.assertEqual(resp.status_int, 401)
self.assertEqual(local_app.calls, 1)
self.assertEqual(req.environ['PATH_INFO'], '/v1/s3:s3')
self.assertEqual(req.environ['swift.authorize'],
local_auth.denied_response)
def test_auth_with_old_s3_details(self):
local_app = FakeApp()
local_auth = auth.filter_factory(
{'user_s3_s3': 'secret .admin'})(local_app)
req = self._make_request('/v1/s3:s3', environ={
'swift3.auth_details': { 'swift3.auth_details': {
'access_key': 's3:s3', 'access_key': 's3:s3',
'signature': b64encode('sig'), 'signature': b64encode('sig'),
'string_to_sign': 't'}}) 'string_to_sign': 't'}})
resp = req.get_response(local_auth)
with mock.patch('hmac.new') as hmac: self.assertEqual(resp.status_int, 401)
hmac.return_value.digest.return_value = 'sig'
resp = req.get_response(local_auth)
self.assertEqual(hmac.mock_calls, [
mock.call('secret', 't', mock.ANY),
mock.call().digest()])
self.assertEqual(resp.status_int, 404)
self.assertEqual(local_app.calls, 1) self.assertEqual(local_app.calls, 1)
self.assertEqual(req.environ['PATH_INFO'], '/v1/s3:s3')
self.assertEqual(req.environ['swift.authorize'], self.assertEqual(req.environ['swift.authorize'],
local_auth.authorize) local_auth.denied_response)
def test_auth_no_reseller_prefix_no_token(self): def test_auth_no_reseller_prefix_no_token(self):
# Check that normally we set up a call back to our authorize. # Check that normally we set up a call back to our authorize.