tempurl: Support sha256 and sha512 signatures

Up the default to sha256 since

* the proxy has supported (and defaulted to allowing) it for four years
  now, and
* Rackspace has supported it for even longer.

Include a note in the --help about older clusters likely requiring sha1.

Change-Id: Ibac2bb7e2e4c9946c7384f0aab8e43d0d79ba645
Related-Change: Ia9dd1a91cc3c9c946f5f029cdefc9e66bcf01046
Related-Bug: #1733634
Closes-Bug: #1977867
This commit is contained in:
Tim Burke 2022-06-08 09:30:17 -07:00
parent 1dc635a32c
commit 9eee29d2e4
4 changed files with 151 additions and 48 deletions

@ -1427,6 +1427,8 @@ Optional arguments:
ISO 8601 UTC timestamp instead of a Unix timestamp.
--ip-range If present, the temporary URL will be restricted to the
given ip or ip range.
--digest The digest algorithm to use. Defaults to sha256, but
older clusters may only support sha1.
'''.strip('\n')
@ -1456,6 +1458,12 @@ def st_tempurl(parser, args, thread_manager, return_parser=False):
help=("If present, the temporary URL will be restricted to the "
"given ip or ip range."),
)
parser.add_argument(
'--digest', choices=('sha1', 'sha256', 'sha512'),
default='sha256',
help=("The digest algorithm to use. Defaults to sha256, but "
"older clusters may only support sha1."),
)
# We return the parser to build up the bash_completion
if return_parser:
@ -1480,7 +1488,8 @@ def st_tempurl(parser, args, thread_manager, return_parser=False):
absolute=options['absolute_expiry'],
iso8601=options['iso8601'],
prefix=options['prefix_based'],
ip_range=options['ip_range'])
ip_range=options['ip_range'],
digest=options['digest'])
except ValueError as err:
thread_manager.error(err)
return

@ -14,6 +14,7 @@
# limitations under the License.
"""Miscellaneous utility functions for use with Swift."""
import base64
from calendar import timegm
from collections.abc import Mapping
import gzip
@ -70,7 +71,8 @@ def prt_bytes(num_bytes, human_flag):
def generate_temp_url(path, seconds, key, method, absolute=False,
prefix=False, iso8601=False, ip_range=None):
prefix=False, iso8601=False, ip_range=None,
digest='sha256'):
"""Generates a temporary URL that gives unauthenticated access to the
Swift object.
@ -95,7 +97,11 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
instead of a UNIX timestamp will be created.
:param ip_range: if a valid ip range, restricts the temporary URL to the
range of ips.
:raises ValueError: if timestamp or path is not in valid format.
:param digest: digest algorithm to use. Must be one of ``sha1``,
``sha256``, or ``sha512``.
:raises ValueError: if timestamp or path is not in valid format,
or if digest is not one of ``sha1``, ``sha256``, or
``sha512``.
:return: the path portion of a temporary URL
"""
try:
@ -140,6 +146,11 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
else:
path_for_body = path
if isinstance(digest, str) and digest in ('sha1', 'sha256', 'sha512'):
digest = getattr(hashlib, digest)
if digest not in (hashlib.sha1, hashlib.sha256, hashlib.sha512):
raise ValueError('digest must be one of sha1, sha256, or sha512')
parts = path_for_body.split('/', 4)
if len(parts) != 5 or parts[0] or not all(parts[1:(4 if prefix else 5)]):
if prefix:
@ -177,7 +188,12 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
# Encode to UTF-8 for py3 compatibility
if not isinstance(key, bytes):
key = key.encode('utf-8')
sig = hmac.new(key, hmac_body.encode('utf-8'), hashlib.sha1).hexdigest()
mac = hmac.new(key, hmac_body.encode('utf-8'), digest)
if digest == hashlib.sha512:
sig = 'sha512:' + base64.urlsafe_b64encode(
mac.digest()).decode('ascii').strip('=')
else:
sig = mac.hexdigest()
if iso8601:
expiration = time.strftime(

@ -2035,7 +2035,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=False, ip_range=None)
iso8601=False, prefix=False, ip_range=None, digest='sha256')
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_prefix_based(self, temp_url):
@ -2044,7 +2044,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=True, ip_range=None)
iso8601=False, prefix=True, ip_range=None, digest='sha256')
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_iso8601_in(self, temp_url):
@ -2056,7 +2056,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', d, 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=False, ip_range=None)
iso8601=False, prefix=False, ip_range=None, digest='sha256')
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_iso8601_out(self, temp_url):
@ -2065,7 +2065,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False,
iso8601=True, prefix=False, ip_range=None)
iso8601=True, prefix=False, ip_range=None, digest='sha256')
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_absolute_expiry_temp_url(self, temp_url):
@ -2074,7 +2074,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=True,
iso8601=False, prefix=False, ip_range=None)
iso8601=False, prefix=False, ip_range=None, digest='sha256')
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_with_ip_range(self, temp_url):
@ -2083,11 +2083,11 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=False, ip_range='1.2.3.4')
iso8601=False, prefix=False, ip_range='1.2.3.4', digest='sha256')
def test_temp_url_output(self):
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute"]
"secret_key", "--absolute", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = "63bc77a473a1c2ce956548cacf916f292eb9eac3"
@ -2095,14 +2095,14 @@ class TestShell(unittest.TestCase):
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "http://saio:8080/v1/a/c/o",
"secret_key", "--absolute"]
"secret_key", "--absolute", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
expected = "http://saio:8080%s" % expected
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/",
"secret_key", "--absolute", "--prefix"]
"secret_key", "--absolute", "--prefix", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = '00008c4be1573ba74fc2ab9bce02e3a93d04b349'
@ -2111,7 +2111,8 @@ class TestShell(unittest.TestCase):
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/",
"secret_key", "--absolute", "--prefix", '--iso8601']
"secret_key", "--absolute", "--prefix", '--iso8601',
"--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = '00008c4be1573ba74fc2ab9bce02e3a93d04b349'
@ -2124,7 +2125,7 @@ class TestShell(unittest.TestCase):
strftime(EXPIRES_ISO8601_FORMAT[:-1], localtime(60)))
for d in dates:
argv = ["", "tempurl", "GET", d, "/v1/a/c/o",
"secret_key"]
"secret_key", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = "63bc77a473a1c2ce956548cacf916f292eb9eac3"
@ -2135,19 +2136,20 @@ class TestShell(unittest.TestCase):
mktime(strptime('2005-05-01', SHORT_EXPIRES_ISO8601_FORMAT))))
argv = ["", "tempurl", "GET", ts, "/v1/a/c/",
"secret_key", "--absolute"]
"secret_key", "--absolute", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
expected = output.out
argv = ["", "tempurl", "GET", '2005-05-01', "/v1/a/c/",
"secret_key", "--absolute"]
"secret_key", "--absolute", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute", "--ip-range", "1.2.3.4"]
"secret_key", "--absolute", "--ip-range", "1.2.3.4",
"--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = "6a6ec8efa4be53904ecba8d055d841e24a937c98"
@ -2157,6 +2159,39 @@ class TestShell(unittest.TestCase):
)
self.assertEqual(expected, output.out)
def test_temp_url_digests_output(self):
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
s = "db04994a589b1a2538bff694f0a4f57c7a397617ac2cb49f924d222bbe2b3e01"
expected = "/v1/a/c/o?temp_url_sig=%s&temp_url_expires=60\n" % s
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute", "--digest", "sha256"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
# same signature/expectation
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute", "--digest", "sha1"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = "63bc77a473a1c2ce956548cacf916f292eb9eac3"
expected = "/v1/a/c/o?temp_url_sig=%s&temp_url_expires=60\n" % sig
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute", "--digest", "sha512"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = ("sha512:nMXwEAHu3jzlCZi4wWO1juEq4DikFlX8a729PLJVvUp"
"vg0GpgkJnX5uCG1x-v2KfTrmRtLOcT7KBK2RXLW1uKw")
expected = "/v1/a/c/o?temp_url_sig=%s&temp_url_expires=60\n" % sig
self.assertEqual(expected, output.out)
def test_temp_url_error_output(self):
expected = 'path must be full path to an object e.g. /v1/a/c/o\n'
for bad_path in ('/v1/a/c', 'v1/a/c/o', '/v1/a/c/', '/v1/a//o',

@ -20,7 +20,7 @@ import unittest
from unittest import mock
import tempfile
from time import gmtime, localtime, mktime, strftime, strptime
from hashlib import md5, sha1
import hashlib
from swiftclient import utils as u
@ -127,17 +127,65 @@ class TestTempURL(unittest.TestCase):
seconds = 3600
key = 'correcthorsebatterystaple'
method = 'GET'
expected_url = url + ('?temp_url_sig=temp_url_signature'
'&temp_url_expires=1400003600')
expected_body = '\n'.join([
method,
'1400003600',
url,
]).encode('utf-8')
@property
def expected_url(self):
if isinstance(self.url, bytes):
return self.url + (b'?temp_url_sig=temp_url_signature'
b'&temp_url_expires=1400003600')
return self.url + (u'?temp_url_sig=temp_url_signature'
u'&temp_url_expires=1400003600')
@property
def expected_sha512_url(self):
if isinstance(self.url, bytes):
return self.url + (b'?temp_url_sig=sha512:dGVtcF91cmxfc2lnbmF0dXJl'
b'&temp_url_expires=1400003600')
return self.url + (u'?temp_url_sig=sha512:dGVtcF91cmxfc2lnbmF0dXJl'
u'&temp_url_expires=1400003600')
@mock.patch('hmac.HMAC')
@mock.patch('time.time', return_value=1400000000)
def test_generate_temp_url(self, time_mock, hmac_mock):
def test_generate_sha1_temp_url(self, time_mock, hmac_mock):
hmac_mock().hexdigest.return_value = 'temp_url_signature'
url = u.generate_temp_url(self.url, self.seconds,
self.key, self.method, digest='sha1')
key = self.key
if not isinstance(key, bytes):
key = key.encode('utf-8')
self.assertEqual(url, self.expected_url)
self.assertEqual(hmac_mock.mock_calls, [
mock.call(),
mock.call(key, self.expected_body, hashlib.sha1),
mock.call().hexdigest(),
])
self.assertIsInstance(url, type(self.url))
@mock.patch('hmac.HMAC')
@mock.patch('time.time', return_value=1400000000)
def test_generate_sha512_temp_url(self, time_mock, hmac_mock):
hmac_mock().digest.return_value = b'temp_url_signature'
url = u.generate_temp_url(self.url, self.seconds,
self.key, self.method, digest=hashlib.sha512)
key = self.key
if not isinstance(key, bytes):
key = key.encode('utf-8')
self.assertEqual(url, self.expected_sha512_url)
self.assertEqual(hmac_mock.mock_calls, [
mock.call(),
mock.call(key, self.expected_body, hashlib.sha512),
mock.call().digest(),
])
self.assertIsInstance(url, type(self.url))
@mock.patch('hmac.HMAC')
@mock.patch('time.time', return_value=1400000000)
def test_generate_sha256_temp_url_by_default(self, time_mock, hmac_mock):
hmac_mock().hexdigest.return_value = 'temp_url_signature'
url = u.generate_temp_url(self.url, self.seconds,
self.key, self.method)
@ -147,7 +195,7 @@ class TestTempURL(unittest.TestCase):
self.assertEqual(url, self.expected_url)
self.assertEqual(hmac_mock.mock_calls, [
mock.call(),
mock.call(key, self.expected_body, sha1),
mock.call(key, self.expected_body, hashlib.sha256),
mock.call().hexdigest(),
])
self.assertIsInstance(url, type(self.url))
@ -195,7 +243,7 @@ class TestTempURL(unittest.TestCase):
self.assertEqual(url, ip_range_expected_url)
self.assertEqual(hmac_mock.mock_calls, [
mock.call(key, expected_body, sha1),
mock.call(key, expected_body, hashlib.sha256),
mock.call().hexdigest(),
])
self.assertIsInstance(url, type(path))
@ -256,7 +304,7 @@ class TestTempURL(unittest.TestCase):
self.assertTrue(url.endswith(expires))
self.assertEqual(hmac_mock.mock_calls, [
mock.call(),
mock.call(key, self.expected_body, sha1),
mock.call(key, self.expected_body, hashlib.sha256),
mock.call().hexdigest(),
])
self.assertIsInstance(url, type(self.url))
@ -284,7 +332,7 @@ class TestTempURL(unittest.TestCase):
key = key.encode('utf-8')
self.assertEqual(url, expected_url)
self.assertEqual(hmac_mock.mock_calls, [
mock.call(key, expected_body, sha1),
mock.call(key, expected_body, hashlib.sha256),
mock.call().hexdigest(),
])
@ -374,8 +422,6 @@ class TestTempURL(unittest.TestCase):
class TestTempURLUnicodePathAndKey(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'
key = 'k\u00e9y'
expected_url = ('%s?temp_url_sig=temp_url_signature'
'&temp_url_expires=1400003600') % url
expected_body = '\n'.join([
'GET',
'1400003600',
@ -386,8 +432,6 @@ class TestTempURLUnicodePathAndKey(TestTempURL):
class TestTempURLUnicodePathBytesKey(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'
key = 'k\u00e9y'.encode('utf-8')
expected_url = ('%s?temp_url_sig=temp_url_signature'
'&temp_url_expires=1400003600') % url
expected_body = '\n'.join([
'GET',
'1400003600',
@ -398,8 +442,6 @@ class TestTempURLUnicodePathBytesKey(TestTempURL):
class TestTempURLBytesPathUnicodeKey(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'.encode('utf-8')
key = 'k\u00e9y'
expected_url = url + (b'?temp_url_sig=temp_url_signature'
b'&temp_url_expires=1400003600')
expected_body = b'\n'.join([
b'GET',
b'1400003600',
@ -410,8 +452,6 @@ class TestTempURLBytesPathUnicodeKey(TestTempURL):
class TestTempURLBytesPathAndKey(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'.encode('utf-8')
key = 'k\u00e9y'.encode('utf-8')
expected_url = url + (b'?temp_url_sig=temp_url_signature'
b'&temp_url_expires=1400003600')
expected_body = b'\n'.join([
b'GET',
b'1400003600',
@ -422,8 +462,6 @@ class TestTempURLBytesPathAndKey(TestTempURL):
class TestTempURLBytesPathAndNonUtf8Key(TestTempURL):
url = '/v1/\u00e4/c/\u00f3'.encode('utf-8')
key = b'k\xffy'
expected_url = url + (b'?temp_url_sig=temp_url_signature'
b'&temp_url_expires=1400003600')
expected_body = b'\n'.join([
b'GET',
b'1400003600',
@ -436,7 +474,7 @@ class TestReadableToIterable(unittest.TestCase):
def test_iter(self):
chunk_size = 4
write_data = tuple(x.encode() for x in ('a', 'b', 'c', 'd'))
actual_md5sum = md5()
actual_md5sum = hashlib.md5()
with tempfile.TemporaryFile() as f:
for x in write_data:
@ -454,8 +492,8 @@ class TestReadableToIterable(unittest.TestCase):
def test_md5_creation(self):
# Check creation with a real and noop md5 class
data = u.ReadableToIterable(None, None, md5=True)
self.assertEqual(md5().hexdigest(), data.get_md5sum())
self.assertIs(type(md5()), type(data.md5sum))
self.assertEqual(hashlib.md5().hexdigest(), data.get_md5sum())
self.assertIs(type(hashlib.md5()), type(data.md5sum))
data = u.ReadableToIterable(None, None, md5=False)
self.assertEqual('', data.get_md5sum())
@ -464,7 +502,7 @@ class TestReadableToIterable(unittest.TestCase):
def test_unicode(self):
# Check no errors are raised if unicode data is feed in.
unicode_data = 'abc'
actual_md5sum = md5(unicode_data.encode()).hexdigest()
actual_md5sum = hashlib.md5(unicode_data.encode()).hexdigest()
chunk_size = 2
with tempfile.TemporaryFile(mode='w+') as f:
@ -495,15 +533,17 @@ class TestLengthWrapper(unittest.TestCase):
self.assertEqual(42, len(data))
self.assertEqual(42, len(read_data))
self.assertEqual(s, read_data)
self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5(s.encode()).hexdigest(),
data.get_md5sum())
data.reset()
self.assertEqual(md5().hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5().hexdigest(), data.get_md5sum())
read_data = ''.join(iter(data.read, ''))
self.assertEqual(42, len(read_data))
self.assertEqual(s, read_data)
self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5(s.encode()).hexdigest(),
data.get_md5sum())
def test_bytesio(self):
contents = io.BytesIO(b'a' * 50 + b'b' * 50)
@ -515,7 +555,7 @@ class TestLengthWrapper(unittest.TestCase):
self.assertEqual(42, len(data))
self.assertEqual(42, len(read_data))
self.assertEqual(s, read_data)
self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5(s).hexdigest(), data.get_md5sum())
def test_tempfile(self):
with tempfile.NamedTemporaryFile(mode='wb') as f:
@ -529,7 +569,7 @@ class TestLengthWrapper(unittest.TestCase):
self.assertEqual(42, len(data))
self.assertEqual(42, len(read_data))
self.assertEqual(s, read_data)
self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5(s).hexdigest(), data.get_md5sum())
def test_segmented_file(self):
with tempfile.NamedTemporaryFile(mode='wb') as f:
@ -548,15 +588,18 @@ class TestLengthWrapper(unittest.TestCase):
self.assertEqual(segment_length, len(data))
self.assertEqual(segment_length, len(read_data))
self.assertEqual(s, read_data)
self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5(s).hexdigest(),
data.get_md5sum())
data.reset()
self.assertEqual(md5().hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5().hexdigest(),
data.get_md5sum())
read_data = b''.join(iter(data.read, ''))
self.assertEqual(segment_length, len(data))
self.assertEqual(segment_length, len(read_data))
self.assertEqual(s, read_data)
self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
self.assertEqual(hashlib.md5(s).hexdigest(),
data.get_md5sum())
class TestGroupers(unittest.TestCase):