Merge "Individual content types for multi form upload"

This commit is contained in:
Zuul 2021-05-25 01:41:48 +00:00 committed by Gerrit Code Review
commit 94f21e15ae
2 changed files with 69 additions and 4 deletions

View File

@ -267,6 +267,7 @@ class FormPost(object):
boundary = boundary.encode('utf-8')
status = message = ''
attributes = {}
file_attributes = {}
subheaders = []
file_count = 0
for fp in iter_multipart_mime_documents(
@ -283,15 +284,17 @@ class FormPost(object):
break
except ValueError:
raise FormInvalid('max_file_count not an integer')
attributes['filename'] = attrs['filename'] or 'filename'
file_attributes = attributes.copy()
file_attributes['filename'] = attrs['filename'] or 'filename'
if 'content-type' not in attributes and 'content-type' in hdrs:
attributes['content-type'] = \
file_attributes['content-type'] = \
hdrs['Content-Type'] or 'application/octet-stream'
if 'content-encoding' not in attributes and \
'content-encoding' in hdrs:
attributes['content-encoding'] = hdrs['Content-Encoding']
file_attributes['content-encoding'] = \
hdrs['Content-Encoding']
status, subheaders = \
self._perform_subrequest(env, attributes, fp, keys)
self._perform_subrequest(env, file_attributes, fp, keys)
if not status.startswith('2'):
break
else:

View File

@ -2036,6 +2036,68 @@ class TestFormPost(unittest.TestCase):
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
def test_multiple_content_type_encoding(self):
body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="file4"; '
'filename="testfile4.txt"',
'Content-Type: application/json',
'',
'{"four": 4}\n',
]
if six.PY3:
body_part = [line.encode('utf-8') for line in body_part]
key = b'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
# splice in another file with a different content type
before_closing_boundary = len(body) - 2
body[before_closing_boundary:before_closing_boundary] = body_part
wsgi_input = b'\r\n'.join(body)
env['wsgi.input'] = BytesIO(wsgi_input)
env['swift.infocache'][get_cache_key('AUTH_test')] = (
self._fake_cache_env('AUTH_test', [key]))
env['swift.infocache'][get_cache_key(
'AUTH_test', 'container')] = {'meta': {}}
self.app = FakeApp(iter([('201 Created', {}, b'')] * 3))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
body = b''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
self.assertEqual(status, '201 Created')
self.assertTrue(b'201 Created' in body)
self.assertEqual(len(self.app.requests), 3)
self.assertEqual(self.app.requests[0].body, b'Test File\nOne\n')
self.assertEqual(self.app.requests[1].body, b'Test\nFile\nTwo\n')
self.assertEqual(self.app.requests[2].body, b'{"four": 4}\n')
self.assertIn("Content-Type", self.app.requests[0].headers)
self.assertIn("Content-Type", self.app.requests[1].headers)
self.assertIn("Content-Type", self.app.requests[2].headers)
self.assertEqual("text/plain",
self.app.requests[0].headers["Content-Type"])
self.assertEqual("text/plain",
self.app.requests[1].headers["Content-Type"])
self.assertEqual("application/json",
self.app.requests[2].headers["Content-Type"])
self.assertFalse("Content-Encoding" in self.app.requests[0].headers)
self.assertIn("Content-Encoding", self.app.requests[1].headers)
self.assertEqual("gzip",
self.app.requests[1].headers["Content-Encoding"])
self.assertFalse("Content-Encoding" in self.app.requests[2].headers)
if __name__ == '__main__':
unittest.main()