Files
swift/test/unit/common/middleware/s3api/test_utils.py
Yan Xiao 9d7e7e27a5 Provide some s3 helper methods for other middlewares to use.
get_s3_access_key_id returns the S3 access_key_id user for the request
is_s3_req checks whether a request looks like it ought to be an S3 request
parse_path returns a wsgi string
extract_bucket_and_key extracts bucket and object from the request's PATH_INFO

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Shreeya Deshpande <shreeyad@nvidia.com>

Change-Id: Iaf86a07238cca6700dee736f55d4c0672cccf1b1
Signed-off-by: Shreeya Deshpande <shreeyad@nvidia.com>
2025-10-14 13:58:00 +00:00

385 lines
16 KiB
Python

# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import unittest
from swift.common.swob import Request
from swift.common.middleware.s3api import utils, s3request
from swift.common.middleware.s3api.exception import InvalidBucketNameParseError
strs = [
('Owner', 'owner'),
('DisplayName', 'display_name'),
('AccessControlPolicy', 'access_control_policy'),
]
class TestS3ApiUtils(unittest.TestCase):
def test_camel_to_snake(self):
for s1, s2 in strs:
self.assertEqual(utils.camel_to_snake(s1), s2)
def test_snake_to_camel(self):
for s1, s2 in strs:
self.assertEqual(s1, utils.snake_to_camel(s2))
def test_validate_bucket_name(self):
# good cases
self.assertTrue(utils.validate_bucket_name('bucket', True))
self.assertTrue(utils.validate_bucket_name('bucket1', True))
self.assertTrue(utils.validate_bucket_name('bucket-1', True))
self.assertTrue(utils.validate_bucket_name('b.u.c.k.e.t', True))
self.assertTrue(utils.validate_bucket_name('a' * 63, True))
self.assertTrue(utils.validate_bucket_name('v1.0', True))
# bad cases
self.assertFalse(utils.validate_bucket_name('a', True))
self.assertFalse(utils.validate_bucket_name('aa', True))
self.assertFalse(utils.validate_bucket_name('a+a', True))
self.assertFalse(utils.validate_bucket_name('a_a', True))
self.assertFalse(utils.validate_bucket_name('Bucket', True))
self.assertFalse(utils.validate_bucket_name('BUCKET', True))
self.assertFalse(utils.validate_bucket_name('bucket-', True))
self.assertFalse(utils.validate_bucket_name('bucket.', True))
self.assertFalse(utils.validate_bucket_name('bucket_', True))
self.assertFalse(utils.validate_bucket_name('bucket.-bucket', True))
self.assertFalse(utils.validate_bucket_name('bucket-.bucket', True))
self.assertFalse(utils.validate_bucket_name('bucket..bucket', True))
self.assertFalse(utils.validate_bucket_name('a' * 64, True))
self.assertFalse(utils.validate_bucket_name('v1', False))
def test_validate_bucket_name_with_dns_compliant_bucket_names_false(self):
# good cases
self.assertTrue(utils.validate_bucket_name('bucket', False))
self.assertTrue(utils.validate_bucket_name('bucket1', False))
self.assertTrue(utils.validate_bucket_name('bucket-1', False))
self.assertTrue(utils.validate_bucket_name('b.u.c.k.e.t', False))
self.assertTrue(utils.validate_bucket_name('a' * 63, False))
self.assertTrue(utils.validate_bucket_name('a' * 255, False))
self.assertTrue(utils.validate_bucket_name('a_a', False))
self.assertTrue(utils.validate_bucket_name('Bucket', False))
self.assertTrue(utils.validate_bucket_name('BUCKET', False))
self.assertTrue(utils.validate_bucket_name('bucket-', False))
self.assertTrue(utils.validate_bucket_name('bucket_', False))
self.assertTrue(utils.validate_bucket_name('bucket.-bucket', False))
self.assertTrue(utils.validate_bucket_name('bucket-.bucket', False))
self.assertTrue(utils.validate_bucket_name('bucket..bucket', False))
# bad cases
self.assertFalse(utils.validate_bucket_name('a', False))
self.assertFalse(utils.validate_bucket_name('aa', False))
self.assertFalse(utils.validate_bucket_name('a+a', False))
# ending with dot seems invalid in US standard, too
self.assertFalse(utils.validate_bucket_name('bucket.', False))
self.assertFalse(utils.validate_bucket_name('a' * 256, False))
def test_extract_bucket_and_key(self):
req = Request.blank(
'/bucket/object',
environ={
'REQUEST_METHOD': 'GET',
},
headers={
'Authorization': 'AWS test:tester:hmac',
},
)
cont, obj = utils.extract_bucket_and_key(req, [], False)
self.assertEqual(cont, 'bucket')
self.assertEqual(obj, 'object')
def test_extract_bucket_and_key_invalid_character(self):
req = Request.blank(
'/bucket/\x00object',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'},
)
self.assertEqual((None, None),
utils.extract_bucket_and_key(req, [], False))
def test_extract_bucket_and_key_invalid_bucket(self):
req = Request.blank(
'/b/object',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'},
)
self.assertEqual((None, None),
utils.extract_bucket_and_key(req, [], False))
def test_extract_bucket_and_key_invalid_dns_compliant(self):
req = Request.blank(
'/BUCKET/object',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'},
)
self.assertEqual(('BUCKET', 'object'),
utils.extract_bucket_and_key(req, [], False))
self.assertEqual((None, None),
utils.extract_bucket_and_key(req, [], True))
def test_extract_bucket_and_key_bucket_in_host(self):
req = Request.blank(
'/object/xyz',
environ={'REQUEST_METHOD': 'GET',
'HTTP_HOST': 'bucket.localhost'},
headers={'Authorization': 'AWS test:atester:hmac'},
)
self.assertEqual(
('bucket', 'object/xyz'),
utils.extract_bucket_and_key(req, ['localhost'], False))
def test_parse_host(self):
req = Request.blank(
'/bucket/object',
environ={
'REQUEST_METHOD': 'GET',
'SERVER_NAME': 'foo.boo'
},
)
del req.environ['HTTP_HOST']
self.assertEqual(utils.parse_host(req.environ, []), None)
self.assertEqual(utils.parse_host(req.environ, ['boo']), 'foo')
req = Request.blank(
'/bucket/object',
environ={
'REQUEST_METHOD': 'GET',
'HTTP_HOST': 'buckets.localhost',
'SERVER_NAME': 'foo.localhost',
},
)
self.assertEqual(utils.parse_host(req.environ, []), None)
self.assertEqual(utils.parse_host(
req.environ, ['notlocalhost']), None)
self.assertEqual(utils.parse_host(
req.environ, ['localhost']), 'buckets')
self.assertEqual(utils.parse_host(
req.environ, ['.localhost']), 'buckets')
self.assertEqual(utils.parse_host(
req.environ, ['notlocalhost', '.localhost']), 'buckets')
def test_parse_path(self):
req = Request.blank(
'/bucket/object',
environ={'REQUEST_METHOD': 'GET'},
)
bucket, obj = utils.parse_path(req, None, False)
self.assertEqual(bucket, 'bucket')
self.assertEqual(obj, 'object')
bucket, obj = utils.parse_path(req, None, True)
self.assertEqual(bucket, 'bucket')
self.assertEqual(obj, 'object')
bucket, obj = utils.parse_path(req, 'boo', True)
self.assertEqual(bucket, 'boo')
self.assertEqual(obj, 'bucket/object')
def test_parse_path_dns_compliant_bucket_names(self):
req = Request.blank(
'/BUCKET/object',
environ={'REQUEST_METHOD': 'GET'},
)
with self.assertRaises(InvalidBucketNameParseError):
utils.parse_path(req, None, True)
# non-compliant is ok if it somehow came in the host??
bucket, obj = utils.parse_path(req, 'BUCKET', True)
self.assertEqual(bucket, 'BUCKET')
self.assertEqual(obj, 'BUCKET/object')
def test_get_s3_access_key_id_not_s3_req(self):
headers = {'Authorization': 'not AWS my_access_key_id:signature'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
self.assertIsNone(utils.get_s3_access_key_id(req))
def test_get_s3_access_key_id_v2_header(self):
headers = {'Authorization': 'AWS my_access_key_id:signature'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
self.assertEqual('my_access_key_id', utils.get_s3_access_key_id(req))
def test_get_s3_access_key_id_v2_param(self):
params = {'AWSAccessKeyId': 'my_access_key_id'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
params=params)
self.assertEqual('my_access_key_id', utils.get_s3_access_key_id(req))
def test_get_s3_access_key_id_v4_header(self):
headers = {
'Authorization':
'AWS4-HMAC-SHA256 '
'Credential=my_access_key_id/20130524/us-east-1/s3/'
'aws4_request,'
'SignedHeaders=host;range;x-amz-date,'
'Signature=fe5f80f77d5fa3beca038a248ff027d0445342fe2855ddc963'
'176630326f1024'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
self.assertEqual('my_access_key_id', utils.get_s3_access_key_id(req))
def test_get_s3_access_key_id_v4_param(self):
params = {'X-Amz-Credential':
'my_access_key_id/20130721/us-east-1/s3/aws4_request'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
params=params)
self.assertEqual('my_access_key_id', utils.get_s3_access_key_id(req))
def test_is_s3_req(self):
headers = {'Authorization': 'not AWS my_access_key_id:signature'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
self.assertIs(False, utils.is_s3_req(req))
headers = {'Authorization': 'AWS my_access_key_id:signature'}
req = Request.blank('/v1/a/',
environ={'REQUEST_METHOD': 'GET'},
headers=headers)
self.assertIs(True, utils.is_s3_req(req))
def test_mktime(self):
date_headers = [
'Thu, 01 Jan 1970 00:00:00 -0000',
'Thu, 01 Jan 1970 00:00:00 GMT',
'Thu, 01 Jan 1970 00:00:00 UTC',
'Thu, 01 Jan 1970 08:00:00 +0800',
'Wed, 31 Dec 1969 16:00:00 -0800',
'Wed, 31 Dec 1969 16:00:00 PST',
]
for header in date_headers:
ts = utils.mktime(header)
self.assertEqual(0, ts, 'Got %r for header %s' % (ts, header))
# Last-Modified response style
self.assertEqual(0, utils.mktime('1970-01-01T00:00:00'))
# X-Amz-Date style
self.assertEqual(0, utils.mktime('19700101T000000Z',
s3request.SIGV4_X_AMZ_DATE_FORMAT))
def test_mktime_weird_tz(self):
orig_tz = os.environ.get('TZ', '')
try:
os.environ['TZ'] = 'EST+05EDT,M4.1.0,M10.5.0'
time.tzset()
os.environ['TZ'] = '+0000'
# No tzset! Simulating what Swift would do.
self.assertNotEqual(0, time.timezone)
self.test_mktime()
finally:
os.environ['TZ'] = orig_tz
time.tzset()
class TestS3Timestamp(unittest.TestCase):
def test_s3xmlformat(self):
expected = '1970-01-01T00:00:01.000Z'
# integer
ts = utils.S3Timestamp(1)
self.assertEqual(expected, ts.s3xmlformat)
# microseconds digits are not included in Timestamp.normal so do not
# cause the timestamp to be rounded up
ts = utils.S3Timestamp(1.000001)
self.assertEqual(expected, ts.s3xmlformat)
# milliseconds unit should be rounded up
expected = '1970-01-01T00:00:02.000Z'
ts = utils.S3Timestamp(1.1)
self.assertEqual(expected, ts.s3xmlformat)
# float (deca-microseconds) should be rounded up too
ts = utils.S3Timestamp(1.000010)
self.assertEqual(expected, ts.s3xmlformat)
# Bigger float (milliseconds) should be rounded up too
ts = utils.S3Timestamp(1.9)
self.assertEqual(expected, ts.s3xmlformat)
def test_from_s3xmlformat(self):
ts = utils.S3Timestamp.from_s3xmlformat('2014-06-10T22:47:32.000Z')
self.assertIsInstance(ts, utils.S3Timestamp)
self.assertEqual(1402440452, float(ts))
self.assertEqual('2014-06-10T22:47:32.000000', ts.isoformat)
ts = utils.S3Timestamp.from_s3xmlformat('1970-01-01T00:00:00.000Z')
self.assertIsInstance(ts, utils.S3Timestamp)
self.assertEqual(0.0, float(ts))
self.assertEqual('1970-01-01T00:00:00.000000', ts.isoformat)
ts = utils.S3Timestamp(1402440452.0)
self.assertIsInstance(ts, utils.S3Timestamp)
ts1 = utils.S3Timestamp.from_s3xmlformat(ts.s3xmlformat)
self.assertIsInstance(ts1, utils.S3Timestamp)
self.assertEqual(ts, ts1)
def test_from_isoformat(self):
ts = utils.S3Timestamp.from_isoformat('2014-06-10T22:47:32.054580')
self.assertIsInstance(ts, utils.S3Timestamp)
self.assertEqual(1402440452.05458, float(ts))
self.assertEqual('2014-06-10T22:47:32.054580', ts.isoformat)
self.assertEqual('2014-06-10T22:47:33.000Z', ts.s3xmlformat)
class TestConfig(unittest.TestCase):
def _assert_defaults(self, conf):
self.assertEqual([], conf.storage_domains)
self.assertEqual('us-east-1', conf.location)
self.assertFalse(conf.force_swift_request_proxy_log)
self.assertTrue(conf.dns_compliant_bucket_names)
self.assertTrue(conf.allow_multipart_uploads)
self.assertFalse(conf.allow_no_owner)
self.assertEqual(900, conf.allowable_clock_skew)
self.assertFalse(conf.ratelimit_as_client_error)
def test_defaults(self):
# deliberately brittle so new defaults will need to be added to test
conf = utils.Config()
self._assert_defaults(conf)
del conf.storage_domains
del conf.location
del conf.force_swift_request_proxy_log
del conf.dns_compliant_bucket_names
del conf.allow_multipart_uploads
del conf.allow_no_owner
del conf.allowable_clock_skew
del conf.ratelimit_as_client_error
del conf.max_upload_part_num
self.assertEqual({}, conf)
def test_update(self):
conf = utils.Config()
conf.update({'key1': 'val1', 'key2': 'val2'})
self._assert_defaults(conf)
self.assertEqual(conf.key1, 'val1')
self.assertEqual(conf.key2, 'val2')
conf.update({'allow_multipart_uploads': False})
self.assertFalse(conf.allow_multipart_uploads)
def test_set_get_delete(self):
conf = utils.Config()
self.assertRaises(AttributeError, lambda: conf.new_attr)
conf.new_attr = 123
self.assertEqual(123, conf.new_attr)
del conf.new_attr
self.assertRaises(AttributeError, lambda: conf.new_attr)
if __name__ == '__main__':
unittest.main()