From ee8a69d50675097ec58dcb61c84966fe6255eb80 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Tue, 30 Jun 2020 14:42:03 -0700 Subject: [PATCH] Add a policy knob for allowing non-owned image copying This adds a copy_image policy knob which can be used to grant users the ability to copy images other than their own using the new functionality just added to the api_image_import task. By default, only admins are allowed to do this. A functional test modification is included to show that users can be granted permission to do this based on something like the "public visibility" attribute of an image. Change-Id: Idebf66e2944bcddb7a5c76b81e47c654b401c2a8 --- glance/api/v2/images.py | 11 +++- glance/policies/image.py | 2 + glance/tests/functional/v2/test_images.py | 63 +++++++++++++++++++- glance/tests/unit/v2/test_images_resource.py | 6 +- 4 files changed, 75 insertions(+), 7 deletions(-) diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 5b4b2f93a0..c18e635572 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -131,7 +131,14 @@ class ImagesController(object): if not getattr(image, 'disk_format', None): msg = _("'disk_format' needs to be set before import") raise exception.Conflict(msg) - if not authorization.is_image_mutable(req.context, image): + + # NOTE(danms): For copy-image only, we check policy to decide + # if the user should be able to do this. Otherwise, we forbid + # the import if the user is not the owner. + if import_method == 'copy-image': + self.policy.enforce(req.context, 'copy_image', + dict(policy.ImageTarget(image))) + elif not authorization.is_image_mutable(req.context, image): raise webob.exc.HTTPForbidden( explanation=_("Operation not permitted")) @@ -174,6 +181,8 @@ class ImagesController(object): raise webob.exc.HTTPConflict(explanation=e.msg) except exception.NotFound as e: raise webob.exc.HTTPNotFound(explanation=e.msg) + except exception.Forbidden as e: + raise webob.exc.HTTPForbidden(explanation=e.msg) if (not all_stores_must_succeed) and (not CONF.enabled_backends): msg = (_("All_stores_must_succeed can only be set with " diff --git a/glance/policies/image.py b/glance/policies/image.py index 77ddd09ded..2a1ad72686 100644 --- a/glance/policies/image.py +++ b/glance/policies/image.py @@ -40,6 +40,8 @@ image_policies = [ policy.RuleDefault(name="deactivate", check_str="rule:default"), policy.RuleDefault(name="reactivate", check_str="rule:default"), + + policy.RuleDefault(name="copy_image", check_str="role:admin"), ] diff --git a/glance/tests/functional/v2/test_images.py b/glance/tests/functional/v2/test_images.py index a0817a656d..d2c667ffc1 100644 --- a/glance/tests/functional/v2/test_images.py +++ b/glance/tests/functional/v2/test_images.py @@ -6628,7 +6628,7 @@ class TestCopyImagePermissions(functional.MultipleBackendFunctionalTest): return image_id - def test_copy_public_image_as_non_admin(self): + def _test_copy_public_image_as_non_admin(self): self.start_servers(**self.__dict__.copy()) # Create a publicly-visible image as TENANT1 @@ -6651,6 +6651,67 @@ class TestCopyImagePermissions(functional.MultipleBackendFunctionalTest): {'method': {'name': 'copy-image'}, 'stores': ['file2']}) response = requests.post(path, headers=headers, data=data) + return image_id, response + def test_copy_public_image_as_non_admin(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "get_image": "", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": "", + "download_image": "", + "add_member": "", + "publicize_image": "", + "copy_image": "role:admin", + } + + self.set_policy_rules(rules) + + image_id, response = self._test_copy_public_image_as_non_admin() # Expect failure to copy another user's image self.assertEqual(http.FORBIDDEN, response.status_code) + + def test_copy_public_image_as_non_admin_permitted(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "get_image": "", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": "", + "download_image": "", + "add_member": "", + "publicize_image": "", + "copy_image": "'public':%(visibility)s", + } + + self.set_policy_rules(rules) + + image_id, response = self._test_copy_public_image_as_non_admin() + # Expect success because image is public + self.assertEqual(http.ACCEPTED, response.status_code) + + # Verify image is copied + # NOTE(abhishekk): As import is a async call we need to provide + # some timelap to complete the call. + path = self._url('/v2/images/%s' % image_id) + func_utils.wait_for_copying(request_path=path, + request_headers=self._headers(), + stores=['file2'], + max_sec=40, + delay_sec=0.2, + start_delay_sec=1) + + # Ensure image is copied to the file2 and file3 store + path = self._url('/v2/images/%s' % image_id) + response = requests.get(path, headers=self._headers()) + self.assertEqual(http.OK, response.status_code) + self.assertIn('file2', jsonutils.loads(response.text)['stores']) diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index b7369fb4fb..762f293cd8 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -2961,12 +2961,10 @@ class TestImagesController(base.IsolatedUnitTest): mock_new_task.assert_not_called() @mock.patch('glance.context.RequestContext.elevated') - @mock.patch.object(glance.api.authorization, 'is_image_mutable') @mock.patch.object(glance.domain.TaskFactory, 'new_task') @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') def test_image_import_copy_allowed_by_policy(self, mock_get, mock_new_task, - mock_policy_check, mock_elevated, allowed=True): # NOTE(danms): FakeImage is owned by utils.TENANT1. Try to do the @@ -2974,9 +2972,7 @@ class TestImagesController(base.IsolatedUnitTest): request = unit_test_utils.get_fake_request(tenant=TENANT2) mock_get.return_value = FakeImage(status='active', locations=[]) - # FIXME(danms): This should control whatever policy knob we make, - # but for now control the strict check we have. - mock_policy_check.return_value = allowed + self.policy.rules = {'copy_image': allowed} req_body = {'method': {'name': 'copy-image'}, 'stores': ['cheap']}