Add ability to generate a temporary URL with an

IP range restriction

Change-Id: I4734599886e4f4a563162390d0ff3bb1ef639db4
This commit is contained in:
mmcardle 2018-07-10 14:45:32 +01:00
parent c2c5af603f
commit 47fb18c41b
4 changed files with 105 additions and 9 deletions

View File

@ -1325,6 +1325,8 @@ Optional arguments:
generated.
--iso8601 If present, the generated temporary URL will contain an
ISO 8601 UTC timestamp instead of a Unix timestamp.
--ip-range If present, the temporary URL will be restricted to the
given ip or ip range.
'''.strip('\n')
@ -1348,6 +1350,12 @@ def st_tempurl(parser, args, thread_manager):
help=("If present, the temporary URL will contain an ISO 8601 UTC "
"timestamp instead of a Unix timestamp."),
)
parser.add_argument(
'--ip-range', action='store',
default=None,
help=("If present, the temporary URL will be restricted to the "
"given ip or ip range."),
)
(options, args) = parse_args(parser, args)
args = args[1:]
@ -1367,7 +1375,8 @@ def st_tempurl(parser, args, thread_manager):
path = generate_temp_url(parsed.path, timestamp, key, method,
absolute=options['absolute_expiry'],
iso8601=options['iso8601'],
prefix=options['prefix_based'])
prefix=options['prefix_based'],
ip_range=options['ip_range'])
except ValueError as err:
thread_manager.error(err)
return

View File

@ -69,7 +69,7 @@ def prt_bytes(num_bytes, human_flag):
def generate_temp_url(path, seconds, key, method, absolute=False,
prefix=False, iso8601=False):
prefix=False, iso8601=False, ip_range=None):
"""Generates a temporary URL that gives unauthenticated access to the
Swift object.
@ -92,6 +92,8 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
:param prefix: if True then a prefix-based temporary URL will be generated.
:param iso8601: if True, a URL containing an ISO 8601 UTC timestamp
instead of a UNIX timestamp will be created.
:param ip_range: if a valid ip range, restricts the temporary URL to the
range of ips.
:raises ValueError: if timestamp or path is not in valid format.
:return: the path portion of a temporary URL
"""
@ -155,8 +157,21 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
expiration = int(time.time() + timestamp)
else:
expiration = timestamp
hmac_body = u'\n'.join([method.upper(), str(expiration),
('prefix:' if prefix else '') + path_for_body])
hmac_parts = [method.upper(), str(expiration),
('prefix:' if prefix else '') + path_for_body]
if ip_range:
if isinstance(ip_range, six.binary_type):
try:
ip_range = ip_range.decode('utf-8')
except UnicodeDecodeError:
raise ValueError(
'ip_range must be representable as UTF-8'
)
hmac_parts.insert(0, "ip=%s" % ip_range)
hmac_body = u'\n'.join(hmac_parts)
# Encode to UTF-8 for py3 compatibility
if not isinstance(key, six.binary_type):
@ -169,6 +184,10 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
temp_url = u'{path}?temp_url_sig={sig}&temp_url_expires={exp}'.format(
path=path_for_body, sig=sig, exp=expiration)
if ip_range:
temp_url += u'&temp_url_ip_range={}'.format(ip_range)
if prefix:
temp_url += u'&temp_url_prefix={}'.format(parts[4])
# Have return type match path from caller

View File

@ -1667,7 +1667,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=False)
iso8601=False, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_prefix_based(self, temp_url):
@ -1676,7 +1676,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=True)
iso8601=False, prefix=True, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_iso8601_in(self, temp_url):
@ -1688,7 +1688,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', d, 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=False)
iso8601=False, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_iso8601_out(self, temp_url):
@ -1697,7 +1697,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False,
iso8601=True, prefix=False)
iso8601=True, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_absolute_expiry_temp_url(self, temp_url):
@ -1706,7 +1706,16 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=True,
iso8601=False, prefix=False)
iso8601=False, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_with_ip_range(self, temp_url):
argv = ["", "tempurl", "GET", "60", "/v1/AUTH_account/c/o",
"secret_key", "--ip-range", "1.2.3.4"]
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False,
iso8601=False, prefix=False, ip_range='1.2.3.4')
def test_temp_url_output(self):
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
@ -1769,6 +1778,17 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
self.assertEqual(expected, output.out)
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
"secret_key", "--absolute", "--ip-range", "1.2.3.4"]
with CaptureOutput(suppress_systemexit=True) as output:
swiftclient.shell.main(argv)
sig = "6a6ec8efa4be53904ecba8d055d841e24a937c98"
expected = (
"/v1/a/c/o?temp_url_sig=%s&temp_url_expires=60"
"&temp_url_ip_range=1.2.3.4\n" % sig
)
self.assertEqual(expected, output.out)
def test_temp_url_error_output(self):
expected = 'path must be full path to an object e.g. /v1/a/c/o\n'
for bad_path in ('/v1/a/c', 'v1/a/c/o', '/v1/a/c/', '/v1/a//o',

View File

@ -151,6 +151,54 @@ class TestTempURL(unittest.TestCase):
])
self.assertIsInstance(url, type(self.url))
@mock.patch('hmac.HMAC')
@mock.patch('time.time', return_value=1400000000)
def test_generate_temp_url_ip_range(self, time_mock, hmac_mock):
hmac_mock().hexdigest.return_value = 'temp_url_signature'
ip_ranges = [
'1.2.3.4', '1.2.3.4/24', '2001:db8::',
b'1.2.3.4', b'1.2.3.4/24', b'2001:db8::',
]
path = '/v1/AUTH_account/c/o/'
expected_url = path + ('?temp_url_sig=temp_url_signature'
'&temp_url_expires=1400003600'
'&temp_url_ip_range=')
for ip_range in ip_ranges:
hmac_mock.reset_mock()
url = u.generate_temp_url(path, self.seconds,
self.key, self.method,
ip_range=ip_range)
key = self.key
if not isinstance(key, six.binary_type):
key = key.encode('utf-8')
if isinstance(ip_range, six.binary_type):
ip_range_expected_url = (
expected_url + ip_range.decode('utf-8')
)
expected_body = '\n'.join([
'ip=' + ip_range.decode('utf-8'),
self.method,
'1400003600',
path,
]).encode('utf-8')
else:
ip_range_expected_url = expected_url + ip_range
expected_body = '\n'.join([
'ip=' + ip_range,
self.method,
'1400003600',
path,
]).encode('utf-8')
self.assertEqual(url, ip_range_expected_url)
self.assertEqual(hmac_mock.mock_calls, [
mock.call(key, expected_body, sha1),
mock.call().hexdigest(),
])
self.assertIsInstance(url, type(path))
@mock.patch('hmac.HMAC')
def test_generate_temp_url_iso8601_argument(self, hmac_mock):
hmac_mock().hexdigest.return_value = 'temp_url_signature'