Pecan: process filters at end of hook pipeline

Separate user-applied filters from policy-enforcement
filters processing and put it at the end of the pipeline
so users can't put filters on the API request that impact
the fields available to the hooks.

This prevents a filter excluding the ID field from breaking
pagination.

Change-Id: I05f4582fb1e8809740d473e24fa54483e040a6c8
Closes-Bug: #1714131
This commit is contained in:
Kevin Benton 2017-08-30 19:46:39 -07:00
parent 6b650944e2
commit 41e6f02bf2
5 changed files with 68 additions and 6 deletions

View File

@ -25,9 +25,13 @@ def versions_factory(global_config, **local_config):
def v2_factory(global_config, **local_config):
# Processing Order:
# As request enters lower priority called before higher.
# Reponse from controller is passed from higher priority to lower.
app_hooks = [
hooks.ExceptionTranslationHook(), # priority 100
hooks.UserFilterHook(), # priority 90
hooks.ContextHook(), # priority 95
hooks.ExceptionTranslationHook(), # priority 100
hooks.BodyValidationHook(), # priority 120
hooks.OwnershipValidationHook(), # priority 125
hooks.QuotaEnforcementHook(), # priority 130

View File

@ -21,6 +21,7 @@ from neutron.pecan_wsgi.hooks import policy_enforcement
from neutron.pecan_wsgi.hooks import query_parameters
from neutron.pecan_wsgi.hooks import quota_enforcement
from neutron.pecan_wsgi.hooks import translation
from neutron.pecan_wsgi.hooks import userfilters
ExceptionTranslationHook = translation.ExceptionTranslationHook
@ -31,3 +32,4 @@ PolicyHook = policy_enforcement.PolicyHook
QuotaEnforcementHook = quota_enforcement.QuotaEnforcementHook
NotifierHook = notifier.NotifierHook
QueryParametersHook = query_parameters.QueryParametersHook
UserFilterHook = userfilters.UserFilterHook

View File

@ -211,10 +211,8 @@ class PolicyHook(hooks.PecanHook):
# This routine will remove the fields that were requested to the
# plugin for policy evaluation but were not specified in the
# API request
user_fields = request.params.getall('fields')
return dict(item for item in data.items()
if (item[0] not in fields_to_strip and
(not user_fields or item[0] in user_fields)))
if item[0] not in fields_to_strip)
def _exclude_attributes_by_policy(self, context, controller, resource,
collection, data):

View File

@ -0,0 +1,53 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pecan import hooks
class UserFilterHook(hooks.PecanHook):
# we do this at the very end to ensure user-defined filters
# don't impact things like pagination and notification hooks
priority = 90
def after(self, state):
user_fields = state.request.params.getall('fields')
if not user_fields:
return
try:
data = state.response.json
except ValueError:
return
resource = state.request.context.get('resource')
collection = state.request.context.get('collection')
if collection not in data and resource not in data:
return
is_single = resource in data
key = resource if resource in data else collection
if is_single:
data[key] = self._filter_item(
state.response.json[key], user_fields)
else:
data[key] = [
self._filter_item(i, user_fields)
for i in state.response.json[key]
]
state.response.json = data
def _filter_item(self, item, fields):
return {
field: value
for field, value in item.items()
if field in fields
}

View File

@ -524,8 +524,9 @@ class TestPaginationAndSorting(test_functional.PecanFunctionalTest):
if limit and marker:
links_key = '%s_links' % collection
self.assertIn(links_key, list_resp)
list_resp_ids = [item['id'] for item in list_resp[collection]]
self.assertEqual(expected_list, list_resp_ids)
if not fields or 'id' in fields:
list_resp_ids = [item['id'] for item in list_resp[collection]]
self.assertEqual(expected_list, list_resp_ids)
if fields:
for item in list_resp[collection]:
for field in fields:
@ -535,6 +536,10 @@ class TestPaginationAndSorting(test_functional.PecanFunctionalTest):
self._test_get_collection_with_pagination([self.networks[0]['id']],
limit=1)
def test_get_collection_with_pagination_fields_no_pk(self):
self._test_get_collection_with_pagination([self.networks[0]['id']],
limit=1, fields=['name'])
def test_get_collection_with_pagination_limit_over_count(self):
expected_ids = [network['id'] for network in self.networks]
self._test_get_collection_with_pagination(