#!/usr/bin/env python
# Copyright (c) 2022 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Functional tests sending XML bodies with an external entity to s3api.

Each test presigns an S3 request with boto3, then replays it through
``requests`` with a hand-written XML body that references an ``xxe``
entity, and checks that neither the entity name nor the ``[swift-hash]``
marker shows up in the response.

NOTE(review): the XML markup of the request bodies was missing from the
reviewed copy of this file (only the text nodes, ``]>`` and ``&xxe;``
survived).  The element names below were rebuilt from the public S3
request schemas and the entity target from the ``[swift-hash]``
assertions -- confirm both against the canonical file.
"""

import requests

import botocore

import test.functional as tf
from test.functional.s3api import S3ApiBaseBoto3


def setUpModule():
    """Run the functional-test package setup before any test here."""
    tf.setup_package()


def tearDownModule():
    """Run the functional-test package teardown after the last test."""
    tf.teardown_package()


class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    """Send entity-bearing XML to several S3 operations via presigned URLs."""

    # Seconds allowed for each raw HTTP call; without a timeout, requests
    # waits forever and a stalled endpoint would hang the whole test run.
    request_timeout = 60

    def setUp(self):
        """Run the base setup and pick the bucket name used by every test."""
        super(TestS3ApiXxeInjection, self).setUp()
        self.bucket = 'test-s3api-xxe-injection'

    def _create_bucket(self, **kwargs):
        """Create ``self.bucket`` and assert the call returned HTTP 200.

        :param kwargs: extra keyword arguments passed to ``create_bucket``
        """
        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
        response_metadata = resp.pop('ResponseMetadata', {})
        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))

    @staticmethod
    def _clear_data(request, **_kwargs):
        """Event hook: blank the request body before it is signed."""
        request.data = b''

    def _presign_url(self, method, key=None, **kwargs):
        """Return a presigned URL (valid for 60 seconds) for ``method``.

        :param method: boto3 client method name, e.g. ``'put_bucket_acl'``
        :param key: optional object key; added as ``Key`` when truthy
        :param kwargs: further request parameters; they are merged last and
                       so override ``Bucket``/``Key``
        :returns: the presigned URL
        """
        params = {
            'Bucket': self.bucket
        }
        if key:
            params['Key'] = key
        params.update(kwargs)
        try:
            # https://github.com/boto/boto3/issues/2192
            self.conn.meta.events.register(
                'before-sign.s3.*', self._clear_data)
            return self.conn.generate_presigned_url(
                method, Params=params, ExpiresIn=60)
        finally:
            # Always remove the hook so later client calls sign their
            # real bodies.
            self.conn.meta.events.unregister(
                'before-sign.s3.*', self._clear_data)

    def test_put_bucket_acl(self):
        """PUT an ACL whose grantee name and ID end with ``&xxe;``."""
        if not tf.cluster_info['s3api'].get('s3_acl'):
            self.skipTest('s3_acl must be enabled')

        self._create_bucket()

        url = self._presign_url('put_bucket_acl')
        resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Owner>
    <DisplayName>test:tester</DisplayName>
    <ID>test:tester</ID>
</Owner>
<AccessControlList>
    <Grant>
        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
            <DisplayName>name&xxe;</DisplayName>
            <ID>id&xxe;</ID>
        </Grantee>
        <Permission>WRITE</Permission>
    </Grant>
</AccessControlList>
</AccessControlPolicy>
""", timeout=self.request_timeout)  # noqa: E501
        self.assertEqual(200, resp.status_code)
        self.assertNotIn(b'xxe', resp.content)
        self.assertNotIn(b'[swift-hash]', resp.content)

        # The stored grant must carry the bare 'id', i.e. the entity
        # contributed nothing to the saved ACL.
        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
        response_metadata = acl.pop('ResponseMetadata', {})
        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
        self.assertDictEqual({
            'Owner': {
                'DisplayName': 'test:tester',
                'ID': 'test:tester'
            },
            'Grants': [
                {
                    'Grantee': {
                        'DisplayName': 'id',
                        'ID': 'id',
                        'Type': 'CanonicalUser'
                    },
                    'Permission': 'WRITE'
                }
            ]
        }, acl)

    def test_create_bucket(self):
        """PUT a bucket whose location constraint is ``&xxe;``."""
        url = self._presign_url('create_bucket')
        resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    <LocationConstraint>&xxe;</LocationConstraint>
</CreateBucketConfiguration>
""", timeout=self.request_timeout)  # noqa: E501
        self.assertEqual(400, resp.status_code)
        self.assertNotIn(b'xxe', resp.content)
        self.assertNotIn(b'[swift-hash]', resp.content)

        # The rejected request must not have created the bucket.
        self.assertRaisesRegex(
            botocore.exceptions.ClientError, 'Not Found',
            self.conn.head_bucket, Bucket=self.bucket)

    def test_delete_objects(self):
        """POST a multi-object delete whose only key is ``&xxe;``."""
        self._create_bucket()

        url = self._presign_url(
            'delete_objects',
            Delete={
                'Objects': [
                    {
                        'Key': 'string',
                        'VersionId': 'string'
                    }
                ]
            })
        body = """
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    <Object>
        <Key>&xxe;</Key>
    </Object>
</Delete>
"""
        body = body.encode('utf-8')
        resp = requests.post(url, data=body, timeout=self.request_timeout)
        self.assertEqual(400, resp.status_code, resp.content)
        self.assertNotIn(b'xxe', resp.content)
        self.assertNotIn(b'[swift-hash]', resp.content)

    def test_complete_multipart_upload(self):
        """POST two completion bodies with ``&xxe;`` in different fields."""
        self._create_bucket()

        resp = self.conn.create_multipart_upload(
            Bucket=self.bucket, Key='test')
        response_metadata = resp.pop('ResponseMetadata', {})
        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
        uploadid = resp.get('UploadId')

        try:
            # NOTE(review): the upload was started for key 'test' but the
            # completion is presigned for key 'key'; the 404s asserted
            # below presumably rely on that mismatch -- confirm before
            # "fixing" it.
            url = self._presign_url(
                'complete_multipart_upload',
                Key='key',
                MultipartUpload={
                    'Parts': [
                        {
                            'ETag': 'string',
                            'PartNumber': 1
                        }
                    ],
                },
                UploadId=uploadid)
            # '{uploadid}' is sent literally; this is not a format string.
            resp = requests.post(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
   <Part>
      <ETag>"{uploadid}"</ETag>
      <PartNumber>&xxe;</PartNumber>
   </Part>
</CompleteMultipartUpload>
""", timeout=self.request_timeout)  # noqa: E501
            self.assertEqual(404, resp.status_code)
            self.assertNotIn(b'xxe', resp.content)
            self.assertNotIn(b'[swift-hash]', resp.content)

            resp = requests.post(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
   <Part>
      <ETag>"&xxe;"</ETag>
      <PartNumber>1</PartNumber>
   </Part>
</CompleteMultipartUpload>
""", timeout=self.request_timeout)  # noqa: E501
            self.assertEqual(404, resp.status_code)
            self.assertNotIn(b'xxe', resp.content)
            self.assertNotIn(b'[swift-hash]', resp.content)
        finally:
            # Abort the upload whether or not the assertions passed, so
            # nothing is left behind for later tests.
            resp = self.conn.abort_multipart_upload(
                Bucket=self.bucket, Key='test', UploadId=uploadid)
            response_metadata = resp.pop('ResponseMetadata', {})
            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))

    def test_put_bucket_versioning(self):
        """PUT a versioning configuration whose status is ``&xxe;``."""
        if 'object_versioning' not in tf.cluster_info:
            self.skipTest('S3 versioning requires that Swift object '
                          'versioning be enabled')
        self._create_bucket()

        url = self._presign_url(
            'put_bucket_versioning',
            VersioningConfiguration={
                'Status': 'Enabled'
            })
        resp = requests.put(url, data="""
<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    <Status>&xxe;</Status>
</VersioningConfiguration>
""", timeout=self.request_timeout)  # noqa: E501
        self.assertEqual(400, resp.status_code)
        self.assertNotIn(b'xxe', resp.content)
        self.assertNotIn(b'[swift-hash]', resp.content)

        # The rejected request must leave the bucket's versioning state
        # empty.
        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
        response_metadata = versioning.pop('ResponseMetadata', {})
        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
        self.assertDictEqual({}, versioning)