Merge "prefactor: Pass s3api Config to S3Requests"

This commit is contained in:
Zuul 2021-01-15 02:39:13 +00:00 committed by Gerrit Code Review
commit 1971cc30c1
7 changed files with 338 additions and 78 deletions

View File

@ -167,7 +167,7 @@ class BaseAclHandler(object):
try:
elem = fromstring(body, ACL.root_tag)
acl = ACL.from_elem(
elem, True, self.req.allow_no_owner)
elem, True, self.req.conf.allow_no_owner)
except(XMLSyntaxError, DocumentInvalid):
raise MalformedACLError()
except Exception as e:

View File

@ -280,15 +280,12 @@ class S3ApiMiddleware(object):
conf, log_route=conf.get('log_name', 's3api'))
self.slo_enabled = self.conf.allow_multipart_uploads
self.check_pipeline(self.conf)
self.conf.slo_enabled = self.slo_enabled
def __call__(self, env, start_response):
try:
req_class = get_request_class(env, self.conf.s3_acl)
req = req_class(
env, self.app, self.slo_enabled, self.conf.storage_domain,
self.conf.location, self.conf.force_swift_request_proxy_log,
self.conf.dns_compliant_bucket_names,
self.conf.allow_multipart_uploads, self.conf.allow_no_owner)
req = req_class(env, self.conf, self.app)
resp = self.handle_request(req)
except NotS3Request:
resp = self.app

View File

@ -525,18 +525,11 @@ class S3Request(swob.Request):
bucket_acl = _header_acl_property('container')
object_acl = _header_acl_property('object')
def __init__(self, env, app=None, slo_enabled=True, storage_domain='',
location='us-east-1', force_request_log=False,
dns_compliant_bucket_names=True, allow_multipart_uploads=True,
allow_no_owner=False):
# NOTE: app and allow_no_owner are not used by this class, need for
# compatibility of S3acl
def __init__(self, env, conf, app=None):
# NOTE: app is not used by this class, need for compatibility of S3acl
swob.Request.__init__(self, env)
self.storage_domain = storage_domain
self.location = location
self.force_request_log = force_request_log
self.dns_compliant_bucket_names = dns_compliant_bucket_names
self.allow_multipart_uploads = allow_multipart_uploads
self.conf = conf
self.location = self.conf.location
self._timestamp = None
self.access_key, self.signature = self._parse_auth_info()
self.bucket_in_host = self._parse_host()
@ -552,7 +545,6 @@ class S3Request(swob.Request):
}
self.account = None
self.user_id = None
self.slo_enabled = slo_enabled
# Avoids that swift.swob.Response replaces Location header value
# by full URL when absolute path given. See swift.swob for more detail.
@ -610,7 +602,7 @@ class S3Request(swob.Request):
return 'AWSAccessKeyId' in self.params
def _parse_host(self):
storage_domain = self.storage_domain
storage_domain = self.conf.storage_domain
if not storage_domain:
return None
@ -643,7 +635,7 @@ class S3Request(swob.Request):
bucket, obj = self.split_path(0, 2, True)
if bucket and not validate_bucket_name(
bucket, self.dns_compliant_bucket_names):
bucket, self.conf.dns_compliant_bucket_names):
# Ignore GET service case
raise InvalidBucketName(bucket)
return (bucket, obj)
@ -1026,7 +1018,7 @@ class S3Request(swob.Request):
if self.is_service_request:
return ServiceController
if not self.slo_enabled:
if not self.conf.slo_enabled:
multi_part = ['partNumber', 'uploadId', 'uploads']
if len([p for p in multi_part if p in self.params]):
raise S3NotImplemented("Multi-part feature isn't support")
@ -1154,7 +1146,7 @@ class S3Request(swob.Request):
if key.startswith('HTTP_X_OBJECT_META_'):
del env[key]
if self.force_request_log:
if self.conf.force_swift_request_proxy_log:
env['swift.proxy_access_log_made'] = False
env['swift.source'] = 'S3'
if method is not None:
@ -1487,7 +1479,7 @@ class S3Request(swob.Request):
'unexpected status code %d' % info['status'])
def gen_multipart_manifest_delete_query(self, app, obj=None, version=None):
if not self.allow_multipart_uploads:
if not self.conf.allow_multipart_uploads:
return {}
if not obj:
obj = self.object_name
@ -1513,14 +1505,8 @@ class S3AclRequest(S3Request):
"""
S3Acl request object.
"""
def __init__(self, env, app, slo_enabled=True, storage_domain='',
location='us-east-1', force_request_log=False,
dns_compliant_bucket_names=True, allow_multipart_uploads=True,
allow_no_owner=False):
super(S3AclRequest, self).__init__(
env, app, slo_enabled, storage_domain, location, force_request_log,
dns_compliant_bucket_names, allow_multipart_uploads)
self.allow_no_owner = allow_no_owner
def __init__(self, env, conf, app):
super(S3AclRequest, self).__init__(env, conf, app)
self.authenticate(app)
self.acl_handler = None
@ -1587,9 +1573,9 @@ class S3AclRequest(S3Request):
resp = self._get_response(
app, method, container, obj, headers, body, query)
resp.bucket_acl = decode_acl(
'container', resp.sysmeta_headers, self.allow_no_owner)
'container', resp.sysmeta_headers, self.conf.allow_no_owner)
resp.object_acl = decode_acl(
'object', resp.sysmeta_headers, self.allow_no_owner)
'object', resp.sysmeta_headers, self.conf.allow_no_owner)
return resp

View File

@ -152,7 +152,18 @@ def mktime(timestamp_str, time_format='%Y-%m-%dT%H:%M:%S'):
class Config(dict):
DEFAULTS = {
'slo_enabled': True,
'storage_domain': '',
'location': 'us-east-1',
'force_swift_request_proxy_log': False,
'dns_compliant_bucket_names': True,
'allow_multipart_uploads': True,
'allow_no_owner': False,
}
def __init__(self, base=None):
self.update(self.DEFAULTS)
if base is not None:
self.update(base)

View File

@ -25,6 +25,7 @@ import six
from six.moves.urllib.parse import unquote, quote
import swift.common.middleware.s3api
from swift.common.middleware.s3api.utils import Config
from swift.common.middleware.keystoneauth import KeystoneAuth
from swift.common import swob, utils
from swift.common.swob import Request
@ -172,7 +173,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
'S3Request._validate_headers'), \
patch('swift.common.middleware.s3api.s3request.'
'S3Request._validate_dates'):
req = S3Request(env)
req = S3Request(env, Config())
return req.environ['s3api.auth_details']['string_to_sign']
def verify(hash, path, headers):
@ -986,7 +987,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
'S3Request._validate_headers'), \
patch('swift.common.middleware.s3api.utils.time.time',
return_value=fake_time):
req = SigV4Request(env, location=self.conf.location)
req = SigV4Request(env, self.conf, app=None)
return req
def canonical_string(path, environ):

View File

@ -21,8 +21,9 @@ import unittest
from io import BytesIO
from swift.common import swob
from swift.common.middleware.s3api import s3response, controllers
from swift.common.swob import Request, HTTPNoContent
from swift.common.middleware.s3api.utils import mktime
from swift.common.middleware.s3api.utils import mktime, Config
from swift.common.middleware.s3api.acl_handlers import get_acl_handler
from swift.common.middleware.s3api.subresource import ACL, User, Owner, \
Grant, encode_acl
@ -107,20 +108,7 @@ class TestRequest(S3ApiTestCase):
environ={'REQUEST_METHOD': method},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
if issubclass(req_klass, S3AclRequest):
s3_req = req_klass(
req.environ, MagicMock(),
True, self.conf.storage_domain,
self.conf.location, self.conf.force_swift_request_proxy_log,
self.conf.dns_compliant_bucket_names,
self.conf.allow_multipart_uploads, self.conf.allow_no_owner)
else:
s3_req = req_klass(
req.environ, MagicMock(),
True, self.conf.storage_domain,
self.conf.location, self.conf.force_swift_request_proxy_log,
self.conf.dns_compliant_bucket_names,
self.conf.allow_multipart_uploads, self.conf.allow_no_owner)
s3_req = req_klass(req.environ, self.conf, app=None)
s3_req.set_acl_handler(
get_acl_handler(s3_req.controller_name)(s3_req, DebugLogger()))
with patch('swift.common.middleware.s3api.s3request.S3Request.'
@ -202,7 +190,7 @@ class TestRequest(S3ApiTestCase):
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
return S3Request(req.environ)
return S3Request(req.environ, Config(), app=None)
s3req = create_s3request_with_param('max-keys', '1')
@ -249,7 +237,7 @@ class TestRequest(S3ApiTestCase):
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(req.environ, MagicMock())
s3_req = S3AclRequest(req.environ, self.conf, None)
self.assertNotIn('s3api.auth_details', s3_req.environ)
def test_to_swift_req_Authorization_not_exist_in_swreq(self):
@ -267,7 +255,7 @@ class TestRequest(S3ApiTestCase):
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(req.environ, MagicMock())
s3_req = S3AclRequest(req.environ, Config(), app=None)
# Yes, we *want* to assert this
sw_req = s3_req.to_swift_req(method, container, obj)
# So since the result of S3AclRequest init tests and with this
@ -290,7 +278,8 @@ class TestRequest(S3ApiTestCase):
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(
req.environ, MagicMock(), force_request_log=True)
req.environ, Config({'force_swift_request_proxy_log': True}),
app=None)
sw_req = s3_req.to_swift_req(method, container, obj)
self.assertFalse(sw_req.environ['swift.proxy_access_log_made'])
@ -304,7 +293,8 @@ class TestRequest(S3ApiTestCase):
patch.object(Request, 'remote_user', 'authorized'):
m_swift_resp.return_value = FakeSwiftResponse()
s3_req = S3AclRequest(
req.environ, MagicMock(), force_request_log=False)
req.environ, Config({'force_swift_request_proxy_log': False}),
app=None)
sw_req = s3_req.to_swift_req(method, container, obj)
self.assertTrue(sw_req.environ['swift.proxy_access_log_made'])
@ -321,7 +311,7 @@ class TestRequest(S3ApiTestCase):
req = Request.blank('/bucket', environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
s3_req = S3Request(req.environ)
s3_req = S3Request(req.environ, Config(), app=None)
# first, call get_response('HEAD')
info = s3_req.get_container_info(self.app)
self.assertTrue('status' in info) # sanity
@ -423,7 +413,7 @@ class TestRequest(S3ApiTestCase):
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config(), app=None)
if 'X-Amz-Date' in date_header:
timestamp = mktime(
@ -501,7 +491,7 @@ class TestRequest(S3ApiTestCase):
headers = {'Authorization': 'AWS test:tester:hmac'}
headers.update(date_header)
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3Request(req.environ)
sigv2_req = S3Request(req.environ, Config(), app=None)
if 'X-Amz-Date' in date_header:
timestamp = mktime(req.headers.get('X-Amz-Date'))
@ -581,7 +571,7 @@ class TestRequest(S3ApiTestCase):
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config(), app=None)
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(headers_to_sign, [
@ -600,7 +590,7 @@ class TestRequest(S3ApiTestCase):
'Date': self.get_date_header()}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config(), app=None)
headers_to_sign = sigv4_req._headers_to_sign()
self.assertEqual(headers_to_sign, [
@ -620,7 +610,7 @@ class TestRequest(S3ApiTestCase):
req = Request.blank('/', environ=environ, headers=headers)
with self.assertRaises(SignatureDoesNotMatch):
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config(), app=None)
sigv4_req._headers_to_sign()
def test_canonical_uri_sigv2(self):
@ -634,14 +624,14 @@ class TestRequest(S3ApiTestCase):
# Virtual hosted-style
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3Request(
req.environ, storage_domain='s3.test.com')
req.environ, Config({'storage_domain': 's3.test.com'}))
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/bucket1/')
self.assertEqual(req.environ['PATH_INFO'], '/')
req = Request.blank('/obj1', environ=environ, headers=headers)
sigv2_req = S3Request(
req.environ, storage_domain='s3.test.com')
req.environ, Config({'storage_domain': 's3.test.com'}))
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/bucket1/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/obj1')
@ -652,7 +642,7 @@ class TestRequest(S3ApiTestCase):
# Path-style
req = Request.blank('/', environ=environ, headers=headers)
sigv2_req = S3Request(req.environ, storage_domain='')
sigv2_req = S3Request(req.environ, Config())
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/')
@ -661,7 +651,7 @@ class TestRequest(S3ApiTestCase):
req = Request.blank('/bucket1/obj1',
environ=environ,
headers=headers)
sigv2_req = S3Request(req.environ, storage_domain='')
sigv2_req = S3Request(req.environ, Config())
uri = sigv2_req._canonical_uri()
self.assertEqual(uri, '/bucket1/obj1')
self.assertEqual(req.environ['PATH_INFO'], '/bucket1/obj1')
@ -686,14 +676,14 @@ class TestRequest(S3ApiTestCase):
# Virtual hosted-style
self.conf.storage_domain = 's3.test.com'
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, b'/')
self.assertEqual(req.environ['PATH_INFO'], '/')
req = Request.blank('/obj1', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, b'/obj1')
@ -706,7 +696,7 @@ class TestRequest(S3ApiTestCase):
# Path-style
self.conf.storage_domain = ''
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, b'/')
@ -715,7 +705,7 @@ class TestRequest(S3ApiTestCase):
req = Request.blank('/bucket/obj1',
environ=environ,
headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
uri = sigv4_req._canonical_uri()
self.assertEqual(uri, b'/bucket/obj1')
@ -731,7 +721,8 @@ class TestRequest(S3ApiTestCase):
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'bWq2s1WEIj+Ydj0vQ697zp+IXMU='),
})
sigv2_req = S3Request(req.environ, storage_domain='s3.amazonaws.com')
sigv2_req = S3Request(req.environ, Config({
'storage_domain': 's3.amazonaws.com'}))
expected_sts = b'\n'.join([
b'GET',
b'',
@ -750,7 +741,8 @@ class TestRequest(S3ApiTestCase):
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'MyyxeRY7whkBe+bq8fHCL/2kKUg='),
})
sigv2_req = S3Request(req.environ, storage_domain='s3.amazonaws.com')
sigv2_req = S3Request(req.environ, Config({
'storage_domain': 's3.amazonaws.com'}))
expected_sts = b'\n'.join([
b'PUT',
b'',
@ -770,7 +762,8 @@ class TestRequest(S3ApiTestCase):
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'htDYFYduRNen8P9ZfE/s9SuKy0U='),
})
sigv2_req = S3Request(req.environ, storage_domain='s3.amazonaws.com')
sigv2_req = S3Request(req.environ, Config({
'storage_domain': 's3.amazonaws.com'}))
expected_sts = b'\n'.join([
b'GET',
b'',
@ -798,7 +791,8 @@ class TestRequest(S3ApiTestCase):
'Authorization': ('AWS AKIAIOSFODNN7EXAMPLE:'
'bWq2s1WEIj+Ydj0vQ697zp+IXMU='),
})
sigv2_req = S3Request(req.environ, storage_domain='s3.amazonaws.com')
sigv2_req = S3Request(req.environ, Config({
'storage_domain': 's3.amazonaws.com'}))
# This is a failure case with utf-8 non-ascii multi-bytes charactor
# but we expect to return just False instead of exceptions
self.assertFalse(sigv2_req.check_signature(
@ -816,7 +810,7 @@ class TestRequest(S3ApiTestCase):
'X-Amz-Date': amz_date_header
})
sigv4_req = SigV4Request(
req.environ, storage_domain='s3.amazonaws.com')
req.environ, Config({'storage_domain': 's3.amazonaws.com'}))
self.assertFalse(sigv4_req.check_signature(
u'\u30c9\u30e9\u30b4\u30f3'))
@ -839,7 +833,7 @@ class TestRequest(S3ApiTestCase):
# Virtual hosted-style
self.conf.storage_domain = 's3.test.com'
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
self.assertTrue(
sigv4_req._canonical_request().endswith(b'UNSIGNED-PAYLOAD'))
self.assertTrue(sigv4_req.check_signature('secret'))
@ -868,7 +862,7 @@ class TestRequest(S3ApiTestCase):
# lowercase sha256
req = Request.blank('/', environ=environ, headers=headers)
self.assertRaises(BadDigest, SigV4Request, req.environ)
self.assertRaises(BadDigest, SigV4Request, req.environ, Config())
sha256_of_nothing = hashlib.sha256().hexdigest().encode('ascii')
headers = {
'Authorization':
@ -883,7 +877,7 @@ class TestRequest(S3ApiTestCase):
'Content-Length': 0,
}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
self.assertTrue(
sigv4_req._canonical_request().endswith(sha256_of_nothing))
self.assertTrue(sigv4_req.check_signature('secret'))
@ -902,12 +896,249 @@ class TestRequest(S3ApiTestCase):
'Content-Length': 0,
}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ)
sigv4_req = SigV4Request(req.environ, Config())
self.assertTrue(
sigv4_req._canonical_request().endswith(sha256_of_nothing.upper()))
self.assertTrue(sigv4_req.check_signature('secret'))
class TestSigV4Request(S3ApiTestCase):
def setUp(self):
super(TestSigV4Request, self).setUp()
self.s3api.conf.s3_acl = True
self.swift.s3_acl = True
def test_init_header_authorization(self):
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'GET'}
def do_check_ok(conf, auth):
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
sigv4_req = SigV4Request(req.environ, conf)
self.assertEqual('X', sigv4_req.signature)
self.assertEqual('test', sigv4_req.access_key)
return sigv4_req
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-east-1/s3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
# location lowercase matches
sigv4_req = do_check_ok(Config({'location': 'us-east-1'}), auth)
self.assertEqual('us-east-1', sigv4_req.location)
# location case mis-matches
sigv4_req = do_check_ok(Config({'location': 'US-East-1'}), auth)
self.assertEqual('us-east-1', sigv4_req.location)
# location uppercase matches
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/US-East-1/s3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
sigv4_req = do_check_ok(Config({'location': 'US-East-1'}), auth)
self.assertEqual('US-East-1', sigv4_req.location)
def do_check_bad(conf, auth, exc):
x_amz_date = self.get_v4_amz_date_header()
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=headers)
self.assertRaises(exc, SigV4Request, req.environ, conf)
# location case mismatch
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/US-East-1/s3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), auth,
s3response.AuthorizationHeaderMalformed)
# bad location
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-west-1/s3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), auth,
s3response.AuthorizationHeaderMalformed)
# bad service name
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-east-1/S3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), auth,
s3response.AuthorizationHeaderMalformed)
# bad terminal name
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-east-1/s3/AWS4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), auth,
s3response.AuthorizationHeaderMalformed)
# bad Signature
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-east-1/s3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=' % self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), auth,
s3response.AccessDenied)
# bad SignedHeaders
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-east-1/s3/aws4_request,'
'SignedHeaders=,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), auth,
s3response.AuthorizationHeaderMalformed)
def test_init_query_authorization(self):
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'GET'}
def do_check_ok(conf, params):
x_amz_date = self.get_v4_amz_date_header()
params['X-Amz-Date'] = x_amz_date
signed_headers = {
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=signed_headers,
params=params)
sigv4_req = SigV4Request(req.environ, conf)
self.assertEqual('X', sigv4_req.signature)
self.assertEqual('test', sigv4_req.access_key)
return sigv4_req
ok_params = {
'AWSAccessKeyId': 'test',
'X-Amz-Expires': '3600',
'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
'X-Amz-Credential': 'test/%s/us-east-1/s3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0],
'X-Amz-SignedHeaders': 'host;x-amz-content-sha256;x-amz-date',
'X-Amz-Signature': 'X'}
# location lowercase matches
sigv4_req = do_check_ok(Config({'location': 'us-east-1'}), ok_params)
self.assertEqual('us-east-1', sigv4_req.location)
# location case mis-matches
sigv4_req = do_check_ok(Config({'location': 'US-East-1'}), ok_params)
self.assertEqual('us-east-1', sigv4_req.location)
# location uppercase matches
ok_params['X-Amz-Credential'] = (
'test/%s/US-East-1/s3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0])
sigv4_req = do_check_ok(Config({'location': 'US-East-1'}), ok_params)
self.assertEqual('US-East-1', sigv4_req.location)
def do_check_bad(conf, params, exc):
x_amz_date = self.get_v4_amz_date_header()
params['X-Amz-Date'] = x_amz_date
signed_headers = {
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
req = Request.blank('/', environ=environ, headers=signed_headers,
params=params)
self.assertRaises(exc, SigV4Request, req.environ, conf)
# location case mismatch
bad_params = dict(ok_params)
bad_params['X-Amz-Credential'] = (
'test/%s/US-East-1/s3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), bad_params,
s3response.AuthorizationQueryParametersError)
# bad location
bad_params = dict(ok_params)
bad_params['X-Amz-Credential'] = (
'test/%s/us-west-1/s3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), bad_params,
s3response.AuthorizationQueryParametersError)
# bad service name
bad_params = dict(ok_params)
bad_params['X-Amz-Credential'] = (
'test/%s/us-east-1/S3/aws4_request' %
self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), bad_params,
s3response.AuthorizationQueryParametersError)
# bad terminal name
bad_params = dict(ok_params)
bad_params['X-Amz-Credential'] = (
'test/%s/us-east-1/s3/AWS4_request' %
self.get_v4_amz_date_header().split('T', 1)[0])
do_check_bad(Config({'location': 'us-east-1'}), bad_params,
s3response.AuthorizationQueryParametersError)
# bad Signature
bad_params = dict(ok_params)
bad_params['X-Amz-Signature'] = ''
do_check_bad(Config({'location': 'us-east-1'}), bad_params,
s3response.AccessDenied)
# bad SignedHeaders
bad_params = dict(ok_params)
bad_params['X-Amz-SignedHeaders'] = ''
do_check_bad(Config({'location': 'us-east-1'}), bad_params,
s3response.AuthorizationQueryParametersError)
def test_controller_slo_enabled(self):
environ = {
'HTTP_HOST': 'bucket.s3.test.com',
'REQUEST_METHOD': 'GET'}
x_amz_date = self.get_v4_amz_date_header()
auth = ('AWS4-HMAC-SHA256 '
'Credential=test/%s/us-east-1/s3/aws4_request,'
'SignedHeaders=host;x-amz-content-sha256;x-amz-date,'
'Signature=X' % self.get_v4_amz_date_header().split('T', 1)[0])
headers = {
'Authorization': auth,
'X-Amz-Content-SHA256': '0123456789',
'Date': self.get_date_header(),
'X-Amz-Date': x_amz_date}
def make_s3req(config, path, params):
req = Request.blank(path, environ=environ, headers=headers,
params=params)
return SigV4Request(req.environ, config)
s3req = make_s3req(Config(), '/bkt', {'partNumber': '3'})
self.assertEqual(controllers.multi_upload.PartController,
s3req.controller)
s3req = make_s3req(Config(), '/bkt', {'uploadId': '4'})
self.assertEqual(controllers.multi_upload.UploadController,
s3req.controller)
s3req = make_s3req(Config(), '/bkt', {'uploads': '99'})
self.assertEqual(controllers.multi_upload.UploadsController,
s3req.controller)
# multi part requests require slo_enabled
def do_check_slo_not_enabled(params):
s3req = make_s3req(Config({'slo_enabled': False}), '/bkt', params)
self.assertRaises(s3response.S3NotImplemented,
lambda: s3req.controller)
do_check_slo_not_enabled({'partNumber': '3'})
do_check_slo_not_enabled({'uploadId': '4'})
do_check_slo_not_enabled({'uploads': '99'})
# service requests not dependent on slo_enabled
s3req = make_s3req(Config(), '/', {'partNumber': '3'})
self.assertEqual(controllers.ServiceController,
s3req.controller)
s3req = make_s3req(Config({'slo_enabled': False}), '/',
{'partNumber': '3'})
self.assertEqual(controllers.ServiceController,
s3req.controller)
class TestHashingInput(S3ApiTestCase):
def test_good(self):
raw = b'123456789'

View File

@ -130,5 +130,39 @@ class TestS3ApiUtils(unittest.TestCase):
time.tzset()
class TestConfig(unittest.TestCase):
def _assert_defaults(self, conf):
self.assertTrue(conf.slo_enabled)
self.assertEqual('', conf.storage_domain)
self.assertEqual('us-east-1', conf.location)
self.assertFalse(conf.force_swift_request_proxy_log)
self.assertTrue(conf.dns_compliant_bucket_names)
self.assertTrue(conf.allow_multipart_uploads)
self.assertFalse(conf.allow_no_owner)
def test_defaults(self):
conf = utils.Config()
self._assert_defaults(conf)
def test_update(self):
conf = utils.Config()
conf.update({'key1': 'val1', 'key2': 'val2'})
self._assert_defaults(conf)
self.assertEqual(conf.key1, 'val1')
self.assertEqual(conf.key2, 'val2')
conf.update({'slo_enabled': False})
self.assertFalse(conf.slo_enabled)
def test_set_get_delete(self):
conf = utils.Config()
self.assertRaises(AttributeError, lambda: conf.new_attr)
conf.new_attr = 123
self.assertEqual(123, conf.new_attr)
del conf.new_attr
self.assertRaises(AttributeError, lambda: conf.new_attr)
if __name__ == '__main__':
unittest.main()