Merge "s3api: Prevent XXE injections" into stable/ussuri

This commit is contained in:
Zuul 2023-01-25 20:08:52 +00:00 committed by Gerrit Code Review
commit 1721d8586c
3 changed files with 274 additions and 1 deletions

View File

@ -130,7 +130,7 @@ class _Element(lxml.etree.ElementBase):
parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
parser = lxml.etree.XMLParser()
parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
parser.set_element_class_lookup(parser_lookup)
Element = parser.makeelement

View File

@ -0,0 +1,233 @@
#!/usr/bin/env python
# Copyright (c) 2022 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import botocore
import test.functional as tf
from test.functional.s3api import S3ApiBaseBoto3
def setUpModule():
tf.setup_package()
def tearDownModule():
tf.teardown_package()
class TestS3ApiXxeInjection(S3ApiBaseBoto3):
def setUp(self):
super(TestS3ApiXxeInjection, self).setUp()
self.bucket = 'test-s3api-xxe-injection'
def _create_bucket(self, **kwargs):
resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
response_metadata = resp.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
@staticmethod
def _clear_data(request, **_kwargs):
if 'Content-MD5' in request.headers:
del request.headers['Content-MD5']
request.data = b''
def _presign_url(self, method, key=None, **kwargs):
params = {
'Bucket': self.bucket
}
if key:
params['Key'] = key
params.update(kwargs)
try:
# https://github.com/boto/boto3/issues/2192
self.conn.meta.events.register(
'before-sign.s3.*', self._clear_data)
return self.conn.generate_presigned_url(
method, Params=params, ExpiresIn=60)
finally:
self.conn.meta.events.unregister(
'before-sign.s3.*', self._clear_data)
def test_put_bucket_acl(self):
if not tf.cluster_info['s3api'].get('s3_acl'):
self.skipTest('s3_acl must be enabled')
self._create_bucket()
url = self._presign_url('put_bucket_acl')
resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Owner>
<DisplayName>test:tester</DisplayName>
<ID>test:tester</ID>
</Owner>
<AccessControlList>
<Grant>
<Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
<DisplayName>name&xxe;</DisplayName>
<ID>id&xxe;</ID>
</Grantee>
<Permission>WRITE</Permission>
</Grant>
</AccessControlList>
</AccessControlPolicy>
""") # noqa: E501
self.assertEqual(200, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
acl = self.conn.get_bucket_acl(Bucket=self.bucket)
response_metadata = acl.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
self.assertDictEqual({
'Owner': {
'DisplayName': 'test:tester',
'ID': 'test:tester'
},
'Grants': [
{
'Grantee': {
'DisplayName': 'id',
'ID': 'id',
'Type': 'CanonicalUser'
},
'Permission': 'WRITE'
}
]
}, acl)
def test_create_bucket(self):
url = self._presign_url('create_bucket')
resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<LocationConstraint>&xxe;</LocationConstraint>
</CreateBucketConfiguration>
""") # noqa: E501
self.assertEqual(400, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
self.assertRaisesRegexp(
botocore.exceptions.ClientError, 'Not Found',
self.conn.head_bucket, Bucket=self.bucket)
def test_delete_objects(self):
self._create_bucket()
url = self._presign_url(
'delete_objects',
Delete={
'Objects': [
{
'Key': 'string',
'VersionId': 'string'
}
]
})
body = """
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Object>
<Key>&xxe;</Key>
</Object>
</Delete>
"""
body = body.encode('utf-8')
resp = requests.post(url, data=body)
self.assertEqual(400, resp.status_code, resp.content)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
def test_complete_multipart_upload(self):
self._create_bucket()
resp = self.conn.create_multipart_upload(
Bucket=self.bucket, Key='test')
response_metadata = resp.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
uploadid = resp.get('UploadId')
try:
url = self._presign_url(
'complete_multipart_upload',
Key='key',
MultipartUpload={
'Parts': [
{
'ETag': 'string',
'PartNumber': 1
}
],
},
UploadId=uploadid)
resp = requests.post(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Part>
<ETag>"{uploadid}"</ETag>
<PartNumber>&xxe;</PartNumber>
</Part>
</CompleteMultipartUpload>
""") # noqa: E501
self.assertEqual(404, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
resp = requests.post(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Part>
<ETag>"&xxe;"</ETag>
<PartNumber>1</PartNumber>
</Part>
</CompleteMultipartUpload>
""") # noqa: E501
self.assertEqual(404, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
finally:
resp = self.conn.abort_multipart_upload(
Bucket=self.bucket, Key='test', UploadId=uploadid)
response_metadata = resp.pop('ResponseMetadata', {})
self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
def test_put_bucket_versioning(self):
self._create_bucket()
url = self._presign_url(
'put_bucket_versioning',
VersioningConfiguration={
'Status': 'Enabled'
})
resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Status>&xxe;</Status>
</VersioningConfiguration>
""") # noqa: E501
self.assertEqual(400, resp.status_code)
self.assertNotIn(b'xxe', resp.content)
self.assertNotIn(b'[swift-hash]', resp.content)
versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
response_metadata = versioning.pop('ResponseMetadata', {})
self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
self.assertDictEqual({}, versioning)

View File

@ -445,6 +445,7 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
body=body)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
def _test_object_multi_DELETE(self, account):
self.keys = ['Key1', 'Key2']
@ -500,6 +501,45 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
def test_object_multi_DELETE_with_system_entity(self):
self.keys = ['Key1', 'Key2']
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
swob.HTTPNotFound, {}, None)
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
swob.HTTPNoContent, {}, None)
elem = Element('Delete')
for key in self.keys:
obj = SubElement(elem, 'Object')
SubElement(obj, 'Key').text = key
body = tostring(elem, use_s3ns=False)
body = body.replace(
b'?>\n',
b'?>\n<!DOCTYPE foo '
b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
).replace(b'>Key1<', b'>Key1&ent;<')
content_md5 = (
base64.b64encode(md5(body).digest())
.strip())
req = Request.blank('/bucket?delete',
environ={'REQUEST_METHOD': 'POST'},
headers={
'Authorization': 'AWS test:full_control:hmac',
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
req.date = datetime.now()
req.content_type = 'text/plain'
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '200 OK', body)
self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
self.assertNotIn(b'root:/root', body)
self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
def _test_no_body(self, use_content_length=False,
use_transfer_encoding=False, string_to_md5=b''):
content_md5 = base64.b64encode(md5(string_to_md5).digest()).strip()