From 47fb18c41b4851ba6071f0215e96e222b8ccef29 Mon Sep 17 00:00:00 2001 From: mmcardle Date: Tue, 10 Jul 2018 14:45:32 +0100 Subject: [PATCH] Add ability to generate a temporary URL with an IP range restriction Change-Id: I4734599886e4f4a563162390d0ff3bb1ef639db4 --- swiftclient/shell.py | 11 ++++++++- swiftclient/utils.py | 25 ++++++++++++++++++--- tests/unit/test_shell.py | 30 ++++++++++++++++++++----- tests/unit/test_utils.py | 48 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 9 deletions(-) diff --git a/swiftclient/shell.py b/swiftclient/shell.py index e91a16ff..74a47b70 100755 --- a/swiftclient/shell.py +++ b/swiftclient/shell.py @@ -1325,6 +1325,8 @@ Optional arguments: generated. --iso8601 If present, the generated temporary URL will contain an ISO 8601 UTC timestamp instead of a Unix timestamp. + --ip-range If present, the temporary URL will be restricted to the + given ip or ip range. '''.strip('\n') @@ -1348,6 +1350,12 @@ def st_tempurl(parser, args, thread_manager): help=("If present, the temporary URL will contain an ISO 8601 UTC " "timestamp instead of a Unix timestamp."), ) + parser.add_argument( + '--ip-range', action='store', + default=None, + help=("If present, the temporary URL will be restricted to the " + "given ip or ip range."), + ) (options, args) = parse_args(parser, args) args = args[1:] @@ -1367,7 +1375,8 @@ def st_tempurl(parser, args, thread_manager): path = generate_temp_url(parsed.path, timestamp, key, method, absolute=options['absolute_expiry'], iso8601=options['iso8601'], - prefix=options['prefix_based']) + prefix=options['prefix_based'], + ip_range=options['ip_range']) except ValueError as err: thread_manager.error(err) return diff --git a/swiftclient/utils.py b/swiftclient/utils.py index 8afcde97..5c17c613 100644 --- a/swiftclient/utils.py +++ b/swiftclient/utils.py @@ -69,7 +69,7 @@ def prt_bytes(num_bytes, human_flag): def generate_temp_url(path, seconds, key, method, absolute=False, - prefix=False, iso8601=False): + prefix=False, iso8601=False, ip_range=None): """Generates a temporary URL that gives unauthenticated access to the Swift object. @@ -92,6 +92,8 @@ def generate_temp_url(path, seconds, key, method, absolute=False, :param prefix: if True then a prefix-based temporary URL will be generated. :param iso8601: if True, a URL containing an ISO 8601 UTC timestamp instead of a UNIX timestamp will be created. + :param ip_range: if a valid ip range, restricts the temporary URL to the + range of ips. :raises ValueError: if timestamp or path is not in valid format. :return: the path portion of a temporary URL """ @@ -155,8 +157,21 @@ def generate_temp_url(path, seconds, key, method, absolute=False, expiration = int(time.time() + timestamp) else: expiration = timestamp - hmac_body = u'\n'.join([method.upper(), str(expiration), - ('prefix:' if prefix else '') + path_for_body]) + + hmac_parts = [method.upper(), str(expiration), + ('prefix:' if prefix else '') + path_for_body] + + if ip_range: + if isinstance(ip_range, six.binary_type): + try: + ip_range = ip_range.decode('utf-8') + except UnicodeDecodeError: + raise ValueError( + 'ip_range must be representable as UTF-8' + ) + hmac_parts.insert(0, "ip=%s" % ip_range) + + hmac_body = u'\n'.join(hmac_parts) # Encode to UTF-8 for py3 compatibility if not isinstance(key, six.binary_type): @@ -169,6 +184,10 @@ def generate_temp_url(path, seconds, key, method, absolute=False, temp_url = u'{path}?temp_url_sig={sig}&temp_url_expires={exp}'.format( path=path_for_body, sig=sig, exp=expiration) + + if ip_range: + temp_url += u'&temp_url_ip_range={}'.format(ip_range) + if prefix: temp_url += u'&temp_url_prefix={}'.format(parts[4]) # Have return type match path from caller diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index 3db48a48..8c995e5d 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -1667,7 +1667,7 @@ class TestShell(unittest.TestCase): swiftclient.shell.main(argv) temp_url.assert_called_with( '/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False, - iso8601=False, prefix=False) + iso8601=False, prefix=False, ip_range=None) @mock.patch('swiftclient.shell.generate_temp_url', return_value='') def test_temp_url_prefix_based(self, temp_url): @@ -1676,7 +1676,7 @@ class TestShell(unittest.TestCase): swiftclient.shell.main(argv) temp_url.assert_called_with( '/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False, - iso8601=False, prefix=True) + iso8601=False, prefix=True, ip_range=None) @mock.patch('swiftclient.shell.generate_temp_url', return_value='') def test_temp_url_iso8601_in(self, temp_url): @@ -1688,7 +1688,7 @@ class TestShell(unittest.TestCase): swiftclient.shell.main(argv) temp_url.assert_called_with( '/v1/AUTH_account/c/', d, 'secret_key', 'GET', absolute=False, - iso8601=False, prefix=False) + iso8601=False, prefix=False, ip_range=None) @mock.patch('swiftclient.shell.generate_temp_url', return_value='') def test_temp_url_iso8601_out(self, temp_url): @@ -1697,7 +1697,7 @@ class TestShell(unittest.TestCase): swiftclient.shell.main(argv) temp_url.assert_called_with( '/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False, - iso8601=True, prefix=False) + iso8601=True, prefix=False, ip_range=None) @mock.patch('swiftclient.shell.generate_temp_url', return_value='') def test_absolute_expiry_temp_url(self, temp_url): @@ -1706,7 +1706,16 @@ class TestShell(unittest.TestCase): swiftclient.shell.main(argv) temp_url.assert_called_with( '/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=True, - iso8601=False, prefix=False) + iso8601=False, prefix=False, ip_range=None) + + @mock.patch('swiftclient.shell.generate_temp_url', return_value='') + def test_temp_url_with_ip_range(self, temp_url): + argv = ["", "tempurl", "GET", "60", "/v1/AUTH_account/c/o", + "secret_key", "--ip-range", "1.2.3.4"] + swiftclient.shell.main(argv) + temp_url.assert_called_with( + '/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False, + iso8601=False, prefix=False, ip_range='1.2.3.4') def test_temp_url_output(self): argv = ["", "tempurl", "GET", "60", "/v1/a/c/o", @@ -1769,6 +1778,17 @@ class TestShell(unittest.TestCase): swiftclient.shell.main(argv) self.assertEqual(expected, output.out) + argv = ["", "tempurl", "GET", "60", "/v1/a/c/o", + "secret_key", "--absolute", "--ip-range", "1.2.3.4"] + with CaptureOutput(suppress_systemexit=True) as output: + swiftclient.shell.main(argv) + sig = "6a6ec8efa4be53904ecba8d055d841e24a937c98" + expected = ( + "/v1/a/c/o?temp_url_sig=%s&temp_url_expires=60" + "&temp_url_ip_range=1.2.3.4\n" % sig + ) + self.assertEqual(expected, output.out) + def test_temp_url_error_output(self): expected = 'path must be full path to an object e.g. /v1/a/c/o\n' for bad_path in ('/v1/a/c', 'v1/a/c/o', '/v1/a/c/', '/v1/a//o', diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index adead005..e54b90c7 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -151,6 +151,54 @@ class TestTempURL(unittest.TestCase): ]) self.assertIsInstance(url, type(self.url)) + @mock.patch('hmac.HMAC') + @mock.patch('time.time', return_value=1400000000) + def test_generate_temp_url_ip_range(self, time_mock, hmac_mock): + hmac_mock().hexdigest.return_value = 'temp_url_signature' + ip_ranges = [ + '1.2.3.4', '1.2.3.4/24', '2001:db8::', + b'1.2.3.4', b'1.2.3.4/24', b'2001:db8::', + ] + path = '/v1/AUTH_account/c/o/' + expected_url = path + ('?temp_url_sig=temp_url_signature' + '&temp_url_expires=1400003600' + '&temp_url_ip_range=') + for ip_range in ip_ranges: + hmac_mock.reset_mock() + url = u.generate_temp_url(path, self.seconds, + self.key, self.method, + ip_range=ip_range) + key = self.key + if not isinstance(key, six.binary_type): + key = key.encode('utf-8') + + if isinstance(ip_range, six.binary_type): + ip_range_expected_url = ( + expected_url + ip_range.decode('utf-8') + ) + expected_body = '\n'.join([ + 'ip=' + ip_range.decode('utf-8'), + self.method, + '1400003600', + path, + ]).encode('utf-8') + else: + ip_range_expected_url = expected_url + ip_range + expected_body = '\n'.join([ + 'ip=' + ip_range, + self.method, + '1400003600', + path, + ]).encode('utf-8') + + self.assertEqual(url, ip_range_expected_url) + + self.assertEqual(hmac_mock.mock_calls, [ + mock.call(key, expected_body, sha1), + mock.call().hexdigest(), + ]) + self.assertIsInstance(url, type(path)) + @mock.patch('hmac.HMAC') def test_generate_temp_url_iso8601_argument(self, hmac_mock): hmac_mock().hexdigest.return_value = 'temp_url_signature'