swob: Fix up some WSGI string business

Change-Id: Iee1bab5775b243c318aa22ee4a548d793e6684ca
This commit is contained in:
Tim Burke 2018-06-26 14:19:48 -07:00
parent 306f9a150f
commit efcf7e6a95
3 changed files with 101 additions and 14 deletions

View File

@ -732,7 +732,7 @@ class Accept(object):
return self.headerval
def _req_environ_property(environ_field):
def _req_environ_property(environ_field, is_wsgi_string_field=True):
"""
Set and retrieve value of the environ_field entry in self.environ.
(Used by both request and response)
@ -746,6 +746,9 @@ def _req_environ_property(environ_field):
elif not six.PY2 and isinstance(value, six.binary_type):
self.environ[environ_field] = value.decode('latin1')
else:
if not six.PY2 and is_wsgi_string_field:
# Check that input is valid before setting
value.encode('latin1')
self.environ[environ_field] = value
return property(getter, setter, doc=("Get and set the %s property "
@ -832,7 +835,8 @@ class Request(object):
user_agent = _req_environ_property('HTTP_USER_AGENT')
query_string = _req_environ_property('QUERY_STRING')
if_match = _req_fancy_property(Match, 'if-match')
body_file = _req_environ_property('wsgi.input')
body_file = _req_environ_property('wsgi.input',
is_wsgi_string_field=False)
content_length = _header_int_property('content-length')
if_modified_since = _datetime_property('if-modified-since')
if_unmodified_since = _datetime_property('if-unmodified-since')
@ -840,7 +844,7 @@ class Request(object):
charset = None
_params_cache = None
_timestamp = None
acl = _req_environ_property('swob.ACL')
acl = _req_environ_property('swob.ACL', is_wsgi_string_field=False)
def __init__(self, environ):
self.environ = environ
@ -862,8 +866,12 @@ class Request(object):
environ = environ or {}
if six.PY2 and isinstance(path, six.text_type):
path = path.encode('utf-8')
elif not six.PY2 and isinstance(path, six.binary_type):
elif not six.PY2:
if isinstance(path, six.binary_type):
path = path.decode('latin1')
else:
# Check that the input is valid
path.encode('latin1')
parsed_path = urllib.parse.urlparse(path)
server_name = 'localhost'
@ -876,7 +884,11 @@ class Request(object):
'https': 443}.get(parsed_path.scheme, 80)
if parsed_path.scheme and parsed_path.scheme not in ['http', 'https']:
raise TypeError('Invalid scheme: %s' % parsed_path.scheme)
if six.PY2:
path_info = urllib.parse.unquote(parsed_path.path)
else:
path_info = urllib.parse.unquote(parsed_path.path,
encoding='latin-1')
env = {
'REQUEST_METHOD': 'GET',
'SCRIPT_NAME': '',
@ -959,8 +971,13 @@ class Request(object):
@property
def path(self):
"Provides the full path of the request, excluding the QUERY_STRING"
if six.PY2:
return urllib.parse.quote(self.environ.get('SCRIPT_NAME', '') +
self.environ['PATH_INFO'])
else:
return urllib.parse.quote(self.environ.get('SCRIPT_NAME', '') +
self.environ['PATH_INFO'],
encoding='latin-1')
@property
def swift_entity_path(self):

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
import unittest
from swift.common.swob import Request, HTTPMovedPermanently
@ -24,6 +25,10 @@ class FakeApp(object):
def __call__(self, env, start_response):
start_response('200 OK', [])
if six.PY2:
return [env['PATH_INFO']]
else:
print(env)
return [env['PATH_INFO'].encode('latin-1')]
@ -91,6 +96,13 @@ class TestDomainRemap(unittest.TestCase):
resp = self.app(req.environ, start_response)
self.assertEqual(resp, [b'/v1/AUTH_a/v1'])
def test_domain_remap_account_with_path_root_unicode_container(self):
req = Request.blank('/%E4%BD%A0%E5%A5%BD',
environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'AUTH_a.example.com'})
resp = self.app(req.environ, start_response)
self.assertEqual(resp, [b'/v1/AUTH_a/\xe4\xbd\xa0\xe5\xa5\xbd'])
def test_domain_remap_account_container_with_path_root_obj(self):
req = Request.blank('/v1', environ={'REQUEST_METHOD': 'GET'},
headers={'Host': 'c.AUTH_a.example.com'})

View File

@ -892,18 +892,76 @@ class TestRequest(unittest.TestCase):
self.assertEqual(str(err), 'Invalid path: o%0An%20e')
def test_unicode_path(self):
# Byte sequences always make sense
req = swift.common.swob.Request.blank(u'/\u2661'.encode('utf8'))
self.assertEqual(req.path, quote(u'/\u2661'.encode('utf-8')))
self.assertEqual(req.environ['PATH_INFO'], '/\xe2\x99\xa1')
req = swift.common.swob.Request.blank('/')
req.path_info = u'/\u2661'.encode('utf8')
self.assertEqual(req.path, quote(u'/\u2661'.encode('utf-8')))
self.assertEqual(req.environ['PATH_INFO'], '/\xe2\x99\xa1')
if six.PY2:
# Unicode is encoded to UTF-8 on py2, to paper over deserialized
# JSON slipping into subrequests
req = swift.common.swob.Request.blank(u'/\u2661')
self.assertEqual(req.path, quote(u'/\u2661'.encode('utf-8')))
self.assertEqual(req.environ['PATH_INFO'], '/\xe2\x99\xa1')
req = swift.common.swob.Request.blank('/')
req.path_info = u'/\u2661'
self.assertEqual(req.path, quote(u'/\u2661'.encode('utf-8')))
self.assertEqual(req.environ['PATH_INFO'], '/\xe2\x99\xa1')
else:
# Arbitrary Unicode *is not* supported on py3 -- only latin-1
# encodable is supported, because PEP-3333.
with self.assertRaises(UnicodeEncodeError):
req = swift.common.swob.Request.blank(u'/\u2661')
req = swift.common.swob.Request.blank('/')
with self.assertRaises(UnicodeEncodeError):
req.path_info = u'/\u2661'
# Update didn't take
self.assertEqual(req.path, '/')
self.assertEqual(req.environ['PATH_INFO'], '/')
# Needs to be a "WSGI string"
req = swift.common.swob.Request.blank('/\xe2\x99\xa1')
self.assertEqual(req.path, quote(u'/\u2661'.encode('utf-8')))
self.assertEqual(req.environ['PATH_INFO'], '/\xe2\x99\xa1')
req = swift.common.swob.Request.blank('/')
req.path_info = '/\xe2\x99\xa1'
self.assertEqual(req.path, quote(u'/\u2661'.encode('utf-8')))
self.assertEqual(req.environ['PATH_INFO'], '/\xe2\x99\xa1')
def test_unicode_query(self):
req = swift.common.swob.Request.blank(u'/')
req.query_string = u'x=\u2661'
# Bytes are always OK
req = swift.common.swob.Request.blank('/')
encoded = u'\u2661'.encode('utf-8')
req.query_string = b'x=' + encoded
if six.PY2:
self.assertEqual(req.params['x'], encoded)
else:
# XXX should this be latin1?
self.assertEqual(req.params['x'], encoded.decode('utf8'))
self.assertEqual(req.params['x'], encoded.decode('latin1'))
if six.PY2:
# Unicode will be UTF-8-encoded on py2
req = swift.common.swob.Request.blank('/')
req.query_string = u'x=\u2661'
self.assertEqual(req.params['x'], encoded)
else:
# ...but py3 requires "WSGI strings"
req = swift.common.swob.Request.blank('/')
with self.assertRaises(UnicodeEncodeError):
req.query_string = u'x=\u2661'
self.assertEqual(req.params, {})
req = swift.common.swob.Request.blank('/')
req.query_string = 'x=' + encoded.decode('latin-1')
self.assertEqual(req.params['x'], encoded.decode('latin-1'))
def test_url2(self):
pi = '/hi/there'