check content type in JSONRequestDeserializer

Fixed Bug #1187882

Change-Id: I5e8922690af1cb497366c5f09183bbccbc68f5a2
This commit is contained in:
Zhang Lei (Sneeze) 2013-06-25 13:27:00 +08:00
parent cdb27991a3
commit 2970a3ecd9
2 changed files with 92 additions and 10 deletions

View File

@ -427,6 +427,26 @@ class Request(webob.Request):
return content_type
def is_json_content_type(request):
if request.method == 'GET':
try:
aws_content_type = request.params.get("ContentType")
except Exception:
aws_content_type = None
#respect aws_content_type when both available
content_type = aws_content_type or request.content_type
else:
content_type = request.content_type
#bug #1887882
#for back compatible for null or plain content type
if not content_type or content_type.startswith('text/plain'):
content_type = 'application/json'
if content_type in ('JSON', 'application/json')\
and request.body.startswith('{'):
return True
return False
class JSONRequestDeserializer(object):
def has_body(self, request):
"""
@ -434,9 +454,7 @@ class JSONRequestDeserializer(object):
:param request: Webob.Request object
"""
if 'transfer-encoding' in request.headers:
return True
elif request.content_length > 0:
if request.content_length > 0 and is_json_content_type(request):
return True
return False

View File

@ -200,6 +200,7 @@ class JSONRequestDeserializerTest(HeatTestCase):
request.method = 'POST'
request.body = 'asdf'
request.headers.pop('Content-Length')
request.headers['Content-Type'] = 'application/json'
self.assertFalse(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_zero_content_length(self):
@ -207,13 +208,68 @@ class JSONRequestDeserializerTest(HeatTestCase):
request.method = 'POST'
request.body = 'asdf'
request.headers['Content-Length'] = 0
request.headers['Content-Type'] = 'application/json'
self.assertFalse(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_has_content_length(self):
def test_has_body_has_content_length_no_content_type(self):
request = wsgi.Request.blank('/')
request.method = 'POST'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_has_content_length_plain_content_type(self):
request = wsgi.Request.blank('/')
request.method = 'POST'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
request.headers['Content-Type'] = 'text/plain'
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_has_content_type_malformed(self):
request = wsgi.Request.blank('/')
request.method = 'POST'
request.body = 'asdf'
self.assertTrue('Content-Length' in request.headers)
request.headers['Content-Type'] = 'application/json'
self.assertFalse(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_has_content_type(self):
request = wsgi.Request.blank('/')
request.method = 'POST'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
request.headers['Content-Type'] = 'application/json'
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_has_wrong_content_type(self):
request = wsgi.Request.blank('/')
request.method = 'POST'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
request.headers['Content-Type'] = 'application/xml'
self.assertFalse(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_has_aws_content_type_only(self):
request = wsgi.Request.blank('/?ContentType=JSON')
request.method = 'GET'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_respect_aws_content_type(self):
request = wsgi.Request.blank('/?ContentType=JSON')
request.method = 'GET'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
request.headers['Content-Type'] = 'application/xml'
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
def test_has_body_content_type_with_get(self):
request = wsgi.Request.blank('/')
request.method = 'GET'
request.body = '{"key": "value"}'
self.assertTrue('Content-Length' in request.headers)
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
def test_no_body_no_content_length(self):
@ -245,10 +301,18 @@ class JSONRequestDeserializerTest(HeatTestCase):
expected = {"body": {"key": "value"}}
self.assertEqual(actual, expected)
def test_has_body_has_transfer_encoding(self):
def test_default_with_get_with_body(self):
request = wsgi.Request.blank('/')
request.method = 'POST'
request.body = 'fake_body'
request.headers['transfer-encoding'] = 0
self.assertTrue('transfer-encoding' in request.headers)
self.assertTrue(wsgi.JSONRequestDeserializer().has_body(request))
request.method = 'GET'
request.body = '{"key": "value"}'
actual = wsgi.JSONRequestDeserializer().default(request)
expected = {"body": {"key": "value"}}
self.assertEqual(actual, expected)
def test_default_with_get_with_body_with_aws(self):
request = wsgi.Request.blank('/?ContentType=JSON')
request.method = 'GET'
request.body = '{"key": "value"}'
actual = wsgi.JSONRequestDeserializer().default(request)
expected = {"body": {"key": "value"}}
self.assertEqual(actual, expected)