Move Tasks policy checks in the API

This change uses new policy module to enforce tasks_api_access
policy check and excludes policy enforcement from the policy layer.

NOTE: Since we are excluding policy enforcement at policy layer, removed
below unit tests as those were checking controller methods and we no longer
enforcing policies at policy layer.
test_create_task_unauthorized
test_index_unauthorized
test_get_unauthorized

Partially implements: blueprint policy-refactor

Change-Id: I670c076527b7bce55696b294d2283121dab5c707
This commit is contained in:
Abhishek Kekane 2021-07-26 07:34:02 +00:00
parent 434f881d8c
commit 32ea21f3fa
7 changed files with 170 additions and 37 deletions

View File

@ -325,3 +325,15 @@ class MemberAPIPolicy(APIPolicyBase):
def add_member(self):
self._enforce("add_member")
class TasksAPIPolicy(APIPolicyBase):
def __init__(self, context, target=None, enforcer=None):
self._context = context
self._target = target or {}
self.enforcer = enforcer or policy.Enforcer()
super(TasksAPIPolicy, self).__init__(context, target=self._target,
enforcer=self.enforcer)
def tasks_api_access(self):
self._enforce('tasks_api_access')

View File

@ -30,6 +30,7 @@ import webob.exc
from glance.api import common
from glance.api import policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import timeutils
from glance.common import wsgi
@ -70,9 +71,12 @@ class TasksController(object):
# NOTE(rosmaita): access to this call is enforced in the deserializer
ctxt = req.context
task_factory = self.gateway.get_task_factory(ctxt)
executor_factory = self.gateway.get_task_executor_factory(ctxt)
task_repo = self.gateway.get_task_repo(ctxt)
task_factory = self.gateway.get_task_factory(
ctxt, authorization_layer=False)
executor_factory = self.gateway.get_task_executor_factory(
ctxt, authorization_layer=False)
task_repo = self.gateway.get_task_repo(ctxt,
authorization_layer=False)
try:
new_task = task_factory.new_task(
task_type=task['type'],
@ -106,7 +110,8 @@ class TasksController(object):
limit = CONF.limit_param_default
limit = min(CONF.api_limit_max, limit)
task_repo = self.gateway.get_task_stub_repo(req.context)
task_repo = self.gateway.get_task_stub_repo(
req.context, authorization_layer=False)
try:
tasks = task_repo.list(marker, limit, sort_key,
sort_dir, filters)
@ -126,7 +131,8 @@ class TasksController(object):
def get(self, req, task_id):
_enforce_access_policy(self.policy, req)
try:
task_repo = self.gateway.get_task_repo(req.context)
task_repo = self.gateway.get_task_repo(
req.context, authorization_layer=False)
task = task_repo.get(task_id)
except exception.NotFound as e:
msg = (_LW("Failed to find task %(task_id)s. Reason: %(reason)s")
@ -423,11 +429,9 @@ _TASK_SCHEMA = {
def _enforce_access_policy(policy_engine, request):
try:
policy_engine.enforce(request.context, 'tasks_api_access', {})
except exception.Forbidden:
LOG.debug("User does not have permission to access the Tasks API")
raise webob.exc.HTTPForbidden()
api_policy.TasksAPIPolicy(
request.context,
enforcer=policy_engine).tasks_api_access()
def get_task_schema():

View File

@ -162,9 +162,8 @@ class Gateway(object):
context, authorization_layer=authorization_layer)
image_repo = self.get_repo(context,
authorization_layer=authorization_layer)
# TODO(abhishekk): Pass authorization_layer here once provision
# is made in get_image_factory method.
image_factory = self.get_image_factory(context)
image_factory = self.get_image_factory(
context, authorization_layer=authorization_layer)
if admin_context:
admin_repo = self.get_repo(admin_context,
authorization_layer=authorization_layer)

View File

@ -0,0 +1,123 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslo_policy.policy
from oslo_serialization import jsonutils
from glance.api import policy
from glance.tests import functional
TASK1 = {
"type": "import",
"input": {
"import_from": "http://example.com",
"import_from_format": "qcow2",
"image_properties": {
'disk_format': 'vhd',
'container_format': 'ovf'
}
}
}
TASK2 = {
"type": "api_image_import",
"input": {
"import_from": "http://example.com",
"import_from_format": "qcow2",
"image_properties": {
'disk_format': 'vhd',
'container_format': 'ovf'
}
}
}
class TestTasksPolicy(functional.SynchronousAPIBase):
def setUp(self):
super(TestTasksPolicy, self).setUp()
self.policy = policy.Enforcer()
def set_policy_rules(self, rules):
self.policy.set_rules(
oslo_policy.policy.Rules.from_dict(rules),
overwrite=True)
def start_server(self):
with mock.patch.object(policy, 'Enforcer') as mock_enf:
mock_enf.return_value = self.policy
super(TestTasksPolicy, self).start_server()
def _create_task(self, path=None, data=None, expected_code=201):
if not path:
path = "/v2/tasks"
resp = self.api_post(path,
json=data)
task = jsonutils.loads(resp.text)
self.assertEqual(expected_code, resp.status_code)
return task
def load_data(self):
tasks = []
for task in [TASK1, TASK2]:
resp = self._create_task(data=task)
tasks.append(resp['id'])
self.assertEqual(task['type'], resp['type'])
return tasks
def test_tasks_create_basic(self):
self.start_server()
# First make sure create tasks works with default policy
path = '/v2/tasks'
task = self._create_task(path=path, data=TASK1)
self.assertEqual('import', task['type'])
# Now disable tasks_api_access permissions and make sure any other
# attempts fail
self.set_policy_rules({'tasks_api_access': '!'})
resp = self.api_post(path, json=TASK2)
self.assertEqual(403, resp.status_code)
def test_tasks_index_basic(self):
self.start_server()
# First make sure get tasks works with default policy
tasks = self.load_data()
path = '/v2/tasks'
output = self.api_get(path).json
self.assertEqual(len(tasks), len(output['tasks']))
# Now disable tasks_api_access permissions and make sure any other
# attempts fail
self.set_policy_rules({'tasks_api_access': '!'})
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)
def test_tasks_get_basic(self):
self.start_server()
# First make sure get task works with default policy
tasks = self.load_data()
path = '/v2/tasks/%s' % tasks[0]
task = self.api_get(path).json
self.assertEqual('import', task['type'])
# Now disable tasks_api_access permissions and make sure any other
# attempts fail
self.set_policy_rules({'tasks_api_access': '!'})
path = '/v2/tasks/%s' % tasks[1]
resp = self.api_get(path)
self.assertEqual(403, resp.status_code)

View File

@ -42,7 +42,8 @@ class TestGateway(test_utils.BaseTestCase):
self.context, authorization_layer=True)
mock_gr.assert_called_once_with(
self.context, authorization_layer=True)
mock_gif.assert_called_once_with(self.context)
mock_gif.assert_called_once_with(
self.context, authorization_layer=True)
mock_factory.assert_called_once_with(
mock_gtr.return_value,
mock_gr.return_value,
@ -69,7 +70,8 @@ class TestGateway(test_utils.BaseTestCase):
mock.call(mock.sentinel.admin_context,
authorization_layer=True),
])
mock_gif.assert_called_once_with(self.context)
mock_gif.assert_called_once_with(
self.context, authorization_layer=True)
mock_factory.assert_called_once_with(
mock_gtr.return_value,
mock.sentinel.image_repo,

View File

@ -476,20 +476,6 @@ class TestTasksControllerPolicies(base.IsolatedUnitTest):
self.controller = glance.api.v2.tasks.TasksController(self.db,
self.policy)
def test_index_unauthorized(self):
rules = {"get_tasks": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.index,
request)
def test_get_unauthorized(self):
rules = {"get_task": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.get,
request, task_id=UUID2)
def test_access_get_unauthorized(self):
rules = {"tasks_api_access": False,
"get_task": True}
@ -498,14 +484,6 @@ class TestTasksControllerPolicies(base.IsolatedUnitTest):
self.assertRaises(webob.exc.HTTPForbidden, self.controller.get,
request, task_id=UUID2)
def test_create_task_unauthorized(self):
rules = {"add_task": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
task = {'type': 'import', 'input': {"import_from": "fake"}}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
request, task)
def test_delete(self):
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPMethodNotAllowed,

View File

@ -650,3 +650,18 @@ class TestMemberAPIPolicy(utils.BaseTestCase):
mock_enf.assert_has_calls([
mock.call(mock.ANY, 'get_image', mock.ANY),
mock.call(mock.ANY, 'modify_member', mock.ANY)])
class TestTasksAPIPolicy(APIPolicyBase):
def setUp(self):
super(TestTasksAPIPolicy, self).setUp()
self.enforcer = mock.MagicMock()
self.context = mock.MagicMock()
self.policy = policy.TasksAPIPolicy(self.context,
enforcer=self.enforcer)
def test_tasks_api_access(self):
self.policy.tasks_api_access()
self.enforcer.enforce.assert_called_once_with(self.context,
'tasks_api_access',
mock.ANY)