Merge "Add a policy knob for allowing non-owned image copying"

This commit is contained in:
Zuul 2020-07-16 20:14:40 +00:00 committed by Gerrit Code Review
commit a826535905
4 changed files with 75 additions and 7 deletions

View File

@ -131,7 +131,14 @@ class ImagesController(object):
if not getattr(image, 'disk_format', None):
msg = _("'disk_format' needs to be set before import")
raise exception.Conflict(msg)
if not authorization.is_image_mutable(req.context, image):
# NOTE(danms): For copy-image only, we check policy to decide
# if the user should be able to do this. Otherwise, we forbid
# the import if the user is not the owner.
if import_method == 'copy-image':
self.policy.enforce(req.context, 'copy_image',
dict(policy.ImageTarget(image)))
elif not authorization.is_image_mutable(req.context, image):
raise webob.exc.HTTPForbidden(
explanation=_("Operation not permitted"))
@ -174,6 +181,8 @@ class ImagesController(object):
raise webob.exc.HTTPConflict(explanation=e.msg)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=e.msg)
if (not all_stores_must_succeed) and (not CONF.enabled_backends):
msg = (_("All_stores_must_succeed can only be set with "

View File

@ -40,6 +40,8 @@ image_policies = [
policy.RuleDefault(name="deactivate", check_str="rule:default"),
policy.RuleDefault(name="reactivate", check_str="rule:default"),
policy.RuleDefault(name="copy_image", check_str="role:admin"),
]

View File

@ -6628,7 +6628,7 @@ class TestCopyImagePermissions(functional.MultipleBackendFunctionalTest):
return image_id
def test_copy_public_image_as_non_admin(self):
def _test_copy_public_image_as_non_admin(self):
self.start_servers(**self.__dict__.copy())
# Create a publicly-visible image as TENANT1
@ -6651,6 +6651,67 @@ class TestCopyImagePermissions(functional.MultipleBackendFunctionalTest):
{'method': {'name': 'copy-image'},
'stores': ['file2']})
response = requests.post(path, headers=headers, data=data)
return image_id, response
def test_copy_public_image_as_non_admin(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted": "",
"download_image": "",
"add_member": "",
"publicize_image": "",
"copy_image": "role:admin",
}
self.set_policy_rules(rules)
image_id, response = self._test_copy_public_image_as_non_admin()
# Expect failure to copy another user's image
self.assertEqual(http.FORBIDDEN, response.status_code)
def test_copy_public_image_as_non_admin_permitted(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted": "",
"download_image": "",
"add_member": "",
"publicize_image": "",
"copy_image": "'public':%(visibility)s",
}
self.set_policy_rules(rules)
image_id, response = self._test_copy_public_image_as_non_admin()
# Expect success because image is public
self.assertEqual(http.ACCEPTED, response.status_code)
# Verify image is copied
# NOTE(abhishekk): As import is a async call we need to provide
# some timelap to complete the call.
path = self._url('/v2/images/%s' % image_id)
func_utils.wait_for_copying(request_path=path,
request_headers=self._headers(),
stores=['file2'],
max_sec=40,
delay_sec=0.2,
start_delay_sec=1)
# Ensure image is copied to the file2 and file3 store
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(http.OK, response.status_code)
self.assertIn('file2', jsonutils.loads(response.text)['stores'])

View File

@ -2961,12 +2961,10 @@ class TestImagesController(base.IsolatedUnitTest):
mock_new_task.assert_not_called()
@mock.patch('glance.context.RequestContext.elevated')
@mock.patch.object(glance.api.authorization, 'is_image_mutable')
@mock.patch.object(glance.domain.TaskFactory, 'new_task')
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
def test_image_import_copy_allowed_by_policy(self, mock_get,
mock_new_task,
mock_policy_check,
mock_elevated,
allowed=True):
# NOTE(danms): FakeImage is owned by utils.TENANT1. Try to do the
@ -2974,9 +2972,7 @@ class TestImagesController(base.IsolatedUnitTest):
request = unit_test_utils.get_fake_request(tenant=TENANT2)
mock_get.return_value = FakeImage(status='active', locations=[])
# FIXME(danms): This should control whatever policy knob we make,
# but for now control the strict check we have.
mock_policy_check.return_value = allowed
self.policy.rules = {'copy_image': allowed}
req_body = {'method': {'name': 'copy-image'},
'stores': ['cheap']}