py3: Port the tempurl middleware

This patch ports the tempurl middleware over to PY3. We use
an "all-native" string model, where we convert WSGI strings
as soon as we see them. It helps to deal with HMAC.

Aaand, we fix formpost along the way. It _clearly_ was
doing a wrong thing, encoding the same body several times
if we had several keys. On py2 it wasn't noticeable, but
on py3 this breaks, because the bytes type cannot be encoded
again.

Change-Id: I69974cc8a39731c980b54137b799a36b2e63a44a
This commit is contained in:
Matthew Oliver 2019-04-16 16:59:51 +10:00 committed by Tim Burke
parent d44a581cdd
commit 9f1ef35630
5 changed files with 267 additions and 284 deletions

View File

@ -135,7 +135,7 @@ from swift.common.utils import streq_const_time, register_swift_info, \
parse_content_disposition, parse_mime_headers, \
iter_multipart_mime_documents, reiterate, close_if_possible
from swift.common.wsgi import make_pre_authed_env
from swift.common.swob import HTTPUnauthorized
from swift.common.swob import HTTPUnauthorized, wsgi_to_str
from swift.proxy.controllers.base import get_account_info, get_container_info
@ -390,7 +390,7 @@ class FormPost(object):
except ValueError:
raise FormInvalid('expired not an integer')
hmac_body = '%s\n%s\n%s\n%s\n%s' % (
orig_env['PATH_INFO'],
wsgi_to_str(orig_env['PATH_INFO']),
attributes.get('redirect') or '',
attributes.get('max_file_size') or '0',
attributes.get('max_file_count') or '0',
@ -400,6 +400,9 @@ class FormPost(object):
has_valid_sig = False
for key in keys:
# Encode key like in swift.common.utls.get_hmac.
if not isinstance(key, six.binary_type):
key = key.encode('utf8')
sig = hmac.new(key, hmac_body, sha1).hexdigest()
if streq_const_time(sig, (attributes.get('signature') or
'invalid')):

View File

@ -312,7 +312,7 @@ from six.moves.urllib.parse import urlencode
from swift.proxy.controllers.base import get_account_info, get_container_info
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import header_to_environ_key, HTTPUnauthorized, \
HTTPBadRequest
HTTPBadRequest, wsgi_to_str
from swift.common.utils import split_path, get_valid_utf8_str, \
register_swift_info, get_hmac, streq_const_time, quote, get_logger, \
strict_b64decode
@ -362,7 +362,8 @@ def get_tempurl_keys_from_metadata(meta):
meta = get_account_info(...)['meta']
keys = get_tempurl_keys_from_metadata(meta)
"""
return [get_valid_utf8_str(value) for key, value in meta.items()
return [(get_valid_utf8_str(value) if six.PY2 else value)
for key, value in meta.items()
if key.lower() in ('temp-url-key', 'temp-url-key-2')]
@ -385,7 +386,7 @@ def authorize_same_account(account_to_match):
except ValueError:
return HTTPUnauthorized(request=req)
if acc == account_to_match:
if wsgi_to_str(acc) == account_to_match:
return None
else:
return HTTPUnauthorized(request=req)
@ -401,7 +402,8 @@ def authorize_same_container(account_to_match, container_to_match):
except ValueError:
return HTTPUnauthorized(request=req)
if acc == account_to_match and con == container_to_match:
if wsgi_to_str(acc) == account_to_match and \
wsgi_to_str(con) == container_to_match:
return None
else:
return HTTPUnauthorized(request=req)
@ -511,6 +513,8 @@ class TempURL(object):
try:
temp_url_sig = binascii.hexlify(strict_b64decode(
temp_url_sig + '=='))
if not six.PY2:
temp_url_sig = temp_url_sig.decode('ascii')
except ValueError:
return self._invalid(env, start_response)
elif len(temp_url_sig) == 40:
@ -620,7 +624,7 @@ class TempURL(object):
elif existing_disposition:
disposition_value = existing_disposition
else:
name = basename(env['PATH_INFO'].rstrip('/'))
name = basename(wsgi_to_str(env['PATH_INFO']).rstrip('/'))
disposition_value = disposition_format('attachment',
name)
# this is probably just paranoia, I couldn't actually get a
@ -653,7 +657,7 @@ class TempURL(object):
except ValueError:
return (None, None, None)
if ver == 'v1' and obj.strip('/'):
return (acc, cont, obj)
return (wsgi_to_str(acc), wsgi_to_str(cont), wsgi_to_str(obj))
return (None, None, None)
def _get_temp_url_info(self, env):
@ -793,7 +797,7 @@ class TempURL(object):
:param env: The WSGI environment for the request.
"""
for h in env.keys():
for h in list(env.keys()):
if h in self.incoming_allow_headers:
continue
for p in self.incoming_allow_headers_startswith:
@ -821,7 +825,7 @@ class TempURL(object):
outgoing responses.
"""
headers = HeaderKeyDict(headers)
for h in headers.keys():
for h in list(headers.keys()):
if h in self.outgoing_allow_headers:
continue
for p in self.outgoing_allow_headers_startswith:

View File

@ -116,9 +116,14 @@ class TestTempurl(Base):
self.env.tempurl_key)
def tempurl_parms(self, method, expires, path, key):
path = urllib.parse.unquote(path)
if not six.PY2:
method = method.encode('utf8')
path = path.encode('utf8')
key = key.encode('utf8')
sig = hmac.new(
key,
'%s\n%s\n%s' % (method, expires, urllib.parse.unquote(path)),
b'%s\n%d\n%s' % (method, expires, path),
self.digest).hexdigest()
return {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
@ -129,7 +134,7 @@ class TestTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
# GET tempurls also allow HEAD requests
self.assertTrue(self.env.obj.info(parms=self.obj_tempurl_parms,
@ -142,19 +147,19 @@ class TestTempurl(Base):
self.env.tempurl_key2)
contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
def test_GET_DLO_inside_container(self):
seg1 = self.env.container.file(
"get-dlo-inside-seg1" + Utils.create_name())
seg2 = self.env.container.file(
"get-dlo-inside-seg2" + Utils.create_name())
seg1.write("one fish two fish ")
seg2.write("red fish blue fish")
seg1.write(b"one fish two fish ")
seg2.write(b"red fish blue fish")
manifest = self.env.container.file("manifest" + Utils.create_name())
manifest.write(
'',
b'',
hdrs={"X-Object-Manifest": "%s/get-dlo-inside-seg" %
(self.env.container.name,)})
@ -164,22 +169,22 @@ class TestTempurl(Base):
self.env.tempurl_key)
contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "one fish two fish red fish blue fish")
self.assertEqual(contents, b"one fish two fish red fish blue fish")
def test_GET_DLO_outside_container(self):
seg1 = self.env.container.file(
"get-dlo-outside-seg1" + Utils.create_name())
seg2 = self.env.container.file(
"get-dlo-outside-seg2" + Utils.create_name())
seg1.write("one fish two fish ")
seg2.write("red fish blue fish")
seg1.write(b"one fish two fish ")
seg2.write(b"red fish blue fish")
container2 = self.env.account.container(Utils.create_name())
container2.create()
manifest = container2.file("manifest" + Utils.create_name())
manifest.write(
'',
b'',
hdrs={"X-Object-Manifest": "%s/get-dlo-outside-seg" %
(self.env.container.name,)})
@ -190,7 +195,7 @@ class TestTempurl(Base):
# cross container tempurl works fine for account tempurl key
contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "one fish two fish red fish blue fish")
self.assertEqual(contents, b"one fish two fish red fish blue fish")
self.assert_status([200])
def test_PUT(self):
@ -206,9 +211,9 @@ class TestTempurl(Base):
for e in (str(expires), expires_8601):
put_parms['temp_url_expires'] = e
new_obj.write('new obj contents',
new_obj.write(b'new obj contents',
parms=put_parms, cfg={'no_auth_token': True})
self.assertEqual(new_obj.read(), "new obj contents")
self.assertEqual(new_obj.read(), b"new obj contents")
# PUT tempurls also allow HEAD requests
self.assertTrue(new_obj.info(parms=put_parms,
@ -225,7 +230,7 @@ class TestTempurl(Base):
# try to create manifest pointing to some random container
try:
new_obj.write('', {
new_obj.write(b'', {
'x-object-manifest': '%s/foo' % 'some_random_container'
}, parms=put_parms, cfg={'no_auth_token': True})
except ResponseError as e:
@ -240,7 +245,7 @@ class TestTempurl(Base):
# try to create manifest pointing to new container
try:
new_obj.write('', {
new_obj.write(b'', {
'x-object-manifest': '%s/foo' % other_container
}, parms=put_parms, cfg={'no_auth_token': True})
except ResponseError as e:
@ -249,7 +254,7 @@ class TestTempurl(Base):
self.fail('request did not error')
# try again using a tempurl POST to an already created object
new_obj.write('', {}, parms=put_parms, cfg={'no_auth_token': True})
new_obj.write(b'', {}, parms=put_parms, cfg={'no_auth_token': True})
expires = int(time()) + 86400
post_parms = self.tempurl_parms(
'POST', expires, self.env.conn.make_path(new_obj.path),
@ -279,7 +284,7 @@ class TestTempurl(Base):
self.assert_status([401])
self.assertRaises(ResponseError, self.env.other_obj.write,
'new contents',
b'new contents',
cfg={'no_auth_token': True},
parms=head_parms)
self.assert_status([401])
@ -288,7 +293,7 @@ class TestTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
self.assertRaises(ResponseError, self.env.other_obj.read,
cfg={'no_auth_token': True},
@ -299,7 +304,7 @@ class TestTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_sig'][0] == 'a':
@ -316,7 +321,7 @@ class TestTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_expires'][-1] == '0':
@ -330,7 +335,7 @@ class TestTempurl(Base):
self.assert_status([401])
class TestTempURLPrefix(TestTempurl):
class TestTempurlPrefix(TestTempurl):
def tempurl_parms(self, method, expires, path, key,
prefix=None):
path_parts = urllib.parse.unquote(path).split('/')
@ -341,10 +346,14 @@ class TestTempURLPrefix(TestTempurl):
prefix = path_parts[4].decode('utf8')[:4].encode('utf8')
else:
prefix = path_parts[4][:4]
prefix_to_hash = '/'.join(path_parts[0:4]) + '/' + prefix
if not six.PY2:
method = method.encode('utf8')
prefix_to_hash = prefix_to_hash.encode('utf8')
key = key.encode('utf8')
sig = hmac.new(
key,
'%s\n%s\nprefix:%s' % (method, expires,
'/'.join(path_parts[0:4]) + '/' + prefix),
b'%s\n%d\nprefix:%s' % (method, expires, prefix_to_hash),
self.digest).hexdigest()
return {
'temp_url_sig': sig, 'temp_url_expires': str(expires),
@ -359,7 +368,7 @@ class TestTempURLPrefix(TestTempurl):
contents = self.env.obj.read(
parms=parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
def test_no_prefix_match(self):
prefix = 'b' if self.env.obj.name[0] == 'a' else 'a'
@ -375,7 +384,7 @@ class TestTempURLPrefix(TestTempurl):
self.assert_status([401])
def test_object_url_with_prefix(self):
parms = super(TestTempURLPrefix, self).tempurl_parms(
parms = super(TestTempurlPrefix, self).tempurl_parms(
'GET', self.expires,
self.env.conn.make_path(self.env.obj.path),
self.env.tempurl_key)
@ -399,6 +408,10 @@ class TestTempurlUTF8(Base2, TestTempurl):
pass
class TestTempurlPrefixUTF8(Base2, TestTempurlPrefix):
pass
class TestContainerTempurlEnv(BaseEnv):
tempurl_enabled = None # tri-state: None initially, then True/False
@ -474,16 +487,22 @@ class TestContainerTempurl(Base):
'temp_url_expires': str(expires)}
def tempurl_sig(self, method, expires, path, key):
path = urllib.parse.unquote(path)
if not six.PY2:
method = method.encode('utf8')
path = path.encode('utf8')
key = key.encode('utf8')
print(key, method, expires, path)
return hmac.new(
key,
'%s\n%s\n%s' % (method, expires, urllib.parse.unquote(path)),
b'%s\n%d\n%s' % (method, expires, path),
self.digest).hexdigest()
def test_GET(self):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
# GET tempurls also allow HEAD requests
self.assertTrue(self.env.obj.info(parms=self.obj_tempurl_parms,
@ -498,7 +517,7 @@ class TestContainerTempurl(Base):
'temp_url_expires': str(expires)}
contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
def test_PUT(self):
new_obj = self.env.container.file(Utils.create_name())
@ -510,9 +529,9 @@ class TestContainerTempurl(Base):
put_parms = {'temp_url_sig': sig,
'temp_url_expires': str(expires)}
new_obj.write('new obj contents',
new_obj.write(b'new obj contents',
parms=put_parms, cfg={'no_auth_token': True})
self.assertEqual(new_obj.read(), "new obj contents")
self.assertEqual(new_obj.read(), b"new obj contents")
# PUT tempurls also allow HEAD requests
self.assertTrue(new_obj.info(parms=put_parms,
@ -536,7 +555,7 @@ class TestContainerTempurl(Base):
self.assert_status([401])
self.assertRaises(ResponseError, self.env.other_obj.write,
'new contents',
b'new contents',
cfg={'no_auth_token': True},
parms=self.obj_tempurl_parms)
self.assert_status([401])
@ -545,7 +564,7 @@ class TestContainerTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
self.assertRaises(ResponseError, self.env.other_obj.read,
cfg={'no_auth_token': True},
@ -556,7 +575,7 @@ class TestContainerTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_sig'][0] == 'a':
@ -573,7 +592,7 @@ class TestContainerTempurl(Base):
contents = self.env.obj.read(
parms=self.obj_tempurl_parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
parms = self.obj_tempurl_parms.copy()
if parms['temp_url_expires'][-1] == '0':
@ -613,12 +632,12 @@ class TestContainerTempurl(Base):
"get-dlo-inside-seg1" + Utils.create_name())
seg2 = self.env.container.file(
"get-dlo-inside-seg2" + Utils.create_name())
seg1.write("one fish two fish ")
seg2.write("red fish blue fish")
seg1.write(b"one fish two fish ")
seg2.write(b"red fish blue fish")
manifest = self.env.container.file("manifest" + Utils.create_name())
manifest.write(
'',
b'',
hdrs={"X-Object-Manifest": "%s/get-dlo-inside-seg" %
(self.env.container.name,)})
@ -630,7 +649,7 @@ class TestContainerTempurl(Base):
'temp_url_expires': str(expires)}
contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
self.assertEqual(contents, "one fish two fish red fish blue fish")
self.assertEqual(contents, b"one fish two fish red fish blue fish")
def test_GET_DLO_outside_container(self):
container2 = self.env.account.container(Utils.create_name())
@ -639,12 +658,12 @@ class TestContainerTempurl(Base):
"get-dlo-outside-seg1" + Utils.create_name())
seg2 = container2.file(
"get-dlo-outside-seg2" + Utils.create_name())
seg1.write("one fish two fish ")
seg2.write("red fish blue fish")
seg1.write(b"one fish two fish ")
seg2.write(b"red fish blue fish")
manifest = self.env.container.file("manifest" + Utils.create_name())
manifest.write(
'',
b'',
hdrs={"X-Object-Manifest": "%s/get-dlo-outside-seg" %
(container2.name,)})
@ -695,10 +714,10 @@ class TestSloTempurlEnv(TestTempurlBaseEnv):
raise ResponseError(cls.conn.response)
seg1 = cls.segments_container.file(Utils.create_name())
seg1.write('1' * 1024 * 1024)
seg1.write(b'1' * 1024 * 1024)
seg2 = cls.segments_container.file(Utils.create_name())
seg2.write('2' * 1024 * 1024)
seg2.write(b'2' * 1024 * 1024)
cls.manifest_data = [{'size_bytes': 1024 * 1024,
'etag': seg1.md5,
@ -711,7 +730,7 @@ class TestSloTempurlEnv(TestTempurlBaseEnv):
cls.manifest = cls.manifest_container.file(Utils.create_name())
cls.manifest.write(
json.dumps(cls.manifest_data),
json.dumps(cls.manifest_data).encode('ascii'),
parms={'multipart-manifest': 'put'})
@ -736,9 +755,14 @@ class TestSloTempurl(Base):
self.digest = getattr(hashlib, self.digest_name)
def tempurl_sig(self, method, expires, path, key):
path = urllib.parse.unquote(path)
if not six.PY2:
method = method.encode('utf8')
path = path.encode('utf8')
key = key.encode('utf8')
return hmac.new(
key,
'%s\n%s\n%s' % (method, expires, urllib.parse.unquote(path)),
b'%s\n%d\n%s' % (method, expires, path),
self.digest).hexdigest()
def test_GET(self):
@ -779,22 +803,28 @@ class TestTempurlAlgorithms(Base):
env = TestTempurlEnv
def get_sig(self, expires, digest, encoding):
path = self.env.conn.make_path(self.env.obj.path)
path = urllib.parse.unquote(self.env.conn.make_path(self.env.obj.path))
if six.PY2:
key = self.env.tempurl_key
else:
path = path.encode('utf8')
key = self.env.tempurl_key.encode('utf8')
sig = hmac.new(
self.env.tempurl_key,
'%s\n%s\n%s' % ('GET', expires,
urllib.parse.unquote(path)),
key,
b'GET\n%d\n%s' % (expires, path),
getattr(hashlib, digest))
if encoding == 'hex':
return sig.hexdigest()
elif encoding == 'base64':
return digest + ':' + base64.b64encode(sig.digest())
return digest + ':' + base64.b64encode(
sig.digest()).decode('ascii')
elif encoding == 'base64-no-padding':
return digest + ':' + base64.b64encode(sig.digest()).strip('=')
return digest + ':' + base64.b64encode(
sig.digest()).decode('ascii').strip('=')
elif encoding == 'url-safe-base64':
return digest + ':' + base64.urlsafe_b64encode(sig.digest())
return digest + ':' + base64.urlsafe_b64encode(
sig.digest()).decode('ascii')
else:
raise ValueError('Unrecognized encoding: %r' % encoding)
@ -823,7 +853,7 @@ class TestTempurlAlgorithms(Base):
contents = self.env.obj.read(
parms=parms,
cfg={'no_auth_token': True})
self.assertEqual(contents, "obj contents")
self.assertEqual(contents, b"obj contents")
# GET tempurls also allow HEAD requests
self.assertTrue(self.env.obj.info(

View File

@ -34,6 +34,7 @@ import itertools
import mock
import unittest
import hashlib
import six
from time import time, strftime, gmtime
from swift.common.middleware import tempauth, tempurl
@ -117,7 +118,7 @@ class TestTempURL(unittest.TestCase):
def test_passthrough(self):
resp = self._make_request('/v1/a/c/o').get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertNotIn('Temp URL invalid', resp.body)
self.assertNotIn(b'Temp URL invalid', resp.body)
def test_allow_options(self):
self.app.status_headers_body_iter = iter([('200 Ok', {}, '')])
@ -130,6 +131,8 @@ class TestTempURL(unittest.TestCase):
prefix=None):
if not environ:
environ = {}
if six.PY3 and isinstance(sig, six.binary_type):
sig = sig.decode('utf-8')
environ['QUERY_STRING'] = 'temp_url_sig=%s&temp_url_expires=%s' % (
sig.replace('+', '%2B'), expires)
if prefix is not None:
@ -150,8 +153,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
self.assert_valid_sig(expires, path, [key], sig)
@ -160,19 +163,19 @@ class TestTempURL(unittest.TestCase):
sig = base64.b64encode(hmac.new(
key, hmac_body, hashlib.sha256).digest())
self.assert_valid_sig(expires, path, [key], 'sha256:' + sig)
self.assert_valid_sig(expires, path, [key], b'sha256:' + sig)
sig = base64.b64encode(hmac.new(
key, hmac_body, hashlib.sha512).digest())
self.assert_valid_sig(expires, path, [key], 'sha512:' + sig)
self.assert_valid_sig(expires, path, [key], b'sha512:' + sig)
def test_get_valid_key2(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key1 = 'abc123'
key2 = 'def456'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key1 = b'abc123'
key2 = b'def456'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig1 = hmac.new(key1, hmac_body, hashlib.sha1).hexdigest()
sig2 = hmac.new(key2, hmac_body, hashlib.sha1).hexdigest()
for sig in (sig1, sig2):
@ -193,9 +196,9 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key1 = 'me'
key2 = 'other'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key1 = b'me'
key2 = b'other'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig1 = hmac.new(key1, hmac_body, hashlib.sha1).hexdigest()
sig2 = hmac.new(key2, hmac_body, hashlib.sha1).hexdigest()
account_keys = []
@ -207,8 +210,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -229,8 +232,8 @@ class TestTempURL(unittest.TestCase):
method = 'HEAD'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'REQUEST_METHOD': 'HEAD',
@ -247,8 +250,8 @@ class TestTempURL(unittest.TestCase):
method = 'HEAD'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'REQUEST_METHOD': 'HEAD',
@ -258,7 +261,8 @@ class TestTempURL(unittest.TestCase):
resp = req.get_response(self.tempurl)
get_method = 'GET'
get_hmac_body = '%s\n%s\n%s' % (get_method, expires, path)
get_hmac_body = ('%s\n%i\n%s' %
(get_method, expires, path)).encode('utf-8')
get_sig = hmac.new(key, get_hmac_body, hashlib.sha1).hexdigest()
get_req = self._make_request(path, keys=[key], environ={
'REQUEST_METHOD': 'GET',
@ -273,8 +277,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = 1
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -295,8 +299,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -315,14 +319,16 @@ class TestTempURL(unittest.TestCase):
prefix = 'p1/p2/'
sig_path = 'prefix:/v1/a/c/' + prefix
query_path = '/v1/a/c/' + prefix + 'o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, sig_path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' %
(method, expires, sig_path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
self.assert_valid_sig(expires, query_path, [key], sig, prefix=prefix)
query_path = query_path[:-1] + 'p3/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, sig_path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' %
(method, expires, sig_path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
self.assert_valid_sig(expires, query_path, [key], sig, prefix=prefix)
@ -331,8 +337,9 @@ class TestTempURL(unittest.TestCase):
expires = int(time() + 86400)
sig_path = 'prefix:/v1/a/c/'
query_path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, sig_path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' %
(method, expires, sig_path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
self.assert_valid_sig(expires, query_path, [key], sig, prefix='')
@ -340,8 +347,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/a\r\nb'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % (
@ -360,8 +367,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % (
@ -380,8 +387,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o/'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % (
@ -400,8 +407,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -421,8 +428,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -439,8 +446,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -449,15 +456,15 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_put_valid(self):
method = 'PUT'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -473,8 +480,8 @@ class TestTempURL(unittest.TestCase):
method = 'PUT'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -482,45 +489,45 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_missing_sig(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
environ={'QUERY_STRING': 'temp_url_expires=%s' % expires})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_missing_expires(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
environ={'QUERY_STRING': 'temp_url_sig=%s' % sig})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_bad_path(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -528,15 +535,15 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_no_key(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[],
@ -544,15 +551,15 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_head_allowed_by_get(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -568,8 +575,8 @@ class TestTempURL(unittest.TestCase):
method = 'PUT'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -585,8 +592,8 @@ class TestTempURL(unittest.TestCase):
method = 'POST'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -602,8 +609,8 @@ class TestTempURL(unittest.TestCase):
method = 'PUT'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
# Deliberately fudge expires to show HEADs aren't just automatically
# allowed.
@ -622,8 +629,8 @@ class TestTempURL(unittest.TestCase):
method = 'POST'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -632,7 +639,7 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_delete_when_forbidden_by_config(self):
@ -640,8 +647,8 @@ class TestTempURL(unittest.TestCase):
method = 'DELETE'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -650,15 +657,15 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_delete_allowed(self):
method = 'DELETE'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -672,8 +679,8 @@ class TestTempURL(unittest.TestCase):
method = 'UNKNOWN'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -682,7 +689,7 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_authorize_limits_scope(self):
@ -701,8 +708,8 @@ class TestTempURL(unittest.TestCase):
expires = int(time() + 86400)
path = '/v1/a/c/o'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
sig = hmac.new('account-key', hmac_body, hashlib.sha1).hexdigest()
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(b'account-key', hmac_body, hashlib.sha1).hexdigest()
qs = '?temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
# make request will setup the environ cache for us
@ -723,8 +730,8 @@ class TestTempURL(unittest.TestCase):
# A request with the container key limits the pre-authed scope to
# the container level; a different container in the same account is
# out of scope and thus forbidden.
hmac_body = '%s\n%s\n%s' % (method, expires, path)
sig = hmac.new('container-key', hmac_body, hashlib.sha1).hexdigest()
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(b'container-key', hmac_body, hashlib.sha1).hexdigest()
qs = '?temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
req = self._make_request(path + qs, **key_kwargs)
@ -744,8 +751,8 @@ class TestTempURL(unittest.TestCase):
# account level. This prevents someone from shrinking the scope of
# account-level tempurls by reusing one of the account's keys on a
# container.
hmac_body = '%s\n%s\n%s' % (method, expires, path)
sig = hmac.new('shared-key', hmac_body, hashlib.sha1).hexdigest()
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(b'shared-key', hmac_body, hashlib.sha1).hexdigest()
qs = '?temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
req = self._make_request(path + qs, **key_kwargs)
@ -764,8 +771,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path + '2', keys=[key],
@ -773,15 +780,15 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_changed_sig_invalid(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
if sig[-1] != '0':
sig = sig[:-1] + '0'
@ -793,15 +800,15 @@ class TestTempURL(unittest.TestCase):
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_changed_expires_invalid(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -809,17 +816,18 @@ class TestTempURL(unittest.TestCase):
sig, expires + 1)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_ip_range_value_error(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip = '127.0.0.1'
not_an_ip = 'abcd'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -832,17 +840,18 @@ class TestTempURL(unittest.TestCase):
)
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_bad_ip_range_invalid(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip = '127.0.0.1'
bad_ip = '127.0.0.2'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -855,23 +864,23 @@ class TestTempURL(unittest.TestCase):
)
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_different_key_invalid(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key + '2'],
path, keys=[key + b'2'],
environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % (
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertTrue('Temp URL invalid' in resp.body)
self.assertTrue(b'Temp URL invalid' in resp.body)
self.assertTrue('Www-Authenticate' in resp.headers)
def test_no_prefix_match_invalid(self):
@ -879,8 +888,9 @@ class TestTempURL(unittest.TestCase):
expires = int(time() + 86400)
sig_path = 'prefix:/v1/a/c/p1/p2/'
query_path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, sig_path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' %
(method, expires, sig_path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
query_path, keys=[key],
@ -889,15 +899,16 @@ class TestTempURL(unittest.TestCase):
(sig, expires, 'p1/p2/')})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertTrue('Temp URL invalid' in resp.body)
self.assertTrue(b'Temp URL invalid' in resp.body)
self.assertTrue('Www-Authenticate' in resp.headers)
def test_object_url_with_prefix_invalid(self):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' %
(method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -906,18 +917,19 @@ class TestTempURL(unittest.TestCase):
(sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_disallowed_header_object_manifest(self):
self.tempurl = tempurl.filter_factory({})(self.auth)
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
for method in ('PUT', 'POST'):
for hdr, value in [('X-Object-Manifest', 'private/secret'),
('X-Symlink-Target', 'cont/symlink')]:
hmac_body = '%s\n%s\n%s' % (method, expires, path)
hmac_body = ('%s\n%i\n%s' %
(method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, method=method, keys=[key],
@ -927,9 +939,9 @@ class TestTempURL(unittest.TestCase):
% (sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 400)
self.assertTrue('header' in resp.body)
self.assertTrue('not allowed' in resp.body)
self.assertTrue(hdr in resp.body)
self.assertIn(b'header', resp.body)
self.assertIn(b'not allowed', resp.body)
self.assertIn(hdr.encode('utf-8'), resp.body)
def test_removed_incoming_header(self):
self.tempurl = tempurl.filter_factory({
@ -937,8 +949,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -956,8 +968,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -978,8 +990,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -997,8 +1009,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -1015,8 +1027,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -1034,8 +1046,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -1054,8 +1066,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -1076,8 +1088,8 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
key = b'abc'
hmac_body = ('%s\n%i\n%s' % (method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(
path, keys=[key],
@ -1233,10 +1245,10 @@ class TestTempURL(unittest.TestCase):
def _start_response(status, headers, exc_info=None):
self.assertTrue(status, '401 Unauthorized')
self.assertTrue('Temp URL invalid' in ''.join(
self.assertIn(b'Temp URL invalid', b''.join(
self.tempurl._invalid({'REQUEST_METHOD': 'GET'},
_start_response)))
self.assertEqual('', ''.join(
self.assertIn(b'', b''.join(
self.tempurl._invalid({'REQUEST_METHOD': 'HEAD'},
_start_response)))
@ -1246,7 +1258,7 @@ class TestTempURL(unittest.TestCase):
resp = self._make_request('/v1/a/c/o', environ=environ).get_response(
self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertNotIn('Temp URL invalid', resp.body)
self.assertNotIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
self.assertNotIn('swift.auth_scheme', environ)
@ -1258,7 +1270,7 @@ class TestTempURL(unittest.TestCase):
environ=environ)
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
def test_clean_incoming_headers(self):
@ -1388,9 +1400,10 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip_range = '127.0.0.0/29'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip_range, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip_range, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -1411,9 +1424,10 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip = '127.0.0.1'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -1433,10 +1447,11 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip = '127.0.0.1'
remote_addr = '127.0.0.2'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -1446,7 +1461,7 @@ class TestTempURL(unittest.TestCase):
self.tempurl.app = FakeApp(iter([('200 Ok', (), '123')]))
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)
@mock.patch('swift.common.middleware.tempurl.time', return_value=0)
@ -1454,9 +1469,10 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip = '2001:db8::'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -1477,9 +1493,10 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip_range = '2001:db8::/127'
hmac_body = 'ip=%s\n%s\n%s\n%s' % (ip_range, method, expires, path)
hmac_body = ('ip=%s\n%s\n%i\n%s' %
(ip_range, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -1499,9 +1516,10 @@ class TestTempURL(unittest.TestCase):
method = 'GET'
expires = (((24 + 1) * 60 + 1) * 60) + 1
path = '/v1/a/c/o'
key = 'abc'
key = b'abc'
ip = '127.0.0.1'
hmac_body = '%s\n%s\n%s\n%s' % (ip, method, expires, path)
hmac_body = ('%s\n%s\n%i\n%s' %
(ip, method, expires, path)).encode('utf-8')
sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
req = self._make_request(path, keys=[key], environ={
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s&'
@ -1510,7 +1528,7 @@ class TestTempURL(unittest.TestCase):
self.tempurl.app = FakeApp(iter([('200 Ok', (), '123')]))
resp = req.get_response(self.tempurl)
self.assertEqual(resp.status_int, 401)
self.assertIn('Temp URL invalid', resp.body)
self.assertIn(b'Temp URL invalid', resp.body)
self.assertIn('Www-Authenticate', resp.headers)

74
tox.ini
View File

@ -27,79 +27,6 @@ setenv = VIRTUAL_ENV={envdir}
NOSE_COVER_HTML=1
NOSE_COVER_HTML_DIR={toxinidir}/cover
# Since the practice demonstrated that developers keep adding py2-specific
# code inadvertedly, we instituted a practice of "py3 ratchet": we convert
# tests file by file, add them into the list below, and never go back.
# This list also serves as a shared task board for those helping with py3.
# But mind that reviews expanding this list may be outstanding in Gerrit.
[testenv:py37]
commands =
find . ( -type f -o -type l ) -name "*.py[c|o]" -delete
find . -type d -name "__pycache__" -delete
nosetests {posargs:\
test/unit/account \
test/unit/cli \
test/unit/common/middleware/crypto \
test/unit/common/middleware/s3api/ \
test/unit/common/middleware/test_account_quotas.py \
test/unit/common/middleware/test_acl.py \
test/unit/common/middleware/test_bulk.py \
test/unit/common/middleware/test_catch_errors.py \
test/unit/common/middleware/test_cname_lookup.py \
test/unit/common/middleware/test_container_sync.py \
test/unit/common/middleware/test_copy.py \
test/unit/common/middleware/test_crossdomain.py \
test/unit/common/middleware/test_dlo.py \
test/unit/common/middleware/test_domain_remap.py \
test/unit/common/middleware/test_formpost.py \
test/unit/common/middleware/test_gatekeeper.py \
test/unit/common/middleware/test_healthcheck.py \
test/unit/common/middleware/test_keystoneauth.py \
test/unit/common/middleware/test_list_endpoints.py \
test/unit/common/middleware/test_listing_formats.py \
test/unit/common/middleware/test_memcache.py \
test/unit/common/middleware/test_name_check.py \
test/unit/common/middleware/test_proxy_logging.py \
test/unit/common/middleware/test_quotas.py \
test/unit/common/middleware/test_ratelimit.py \
test/unit/common/middleware/test_read_only.py \
test/unit/common/middleware/test_recon.py \
test/unit/common/middleware/test_slo.py \
test/unit/common/middleware/test_subrequest_logging.py \
test/unit/common/middleware/test_staticweb.py \
test/unit/common/middleware/test_symlink.py \
test/unit/common/middleware/test_tempauth.py \
test/unit/common/middleware/test_versioned_writes.py \
test/unit/common/middleware/test_xprofile.py \
test/unit/common/ring \
test/unit/common/test_base_storage_server.py \
test/unit/common/test_bufferedhttp.py \
test/unit/common/test_constraints.py \
test/unit/common/test_container_sync_realms.py \
test/unit/common/test_daemon.py \
test/unit/common/test_db.py \
test/unit/common/test_db_replicator.py \
test/unit/common/test_direct_client.py \
test/unit/common/test_exceptions.py \
test/unit/common/test_header_key_dict.py \
test/unit/common/test_internal_client.py \
test/unit/common/test_linkat.py \
test/unit/common/test_manager.py \
test/unit/common/test_memcached.py \
test/unit/common/test_request_helpers.py \
test/unit/common/test_splice.py \
test/unit/common/test_storage_policy.py \
test/unit/common/test_swob.py \
test/unit/common/test_utils.py \
test/unit/common/test_wsgi.py \
test/unit/container \
test/unit/obj \
test/unit/proxy \
test/unit/test_locale}
[testenv:py36]
commands = {[testenv:py37]commands}
[testenv:pep8]
basepython = python2.7
commands =
@ -122,6 +49,7 @@ basepython = python3
commands =
nosetests {posargs: \
test/functional/test_symlink.py \
test/functional/test_tempurl.py \
test/functional/tests.py}
[testenv:func-encryption]