Merge "Add config option for whether to skip s3_acl-requiring tests"
This commit is contained in:
@@ -12,7 +12,7 @@
|
|||||||
# implied.
|
# implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import functools
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
@@ -144,6 +144,15 @@ def get_s3_client(user=1, signature_version='s3v4', addressing_style='path'):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def skip_if_s3_acl_tests_disabled(func):
|
||||||
|
@functools.wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
if config_true_value(get_opt('s3_acl_tests_disabled', 'false')):
|
||||||
|
raise unittest.SkipTest('s3_acl_tests_disabled is true')
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
def etag_from_resp(response):
|
def etag_from_resp(response):
|
||||||
return response['ETag']
|
return response['ETag']
|
||||||
|
|
||||||
|
|||||||
@@ -15,38 +15,43 @@
|
|||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from test.s3api import BaseS3TestCase, ConfigError
|
from test.s3api import BaseS3TestCase, ConfigError, \
|
||||||
|
skip_if_s3_acl_tests_disabled
|
||||||
|
|
||||||
|
|
||||||
class TestGetServiceSigV4(BaseS3TestCase):
|
class TestGetServiceSigV4(BaseS3TestCase):
|
||||||
|
def _do_test_empty_service(self, client):
|
||||||
|
access_key = client._request_signer._credentials.access_key
|
||||||
|
resp = client.list_buckets()
|
||||||
|
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
|
||||||
|
self.assertEqual([], resp['Buckets'])
|
||||||
|
self.assertIn('x-amz-request-id',
|
||||||
|
resp['ResponseMetadata']['HTTPHeaders'])
|
||||||
|
self.assertIn('DisplayName', resp['Owner'])
|
||||||
|
self.assertEqual(access_key, resp['Owner']['DisplayName'])
|
||||||
|
self.assertIn('ID', resp['Owner'])
|
||||||
|
|
||||||
def test_empty_service(self):
|
def test_empty_service(self):
|
||||||
def do_test(client):
|
client1 = self.get_s3_client(1)
|
||||||
access_key = client._request_signer._credentials.access_key
|
self._do_test_empty_service(client1)
|
||||||
resp = client.list_buckets()
|
|
||||||
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
|
|
||||||
self.assertEqual([], resp['Buckets'])
|
|
||||||
self.assertIn('x-amz-request-id',
|
|
||||||
resp['ResponseMetadata']['HTTPHeaders'])
|
|
||||||
self.assertIn('DisplayName', resp['Owner'])
|
|
||||||
self.assertEqual(access_key, resp['Owner']['DisplayName'])
|
|
||||||
self.assertIn('ID', resp['Owner'])
|
|
||||||
|
|
||||||
client = self.get_s3_client(1)
|
@skip_if_s3_acl_tests_disabled
|
||||||
do_test(client)
|
def test_empty_service_client3(self):
|
||||||
try:
|
try:
|
||||||
client = self.get_s3_client(3)
|
client3 = self.get_s3_client(3)
|
||||||
except ConfigError:
|
except ConfigError as err:
|
||||||
pass
|
raise unittest.SkipTest(str(err))
|
||||||
else:
|
else:
|
||||||
do_test(client)
|
self._do_test_empty_service(client3)
|
||||||
|
|
||||||
def test_service_with_buckets(self):
|
def _create_buckets(self, client):
|
||||||
c = self.get_s3_client(1)
|
|
||||||
buckets = [self.create_name('bucket%s' % i) for i in range(5)]
|
buckets = [self.create_name('bucket%s' % i) for i in range(5)]
|
||||||
for bucket in buckets:
|
for bucket in buckets:
|
||||||
c.create_bucket(Bucket=bucket)
|
client.create_bucket(Bucket=bucket)
|
||||||
|
return buckets
|
||||||
|
|
||||||
resp = c.list_buckets()
|
def _do_test_service_with_buckets(self, client, buckets):
|
||||||
|
resp = client.list_buckets()
|
||||||
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
|
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
|
||||||
self.assertEqual(sorted(buckets), [
|
self.assertEqual(sorted(buckets), [
|
||||||
bucket['Name'] for bucket in resp['Buckets']])
|
bucket['Name'] for bucket in resp['Buckets']])
|
||||||
@@ -55,27 +60,38 @@ class TestGetServiceSigV4(BaseS3TestCase):
|
|||||||
self.assertIn('x-amz-request-id',
|
self.assertIn('x-amz-request-id',
|
||||||
resp['ResponseMetadata']['HTTPHeaders'])
|
resp['ResponseMetadata']['HTTPHeaders'])
|
||||||
self.assertIn('DisplayName', resp['Owner'])
|
self.assertIn('DisplayName', resp['Owner'])
|
||||||
access_key = c._request_signer._credentials.access_key
|
access_key = client._request_signer._credentials.access_key
|
||||||
self.assertEqual(access_key, resp['Owner']['DisplayName'])
|
self.assertEqual(access_key, resp['Owner']['DisplayName'])
|
||||||
self.assertIn('ID', resp['Owner'])
|
self.assertIn('ID', resp['Owner'])
|
||||||
|
|
||||||
|
def test_service_with_buckets(self):
|
||||||
|
client = self.get_s3_client(1)
|
||||||
|
buckets = self._create_buckets(client)
|
||||||
|
self._do_test_service_with_buckets(client, buckets)
|
||||||
|
|
||||||
|
@skip_if_s3_acl_tests_disabled
|
||||||
|
def test_service_with_buckets_client2(self):
|
||||||
# Second user can only see its own buckets
|
# Second user can only see its own buckets
|
||||||
try:
|
try:
|
||||||
c2 = self.get_s3_client(2)
|
client2 = self.get_s3_client(2)
|
||||||
except ConfigError as err:
|
except ConfigError as err:
|
||||||
raise unittest.SkipTest(str(err))
|
raise unittest.SkipTest(str(err))
|
||||||
buckets2 = [self.create_name('bucket%s' % i) for i in range(2)]
|
client1 = self.get_s3_client(1)
|
||||||
for bucket in buckets2:
|
self._create_buckets(client1)
|
||||||
c2.create_bucket(Bucket=bucket)
|
buckets2 = self._create_buckets(client2)
|
||||||
self.assertEqual(sorted(buckets2), [
|
self.assertEqual(sorted(buckets2), [
|
||||||
bucket['Name'] for bucket in c2.list_buckets()['Buckets']])
|
bucket['Name'] for bucket in client2.list_buckets()['Buckets']])
|
||||||
|
|
||||||
|
@skip_if_s3_acl_tests_disabled
|
||||||
|
def test_service_with_buckets_client3(self):
|
||||||
# Unprivileged user can't see anything
|
# Unprivileged user can't see anything
|
||||||
try:
|
try:
|
||||||
c3 = self.get_s3_client(3)
|
client3 = self.get_s3_client(3)
|
||||||
except ConfigError as err:
|
except ConfigError as err:
|
||||||
raise unittest.SkipTest(str(err))
|
raise unittest.SkipTest(str(err))
|
||||||
self.assertEqual([], c3.list_buckets()['Buckets'])
|
client1 = self.get_s3_client(1)
|
||||||
|
self._create_buckets(client1)
|
||||||
|
self.assertEqual([], client3.list_buckets()['Buckets'])
|
||||||
|
|
||||||
|
|
||||||
class TestGetServiceSigV2(TestGetServiceSigV4):
|
class TestGetServiceSigV2(TestGetServiceSigV4):
|
||||||
|
|||||||
@@ -1,8 +1,4 @@
|
|||||||
[s3api_test]
|
[s3api_test]
|
||||||
# You just enable advanced compatibility features to pass all tests. Add the
|
|
||||||
# following non-default options to the s3api section of your proxy-server.conf
|
|
||||||
# s3_acl = True
|
|
||||||
# check_bucket_owner = True
|
|
||||||
endpoint = http://127.0.0.1:8080
|
endpoint = http://127.0.0.1:8080
|
||||||
#ca_cert=/path/to/ca.crt
|
#ca_cert=/path/to/ca.crt
|
||||||
region = us-east-1
|
region = us-east-1
|
||||||
@@ -15,6 +11,13 @@ secret_key2 = testing2
|
|||||||
access_key3 = test:tester3
|
access_key3 = test:tester3
|
||||||
secret_key3 = testing3
|
secret_key3 = testing3
|
||||||
|
|
||||||
|
# Some tests require advanced compatibility features to pass. Add the
|
||||||
|
# following non-default options to the s3api section of your proxy-server.conf
|
||||||
|
# s3_acl = True
|
||||||
|
# check_bucket_owner = True
|
||||||
|
# Alternatively, skip those tests by setting this option to True
|
||||||
|
s3_acl_tests_disabled = False
|
||||||
|
|
||||||
[func_test]
|
[func_test]
|
||||||
# Sample config for Swift with tempauth
|
# Sample config for Swift with tempauth
|
||||||
auth_uri = http://127.0.0.1:8080/auth/v1.0
|
auth_uri = http://127.0.0.1:8080/auth/v1.0
|
||||||
|
|||||||
Reference in New Issue
Block a user