Check deactivate, reactivate policy in the API

Move deactivate/reactivate policy changes in API layer.

Partially-Implements: blueprint policy-refactor

Change-Id: Ia6e51228aee1ddf09734f922c5f6f3eb4ece60c1
This commit is contained in:
Abhishek Kekane
2021-06-28 09:02:28 +00:00
parent fa6f871e20
commit 82111150d6
5 changed files with 327 additions and 6 deletions

View File

@@ -18,6 +18,7 @@ from six.moves import http_client as http
import webob.exc
from glance.api import policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import utils
from glance.common import wsgi
@@ -42,9 +43,20 @@ class ImageActionsController(object):
@utils.mutating
def deactivate(self, req, image_id):
image_repo = self.gateway.get_repo(req.context)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
try:
# FIXME(danms): This will still enforce the get_image policy
# which we don't want
image = image_repo.get(image_id)
# NOTE(abhishekk): This is the right place to check whether user
# have permission to deactivate the image and remove the policy
# check later from the policy layer.
api_pol = api_policy.ImageAPIPolicy(req.context, image,
self.policy)
api_pol.deactivate_image()
status = image.status
image.deactivate()
# not necessary to change the status if it's already 'deactivated'
@@ -61,9 +73,20 @@ class ImageActionsController(object):
@utils.mutating
def reactivate(self, req, image_id):
image_repo = self.gateway.get_repo(req.context)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
try:
# FIXME(danms): This will still enforce the get_image policy
# which we don't want
image = image_repo.get(image_id)
# NOTE(abhishekk): This is the right place to check whether user
# have permission to reactivate the image and remove the policy
# check later from the policy layer.
api_pol = api_policy.ImageAPIPolicy(req.context, image,
self.policy)
api_pol.reactivate_image()
status = image.status
image.reactivate()
# not necessary to change the status if it's already 'active'

View File

@@ -166,6 +166,20 @@ class ImageAPIPolicy(APIPolicyBase):
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
def deactivate_image(self):
self._enforce('deactivate')
# TODO(danms): Remove this legacy fallback when secure RBAC
# replaces the legacy policy.
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
def reactivate_image(self):
self._enforce('reactivate')
# TODO(danms): Remove this legacy fallback when secure RBAC
# replaces the legacy policy.
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
class MetadefAPIPolicy(APIPolicyBase):
def __init__(self, context, md_resource=None, target=None, enforcer=None):

View File

@@ -1741,11 +1741,18 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
'/v2/images/%s/import' % image_id,
json=body)
def _create_and_upload(self, data_iter=None, expected_code=204):
def _create_and_upload(self, data_iter=None, expected_code=204,
visibility=None):
data = {
'name': 'foo',
'container_format': 'bare',
'disk_format': 'raw'
}
if visibility:
data['visibility'] = visibility
resp = self.api_post('/v2/images',
json={'name': 'foo',
'container_format': 'bare',
'disk_format': 'raw'})
json=data)
self.assertEqual(201, resp.status_code, resp.text)
image = jsonutils.loads(resp.text)

View File

@@ -422,3 +422,206 @@ class TestImagesPolicy(functional.SynchronousAPIBase):
headers=headers,
data=b'IMAGEDATA')
self.assertEqual(403, resp.status_code)
def test_image_deactivate(self):
self.start_server()
image_id = self._create_and_upload()
# Make sure we can deactivate the image
resp = self.api_post('/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(204, resp.status_code)
# Make sure it is really deactivated
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual('deactivated', resp.json['status'])
# Create another image
image_id = self._create_and_upload()
# Now disable deactivate permissions, but allow get_image
self.set_policy_rules({'get_image': '',
'deactivate': '!'})
# Make sure deactivate returns 403 because we can see the image,
# just not deactivate it
resp = self.api_post('/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(403, resp.status_code)
# Now disable deactivate permissions, including get_image
self.set_policy_rules({'get_image': '!',
'deactivate': '!'})
# Make sure deactivate returns 404 because we can not see nor
# reactivate it
resp = self.api_post('/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(404, resp.status_code)
# Now allow deactivate, but disallow get_image, just to prove that
# you do not need get_image in order to be granted deactivate, and
# that we only use it for error code determination if
# permission is denied.
self.set_policy_rules({'get_image': '!',
'deactivate': ''})
# Make sure deactivate returns 204 because even though we can not
# see the image, we can deactivate it
resp = self.api_post('/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(204, resp.status_code)
# Make sure you can not deactivate image using non-admin role of
# different project
self.set_policy_rules({
'get_image': '',
'modify_image': '',
'add_image': '',
'upload_image': '',
'add_member': '',
'deactivate': '',
'publicize_image': '',
'communitize_image': ''
})
headers = self._headers({
'X-Project-Id': 'fake-project-id',
'X-Roles': 'member'
})
for visibility in ('community', 'shared', 'private', 'public'):
image_id = self._create_and_upload(visibility=visibility)
resp = self.api_post(
'/v2/images/%s/actions/deactivate' % image_id, headers=headers)
# 'shared' image will return 404 until it is not shared with
# project accessing it
if visibility == 'shared':
self.assertEqual(404, resp.status_code)
# Now lets share the image and try to deactivate it
share_path = '/v2/images/%s/members' % image_id
data = {
'member': 'fake-project-id'
}
response = self.api_post(share_path, json=data)
member = response.json
self.assertEqual(200, response.status_code)
self.assertEqual(image_id, member['image_id'])
# Now ensure deactivating image by another tenant will
# return 403
resp = self.api_post(
'/v2/images/%s/actions/deactivate' % image_id,
headers=headers)
self.assertEqual(403, resp.status_code)
elif visibility == 'private':
# private image will also return 404 as it is not visible
self.assertEqual(404, resp.status_code)
else:
# public and community visibility will return 403
self.assertEqual(403, resp.status_code)
def test_image_reactivate(self):
self.start_server()
image_id = self._create_and_upload()
# deactivate the image
resp = self.api_post('/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(204, resp.status_code)
# Make sure it is really deactivated
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual('deactivated', resp.json['status'])
# Make sure you can reactivate the image
resp = self.api_post('/v2/images/%s/actions/reactivate' % image_id)
self.assertEqual(204, resp.status_code)
# Make sure it is really reactivated
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual('active', resp.json['status'])
# Deactivate it again to test further scenarios
resp = self.api_post('/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(204, resp.status_code)
# Now disable reactivate permissions, but allow get_image
self.set_policy_rules({'get_image': '',
'reactivate': '!'})
# Make sure reactivate returns 403 because we can see the image,
# just not reactivate it
resp = self.api_post('/v2/images/%s/actions/reactivate' % image_id)
self.assertEqual(403, resp.status_code)
# Now disable reactivate permissions, including get_image
self.set_policy_rules({'get_image': '!',
'reactivate': '!'})
# Make sure reactivate returns 404 because we can not see nor
# reactivate it
resp = self.api_post('/v2/images/%s/actions/reactivate' % image_id)
self.assertEqual(404, resp.status_code)
# Now allow reactivate, but disallow get_image, just to prove that
# you do not need get_image in order to be granted reactivate, and
# that we only use it for error code determination if
# permission is denied.
self.set_policy_rules({'get_image': '!',
'reactivate': ''})
# Make sure reactivate returns 204 because even though we can not
# see the image, we can reactivate it
resp = self.api_post('/v2/images/%s/actions/reactivate' % image_id)
self.assertEqual(204, resp.status_code)
# Make sure you can not reactivate image using non-admin role of
# different project
self.set_policy_rules({
'get_image': '',
'modify_image': '',
'add_image': '',
'upload_image': '',
'add_member': '',
'deactivate': '',
'reactivate': '',
'publicize_image': '',
'communitize_image': ''
})
headers = self._headers({
'X-Project-Id': 'fake-project-id',
'X-Roles': 'member'
})
for visibility in ('public', 'community', 'shared', 'private'):
image_id = self._create_and_upload(visibility=visibility)
# deactivate the image
resp = self.api_post(
'/v2/images/%s/actions/deactivate' % image_id)
self.assertEqual(204, resp.status_code)
# try to reactivate the image
resp = self.api_post(
'/v2/images/%s/actions/reactivate' % image_id, headers=headers)
# 'shared' image will return 404 until it is not shared with
# project accessing it
if visibility == 'shared':
self.assertEqual(404, resp.status_code)
# Now lets share the image and try to reactivate it
share_path = '/v2/images/%s/members' % image_id
data = {
'member': 'fake-project-id'
}
response = self.api_post(share_path, json=data)
member = response.json
self.assertEqual(200, response.status_code)
self.assertEqual(image_id, member['image_id'])
# Now ensure reactivating image by another tenant will
# return 403
resp = self.api_post(
'/v2/images/%s/actions/reactivate' % image_id,
headers=headers)
self.assertEqual(403, resp.status_code)
elif visibility == 'private':
# private image will also return 404 as it is not visible
self.assertEqual(404, resp.status_code)
else:
# public and community visibility will return 403
self.assertEqual(403, resp.status_code)

View File

@@ -303,6 +303,80 @@ class APIImagePolicy(APIPolicyBase):
self.policy.modify_image()
self.assertFalse(m.called)
def test_deactivate_image(self):
self.policy.deactivate_image()
self.enforcer.enforce.assert_called_once_with(self.context,
'deactivate',
mock.ANY)
def test_deactivate_image_falls_back_to_legacy(self):
self.config(enforce_secure_rbac=False)
# As admin, image is mutable even if owner does not match
self.context.is_admin = True
self.context.owner = 'someuser'
self.image.owner = 'someotheruser'
self.policy.deactivate_image()
# As non-admin, owner matches, so we're good
self.context.is_admin = False
self.context.owner = 'someuser'
self.image.owner = 'someuser'
self.policy.delete_image()
# If owner does not match, we fail
self.image.owner = 'someotheruser'
self.assertRaises(exception.Forbidden,
self.policy.deactivate_image)
# Make sure we are checking the legacy handler
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.deactivate_image()
m.assert_called_once_with(self.context, self.image)
# Make sure we are not checking it if enforce_secure_rbac=True
self.config(enforce_secure_rbac=True)
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.deactivate_image()
self.assertFalse(m.called)
def test_reactivate_image(self):
self.policy.reactivate_image()
self.enforcer.enforce.assert_called_once_with(self.context,
'reactivate',
mock.ANY)
def test_reactivate_image_falls_back_to_legacy(self):
self.config(enforce_secure_rbac=False)
# As admin, image is mutable even if owner does not match
self.context.is_admin = True
self.context.owner = 'someuser'
self.image.owner = 'someotheruser'
self.policy.reactivate_image()
# As non-admin, owner matches, so we're good
self.context.is_admin = False
self.context.owner = 'someuser'
self.image.owner = 'someuser'
self.policy.delete_image()
# If owner does not match, we fail
self.image.owner = 'someotheruser'
self.assertRaises(exception.Forbidden,
self.policy.reactivate_image)
# Make sure we are checking the legacy handler
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.reactivate_image()
m.assert_called_once_with(self.context, self.image)
# Make sure we are not checking it if enforce_secure_rbac=True
self.config(enforce_secure_rbac=True)
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.reactivate_image()
self.assertFalse(m.called)
class TestMetadefAPIPolicy(APIPolicyBase):
def setUp(self):