From 168fdb7f304bf16ce0170f517442504617a29fc6 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Tue, 30 Jun 2020 13:40:25 -0700 Subject: [PATCH] Add a functional test for non-owned image copying This adds a functional test that confirms the current expected behavior of what happens when a user attempts to copy an image that they don't own (i.e. they get a 401). Change-Id: Id49f83a63e59c262ea20062c6f3aed50c56adb74 (cherry picked from commit 019afb789408bb0e179c849a72fd37a63202e2d1) --- glance/tests/functional/v2/test_images.py | 105 ++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/glance/tests/functional/v2/test_images.py b/glance/tests/functional/v2/test_images.py index f4a768404e..a2f8156a32 100644 --- a/glance/tests/functional/v2/test_images.py +++ b/glance/tests/functional/v2/test_images.py @@ -6570,3 +6570,108 @@ class TestMultiStoreImageMembers(functional.MultipleBackendFunctionalTest): self.skipTest("Remote connection closed abruptly: %s" % e.args[0]) self.stop_servers() + + +class TestCopyImagePermissions(functional.MultipleBackendFunctionalTest): + + def setUp(self): + super(TestCopyImagePermissions, self).setUp() + self.cleanup() + self.include_scrubber = False + self.api_server_multiple_backend.deployment_flavor = 'noauth' + + def _url(self, path): + return 'http://127.0.0.1:%d%s' % (self.api_port, path) + + def _headers(self, custom_headers=None): + base_headers = { + 'X-Identity-Status': 'Confirmed', + 'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96', + 'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e', + 'X-Tenant-Id': TENANT1, + 'X-Roles': 'member', + } + base_headers.update(custom_headers or {}) + return base_headers + + def _create_and_import_image_data(self): + # Create a public image + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json'}) + data = jsonutils.dumps({'name': 'image-1', 'type': 'kernel', + 'visibility': 'public', + 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(http.CREATED, response.status_code) + + image = jsonutils.loads(response.text) + image_id = image['id'] + + path = self._url('/v2/images/%s/import' % image_id) + headers = self._headers({ + 'content-type': 'application/json', + 'X-Roles': 'admin' + }) + + # Start http server locally + thread, httpd, port = test_utils.start_standalone_http_server() + + image_data_uri = 'http://localhost:%s/' % port + data = jsonutils.dumps( + {'method': {'name': 'web-download', 'uri': image_data_uri}, + 'stores': ['file1']}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(http.ACCEPTED, response.status_code) + + # Verify image is in active state and checksum is set + # NOTE(abhishekk): As import is a async call we need to provide + # some timelap to complete the call. + path = self._url('/v2/images/%s' % image_id) + func_utils.wait_for_status(request_path=path, + request_headers=self._headers(), + status='active', + max_sec=40, + delay_sec=0.2, + start_delay_sec=1) + with requests.get(image_data_uri) as r: + expect_c = six.text_type(hashlib.md5(r.content).hexdigest()) + expect_h = six.text_type(hashlib.sha512(r.content).hexdigest()) + func_utils.verify_image_hashes_and_status(self, + image_id, + checksum=expect_c, + os_hash_value=expect_h, + status='active') + + # kill the local http server + httpd.shutdown() + httpd.server_close() + + return image_id + + def test_copy_public_image_as_non_admin(self): + self.start_servers(**self.__dict__.copy()) + + # Create a publicly-visible image as TENANT1 + image_id = self._create_and_import_image_data() + + # Ensure image is created in the one store + path = self._url('/v2/images/%s' % image_id) + response = requests.get(path, headers=self._headers()) + self.assertEqual(http.OK, response.status_code) + self.assertEqual('file1', jsonutils.loads(response.text)['stores']) + + # Copy newly created image to file2 store as TENANT2 + path = self._url('/v2/images/%s/import' % image_id) + headers = self._headers({ + 'content-type': 'application/json', + }) + headers = get_auth_header(TENANT2, TENANT2, + role='member', headers=headers) + data = jsonutils.dumps( + {'method': {'name': 'copy-image'}, + 'stores': ['file2']}) + response = requests.post(path, headers=headers, data=data) + + # Expect failure to copy another user's image + self.assertEqual(http.FORBIDDEN, response.status_code)