Move 'upload_image' policy check to the controller

There are two methods to create images:-

Method A)
POST /v2/images
PUT /v2/images/{image_id}/file

Method B)
POST /v2/images
PUT /v2/images/{image_id}/stage
POST /v2/images/{image_id}/import

The traditional image upload API (PUT /v2/images/{image_id}/file)
uses 'upload_image' policy which is same for
Method B (POST /v2/images/{image_id}/import)
image-create-via-import(new API for image create) API.

The long term goal is to make users use method B to create images
and cross services to use Method A until changes are made to
use Method B.
To restrict normal users from using Method A to create images both
these APIs (/v2/images/{image_id}/file and /v2/images/{image_id}/import)
should have a distinct policy.

This patch move the 'upload_image' policy check from imge.set_data()
to the controller and not introduce any new policies at this point
for import API call (POST /v2/images/{image_id}/import)
on the theory that an operator can stop import by restricting the
'image_create' policy. And also this fix will not change the semantics
of the 'upload_image' policy from the operator perspective.

Closes-Bug: #1732141
Change-Id: Icc62add5f8d48549aac94c8058d66d6b77b56d41
This commit is contained in:
bhagyashris 2017-11-21 12:55:50 +05:30
parent 1aadd06320
commit 89feef0e2f
5 changed files with 35 additions and 24 deletions

View File

@ -190,7 +190,6 @@ class ImageProxy(glance.domain.proxy.Image):
return self.image.get_data(*args, **kwargs) return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs): def set_data(self, *args, **kwargs):
self.policy.enforce(self.context, 'upload_image', self.target)
return self.image.set_data(*args, **kwargs) return self.image.set_data(*args, **kwargs)

View File

@ -40,16 +40,13 @@ CONF = cfg.CONF
class ImageDataController(object): class ImageDataController(object):
def __init__(self, db_api=None, store_api=None, def __init__(self, db_api=None, store_api=None,
policy_enforcer=None, notifier=None, policy_enforcer=None, notifier=None):
gateway=None): db_api = db_api or glance.db.get_api()
if gateway is None: store_api = store_api or glance_store
db_api = db_api or glance.db.get_api() notifier = notifier or glance.notifier.Notifier()
store_api = store_api or glance_store self.policy = policy_enforcer or glance.api.policy.Enforcer()
policy = policy_enforcer or glance.api.policy.Enforcer() self.gateway = glance.gateway.Gateway(db_api, store_api,
notifier = notifier or glance.notifier.Notifier() notifier, self.policy)
gateway = glance.gateway.Gateway(db_api, store_api,
notifier, policy)
self.gateway = gateway
def _restore(self, image_repo, image): def _restore(self, image_repo, image):
""" """
@ -108,6 +105,7 @@ class ImageDataController(object):
refresher = None refresher = None
cxt = req.context cxt = req.context
try: try:
self.policy.enforce(cxt, 'upload_image', {})
image = image_repo.get(image_id) image = image_repo.get(image_id)
image.status = 'saving' image.status = 'saving'
try: try:

View File

@ -407,13 +407,6 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.policy.enforce.assert_called_once_with({}, "download_image", self.policy.enforce.assert_called_once_with({}, "download_image",
target) target)
def test_image_set_data(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image.set_data)
self.policy.enforce.assert_called_once_with({}, "upload_image",
image.target)
class TestMemberPolicy(test_utils.BaseTestCase): class TestMemberPolicy(test_utils.BaseTestCase):
def setUp(self): def setUp(self):

View File

@ -109,8 +109,10 @@ class FakeDB(object):
images = [ images = [
{'id': UUID1, 'owner': TENANT1, 'status': 'queued', {'id': UUID1, 'owner': TENANT1, 'status': 'queued',
'locations': [{'url': '%s/%s' % (BASE_URI, UUID1), 'locations': [{'url': '%s/%s' % (BASE_URI, UUID1),
'metadata': {}, 'status': 'queued'}]}, 'metadata': {}, 'status': 'queued'}],
{'id': UUID2, 'owner': TENANT1, 'status': 'queued'}, 'disk_format': 'raw', 'container_format': 'bare'},
{'id': UUID2, 'owner': TENANT1, 'status': 'queued',
'disk_format': 'raw', 'container_format': 'bare'},
] ]
[simple_db.image_create(None, image) for image in images] [simple_db.image_create(None, image) for image in images]

View File

@ -90,7 +90,12 @@ class FakeImageRepo(object):
class FakeGateway(object): class FakeGateway(object):
def __init__(self, repo): def __init__(self, db=None, store=None, notifier=None,
policy=None, repo=None):
self.db = db
self.store = store
self.notifier = notifier
self.policy = policy
self.repo = repo self.repo = repo
def get_repo(self, context): def get_repo(self, context):
@ -103,9 +108,13 @@ class TestImagesController(base.StoreClearingUnitTest):
self.config(debug=True) self.config(debug=True)
self.image_repo = FakeImageRepo() self.image_repo = FakeImageRepo()
self.gateway = FakeGateway(self.image_repo) db = unit_test_utils.FakeDB()
self.controller = glance.api.v2.image_data.ImageDataController( policy = unit_test_utils.FakePolicyEnforcer()
gateway=self.gateway) notifier = unit_test_utils.FakeNotifier()
store = unit_test_utils.FakeStoreAPI()
self.controller = glance.api.v2.image_data.ImageDataController()
self.controller.gateway = FakeGateway(db, store, notifier, policy,
self.image_repo)
def test_download(self): def test_download(self):
request = unit_test_utils.get_fake_request() request = unit_test_utils.get_fake_request()
@ -191,6 +200,16 @@ class TestImagesController(base.StoreClearingUnitTest):
self.assertEqual('YYYY', image.data) self.assertEqual('YYYY', image.data)
self.assertIsNone(image.size) self.assertIsNone(image.size)
@mock.patch.object(glance.api.policy.Enforcer, 'enforce')
def test_upload_image_forbidden(self, mock_enforce):
request = unit_test_utils.get_fake_request()
mock_enforce.side_effect = exception.Forbidden
self.assertRaises(webob.exc.HTTPForbidden, self.controller.upload,
request, unit_test_utils.UUID2, 'YYYY', 4)
mock_enforce.assert_called_once_with(request.context,
"upload_image",
{})
def test_upload_invalid(self): def test_upload_invalid(self):
request = unit_test_utils.get_fake_request() request = unit_test_utils.get_fake_request()
image = FakeImage('abcd') image = FakeImage('abcd')