Improve naming of get_allowed_params() argument

This used to be just a list of allowed names, but now it's a dict
mapping names to types. Use variable names that reflect the current
meaning, fix the docs, and use named constants where available.

Change-Id: I3aadca4e1e9db5da5d07d521c2313162062639b7
This commit is contained in:
Zane Bitter 2020-07-14 10:44:22 -04:00
parent 24bae944dc
commit 73d05c0cdc
10 changed files with 62 additions and 62 deletions

View File

@ -109,21 +109,21 @@ class EventController(object):
@util.registered_identified_stack
def index(self, req, identity, resource_name=None):
"""Lists summary information for all events."""
whitelist = {
param_types = {
'limit': util.PARAM_TYPE_SINGLE,
'marker': util.PARAM_TYPE_SINGLE,
'sort_dir': util.PARAM_TYPE_SINGLE,
'sort_keys': util.PARAM_TYPE_MULTI,
'nested_depth': util.PARAM_TYPE_SINGLE,
}
filter_whitelist = {
filter_param_types = {
'resource_status': util.PARAM_TYPE_MIXED,
'resource_action': util.PARAM_TYPE_MIXED,
'resource_name': util.PARAM_TYPE_MIXED,
'resource_type': util.PARAM_TYPE_MIXED,
}
params = util.get_allowed_params(req.params, whitelist)
filter_params = util.get_allowed_params(req.params, filter_whitelist)
params = util.get_allowed_params(req.params, param_types)
filter_params = util.get_allowed_params(req.params, filter_param_types)
int_params = (rpc_api.PARAM_LIMIT, rpc_api.PARAM_NESTED_DEPTH)
try:

View File

@ -96,18 +96,18 @@ class ResourceController(object):
def index(self, req, identity):
"""Lists information for all resources."""
whitelist = {
'type': 'mixed',
'status': 'mixed',
'name': 'mixed',
'action': 'mixed',
'id': 'mixed',
'physical_resource_id': 'mixed'
param_types = {
'type': util.PARAM_TYPE_MIXED,
'status': util.PARAM_TYPE_MIXED,
'name': util.PARAM_TYPE_MIXED,
'action': util.PARAM_TYPE_MIXED,
'id': util.PARAM_TYPE_MIXED,
'physical_resource_id': util.PARAM_TYPE_MIXED,
}
invalid_keys = (set(req.params.keys()) -
set(list(whitelist) + [rpc_api.PARAM_NESTED_DEPTH,
rpc_api.PARAM_WITH_DETAIL]))
set(list(param_types) + [rpc_api.PARAM_NESTED_DEPTH,
rpc_api.PARAM_WITH_DETAIL]))
if invalid_keys:
raise exc.HTTPBadRequest(_('Invalid filter parameters %s') %
str(list(invalid_keys)))
@ -121,7 +121,7 @@ class ResourceController(object):
param_utils.extract_bool,
default=False)
params = util.get_allowed_params(req.params, whitelist)
params = util.get_allowed_params(req.params, param_types)
res_list = self.rpc_client.list_stack_resources(req.context,
identity,
@ -135,8 +135,8 @@ class ResourceController(object):
def show(self, req, identity, resource_name):
"""Gets detailed information for a resource."""
whitelist = {'with_attr': util.PARAM_TYPE_MULTI}
params = util.get_allowed_params(req.params, whitelist)
param_types = {'with_attr': util.PARAM_TYPE_MULTI}
params = util.get_allowed_params(req.params, param_types)
if 'with_attr' not in params:
params['with_attr'] = None
res = self.rpc_client.describe_stack_resource(req.context,

View File

@ -44,11 +44,11 @@ class SoftwareConfigController(object):
raise exc.HTTPBadRequest(str(e))
def _index(self, req, use_admin_cnxt=False):
whitelist = {
param_types = {
'limit': util.PARAM_TYPE_SINGLE,
'marker': util.PARAM_TYPE_SINGLE
}
params = util.get_allowed_params(req.params, whitelist)
params = util.get_allowed_params(req.params, param_types)
if use_admin_cnxt:
cnxt = context.get_admin_context()

View File

@ -37,10 +37,10 @@ class SoftwareDeploymentController(object):
@util.registered_policy_enforce
def index(self, req):
"""List software deployments."""
whitelist = {
param_types = {
'server_id': util.PARAM_TYPE_SINGLE,
}
params = util.get_allowed_params(req.params, whitelist)
params = util.get_allowed_params(req.params, param_types)
sds = self.rpc_client.list_software_deployments(req.context, **params)
return {'software_deployments': sds}

View File

@ -204,7 +204,7 @@ class StackController(object):
raise exc.HTTPBadRequest(str(e))
def _index(self, req, use_admin_cnxt=False):
filter_whitelist = {
filter_param_types = {
# usage of keys in this list are not encouraged, please use
# rpc_api.STACK_KEYS instead
'id': util.PARAM_TYPE_MIXED,
@ -215,7 +215,7 @@ class StackController(object):
'username': util.PARAM_TYPE_MIXED,
'owner_id': util.PARAM_TYPE_MIXED,
}
whitelist = {
param_types = {
'limit': util.PARAM_TYPE_SINGLE,
'marker': util.PARAM_TYPE_SINGLE,
'sort_dir': util.PARAM_TYPE_SINGLE,
@ -228,7 +228,7 @@ class StackController(object):
'not_tags': util.PARAM_TYPE_SINGLE,
'not_tags_any': util.PARAM_TYPE_SINGLE,
}
params = util.get_allowed_params(req.params, whitelist)
params = util.get_allowed_params(req.params, param_types)
stack_keys = dict.fromkeys(rpc_api.STACK_KEYS, util.PARAM_TYPE_MIXED)
unsupported = (
rpc_api.STACK_ID, # not user visible
@ -246,7 +246,7 @@ class StackController(object):
for key in unsupported:
stack_keys.pop(key)
# downward compatibility
stack_keys.update(filter_whitelist)
stack_keys.update(filter_param_types)
filter_params = util.get_allowed_params(req.params, stack_keys)
show_deleted = False
@ -519,8 +519,8 @@ class StackController(object):
raise exc.HTTPAccepted()
def _param_show_nested(self, req):
whitelist = {'show_nested': 'single'}
params = util.get_allowed_params(req.params, whitelist)
param_types = {'show_nested': util.PARAM_TYPE_SINGLE}
params = util.get_allowed_params(req.params, param_types)
p_name = 'show_nested'
if p_name in params:
@ -604,9 +604,9 @@ class StackController(object):
data = InstantiationData(body)
whitelist = {'show_nested': util.PARAM_TYPE_SINGLE,
'ignore_errors': util.PARAM_TYPE_SINGLE}
params = util.get_allowed_params(req.params, whitelist)
param_types = {'show_nested': util.PARAM_TYPE_SINGLE,
'ignore_errors': util.PARAM_TYPE_SINGLE}
params = util.get_allowed_params(req.params, param_types)
show_nested = False
p_name = rpc_api.PARAM_SHOW_NESTED

View File

@ -86,22 +86,22 @@ PARAM_TYPES = (
)
def get_allowed_params(params, whitelist):
"""Extract from ``params`` all entries listed in ``whitelist``.
def get_allowed_params(params, param_types):
"""Extract from ``params`` all entries listed in ``param_types``.
The returning dict will contain an entry for a key if, and only if,
there's an entry in ``whitelist`` for that key and at least one entry in
there's an entry in ``param_types`` for that key and at least one entry in
``params``. If ``params`` contains multiple entries for the same key, it
will yield an array of values: ``{key: [v1, v2,...]}``
:param params: a NestedMultiDict from webob.Request.params
:param whitelist: an array of strings to whitelist
:param param_types: an dict of allowed parameters and their types
:returns: a dict with {key: value} pairs
"""
allowed_params = {}
for key, get_type in whitelist.items():
for key, get_type in param_types.items():
assert get_type in PARAM_TYPES
value = None

View File

@ -374,7 +374,7 @@ class EventControllerTest(tools.ControllerTest, common.HeatTestCase):
)
@mock.patch.object(rpc_client.EngineClient, 'call')
def test_index_whitelists_pagination_params(self, mock_call, mock_enforce):
def test_index_bogus_pagination_param(self, mock_call, mock_enforce):
self._mock_enforce_setup(mock_enforce, 'index', True)
params = {
'limit': 10,
@ -428,7 +428,7 @@ class EventControllerTest(tools.ControllerTest, common.HeatTestCase):
self.assertFalse(mock_call.called)
@mock.patch.object(rpc_client.EngineClient, 'call')
def test_index_whitelist_filter_params(self, mock_call, mock_enforce):
def test_index_bogus_filter_param(self, mock_call, mock_enforce):
self._mock_enforce_setup(mock_enforce, 'index', True)
params = {
'resource_status': 'COMPLETE',

View File

@ -48,8 +48,8 @@ class SoftwareDeploymentControllerTest(tools.ControllerTest,
resp = self.controller.index(req, tenant_id=self.tenant)
self.assertEqual(
{'software_deployments': []}, resp)
whitelist = mock_call.call_args[1]
self.assertEqual({}, whitelist)
params = mock_call.call_args[1]
self.assertEqual({}, params)
server_id = 'fb322564-7927-473d-8aad-68ae7fbf2abf'
req = self._get('/software_deployments', {'server_id': server_id})
with mock.patch.object(
@ -59,8 +59,8 @@ class SoftwareDeploymentControllerTest(tools.ControllerTest,
resp = self.controller.index(req, tenant_id=self.tenant)
self.assertEqual(
{'software_deployments': []}, resp)
whitelist = mock_call.call_args[1]
self.assertEqual({'server_id': server_id}, whitelist)
params = mock_call.call_args[1]
self.assertEqual({'server_id': server_id}, params)
@mock.patch.object(policy.Enforcer, 'enforce')
def test_show(self, mock_enforce):

View File

@ -313,7 +313,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
req.context, ('list_stacks', default_args), version='1.33')
@mock.patch.object(rpc_client.EngineClient, 'call')
def test_index_whitelists_pagination_params(self, mock_call, mock_enforce):
def test_index_bogus_pagination_param(self, mock_call, mock_enforce):
self._mock_enforce_setup(mock_enforce, 'index', True)
params = {
'limit': 10,
@ -351,7 +351,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
self.assertFalse(mock_call.called)
@mock.patch.object(rpc_client.EngineClient, 'call')
def test_index_whitelist_filter_params(self, mock_call, mock_enforce):
def test_index_bogus_filter_param(self, mock_call, mock_enforce):
self._mock_enforce_setup(mock_enforce, 'index', True)
params = {
'id': 'fake id',
@ -381,7 +381,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
'parent': 'fake parent',
'stack_user_project_id': 'fake project id',
'tags': 'fake tags',
'barlog': 'you shall not pass!'
'balrog': 'you shall not pass!'
}
req = self._get('/stacks', params=params)
mock_call.return_value = []
@ -403,7 +403,7 @@ class StackControllerTest(tools.ControllerTest, common.HeatTestCase):
for key in ('stack_identity', 'creation_time', 'updated_time',
'deletion_time', 'notification_topics', 'description',
'template_description', 'parameters', 'outputs',
'capabilities', 'tags', 'barlog'):
'capabilities', 'tags', 'balrog'):
self.assertNotIn(key, filters)
def test_index_returns_stack_count_if_with_count_is_true(

View File

@ -28,60 +28,60 @@ class TestGetAllowedParams(common.HeatTestCase):
req = wsgi.Request({})
self.params = req.params.copy()
self.params.add('foo', 'foo value')
self.whitelist = {'foo': util.PARAM_TYPE_SINGLE}
self.param_types = {'foo': util.PARAM_TYPE_SINGLE}
def test_returns_empty_dict(self):
self.whitelist = {}
self.param_types = {}
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertEqual({}, result)
def test_only_adds_whitelisted_params_if_param_exists(self):
self.whitelist = {'foo': util.PARAM_TYPE_SINGLE}
def test_only_adds_allowed_param_if_param_exists(self):
self.param_types = {'foo': util.PARAM_TYPE_SINGLE}
self.params.clear()
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertNotIn('foo', result)
def test_returns_only_whitelisted_params(self):
def test_returns_only_allowed_params(self):
self.params.add('bar', 'bar value')
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertIn('foo', result)
self.assertNotIn('bar', result)
def test_handles_single_value_params(self):
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertEqual('foo value', result['foo'])
def test_handles_multiple_value_params(self):
self.whitelist = {'foo': util.PARAM_TYPE_MULTI}
self.param_types = {'foo': util.PARAM_TYPE_MULTI}
self.params.add('foo', 'foo value 2')
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertEqual(2, len(result['foo']))
self.assertIn('foo value', result['foo'])
self.assertIn('foo value 2', result['foo'])
def test_handles_mixed_value_param_with_multiple_entries(self):
self.whitelist = {'foo': util.PARAM_TYPE_MIXED}
self.param_types = {'foo': util.PARAM_TYPE_MIXED}
self.params.add('foo', 'foo value 2')
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertEqual(2, len(result['foo']))
self.assertIn('foo value', result['foo'])
self.assertIn('foo value 2', result['foo'])
def test_handles_mixed_value_param_with_single_entry(self):
self.whitelist = {'foo': util.PARAM_TYPE_MIXED}
self.param_types = {'foo': util.PARAM_TYPE_MIXED}
result = util.get_allowed_params(self.params, self.whitelist)
result = util.get_allowed_params(self.params, self.param_types)
self.assertEqual('foo value', result['foo'])
def test_bogus_whitelist_items(self):
self.whitelist = {'foo': 'blah'}
def test_bogus_param_type(self):
self.param_types = {'foo': 'blah'}
self.assertRaises(AssertionError, util.get_allowed_params,
self.params, self.whitelist)
self.params, self.param_types)
class TestPolicyEnforce(common.HeatTestCase):