swift/test/functional/s3api/test_xxe_injection.py
Tim Burke 8dd2d010ac Skip S3 versioning test when versioning is not enabled
Change-Id: I36e42f459a74ed71a1cc57570a564e5562abbae3
2023-02-24 11:48:13 -08:00

235 lines
7.9 KiB
Python

#!/usr/bin/env python
# Copyright (c) 2022 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import botocore
import test.functional as tf
from test.functional.s3api import S3ApiBaseBoto3
def setUpModule():
tf.setup_package()
def tearDownModule():
tf.teardown_package()
class TestS3ApiXxeInjection(S3ApiBaseBoto3):
def setUp(self):
super(TestS3ApiXxeInjection, self).setUp()
self.bucket = 'test-s3api-xxe-injection'
def _create_bucket(self, **kwargs):
resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
response_metadata = resp.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
@staticmethod
def _clear_data(request, **_kwargs):
request.data = b''
def _presign_url(self, method, key=None, **kwargs):
params = {
'Bucket': self.bucket
}
if key:
params['Key'] = key
params.update(kwargs)
try:
# https://github.com/boto/boto3/issues/2192
self.conn.meta.events.register(
'before-sign.s3.*', self._clear_data)
return self.conn.generate_presigned_url(
method, Params=params, ExpiresIn=60)
finally:
self.conn.meta.events.unregister(
'before-sign.s3.*', self._clear_data)
def test_put_bucket_acl(self):
if not tf.cluster_info['s3api'].get('s3_acl'):
self.skipTest('s3_acl must be enabled')
self._create_bucket()
url = self._presign_url('put_bucket_acl')
resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Owner>
<DisplayName>test:tester</DisplayName>
<ID>test:tester</ID>
</Owner>
<AccessControlList>
<Grant>
<Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
<DisplayName>name&xxe;</DisplayName>
<ID>id&xxe;</ID>
</Grantee>
<Permission>WRITE</Permission>
</Grant>
</AccessControlList>
</AccessControlPolicy>
""") # noqa: E501
self.assertEqual(200, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
acl = self.conn.get_bucket_acl(Bucket=self.bucket)
response_metadata = acl.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
self.assertDictEqual({
'Owner': {
'DisplayName': 'test:tester',
'ID': 'test:tester'
},
'Grants': [
{
'Grantee': {
'DisplayName': 'id',
'ID': 'id',
'Type': 'CanonicalUser'
},
'Permission': 'WRITE'
}
]
}, acl)
def test_create_bucket(self):
url = self._presign_url('create_bucket')
resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<LocationConstraint>&xxe;</LocationConstraint>
</CreateBucketConfiguration>
""") # noqa: E501
self.assertEqual(400, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
self.assertRaisesRegex(
botocore.exceptions.ClientError, 'Not Found',
self.conn.head_bucket, Bucket=self.bucket)
def test_delete_objects(self):
self._create_bucket()
url = self._presign_url(
'delete_objects',
Delete={
'Objects': [
{
'Key': 'string',
'VersionId': 'string'
}
]
})
body = """
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Object>
<Key>&xxe;</Key>
</Object>
</Delete>
"""
body = body.encode('utf-8')
resp = requests.post(url, data=body)
self.assertEqual(400, resp.status_code, resp.content)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
def test_complete_multipart_upload(self):
self._create_bucket()
resp = self.conn.create_multipart_upload(
Bucket=self.bucket, Key='test')
response_metadata = resp.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
uploadid = resp.get('UploadId')
try:
url = self._presign_url(
'complete_multipart_upload',
Key='key',
MultipartUpload={
'Parts': [
{
'ETag': 'string',
'PartNumber': 1
}
],
},
UploadId=uploadid)
resp = requests.post(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Part>
<ETag>"{uploadid}"</ETag>
<PartNumber>&xxe;</PartNumber>
</Part>
</CompleteMultipartUpload>
""") # noqa: E501
self.assertEqual(404, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
resp = requests.post(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Part>
<ETag>"&xxe;"</ETag>
<PartNumber>1</PartNumber>
</Part>
</CompleteMultipartUpload>
""") # noqa: E501
self.assertEqual(404, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
finally:
resp = self.conn.abort_multipart_upload(
Bucket=self.bucket, Key='test', UploadId=uploadid)
response_metadata = resp.pop('ResponseMetadata', {})
self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
def test_put_bucket_versioning(self):
if 'object_versioning' not in tf.cluster_info:
raise tf.SkipTest('S3 versioning requires that Swift object '
'versioning be enabled')
self._create_bucket()
url = self._presign_url(
'put_bucket_versioning',
VersioningConfiguration={
'Status': 'Enabled'
})
resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Status>&xxe;</Status>
</VersioningConfiguration>
""") # noqa: E501
self.assertEqual(400, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
response_metadata = versioning.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
self.assertDictEqual({}, versioning)