Fix creating policy rules from subattributes.

In case of policy rule checks for rules like e.g.
"create_port:fixed_ips:subnet" couldn't be created to be
passed to policy enforcer because policy module could only
create rule checks for subattributes which are dict types.

With this patch checks for such rules can be created also for
attributes which are list of dicts, like e.g. fixed_ips in port
resource.

Conflicts:
    neutron/conf/policies/port.py

Change-Id: I02fffe77f57a513d2362df78885d327042bb8095
Closes-Bug: #1822105
(cherry picked from commit 9318fb8bb9)
(cherry picked from commit a238b1bed6)
This commit is contained in:
Slawek Kaplonski 2019-03-28 21:36:11 +01:00
parent de95bc658e
commit 8e777c681f
4 changed files with 62 additions and 7 deletions

View File

@ -74,7 +74,7 @@
"create_port": "",
"create_port:device_owner": "not rule:network_device or rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:mac_address": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"create_port:fixed_ips:ip_address": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:fixed_ips:subnet_id": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"create_port:port_security_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
@ -82,6 +82,8 @@
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
"create_port:allowed_address_pairs:mac_address": "rule:admin_or_network_owner",
"create_port:allowed_address_pairs:ip_address": "rule:admin_or_network_owner",
"get_port": "rule:context_is_advsvc or rule:admin_owner_or_network_owner",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
@ -91,7 +93,7 @@
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:device_owner": "not rule:network_device or rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"update_port:fixed_ips:ip_address": "rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:fixed_ips:subnet_id": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"update_port:port_security_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
@ -99,6 +101,8 @@
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
"update_port:allowed_address_pairs:mac_address": "rule:admin_or_network_owner",
"update_port:allowed_address_pairs:ip_address": "rule:admin_or_network_owner",
"update_port:data_plane_status": "rule:admin_or_data_plane_int",
"delete_port": "rule:context_is_advsvc or rule:admin_owner_or_network_owner",

View File

@ -140,6 +140,17 @@ def _build_subattr_match_rule(attr_name, attr, action, target):
return policy.AndCheck(sub_attr_rules)
def _build_list_of_subattrs_rule(attr_name, attribute_value, action):
rules = []
for sub_attr in attribute_value:
if isinstance(sub_attr, dict):
for k in sub_attr:
rules.append(policy.RuleCheck(
'rule', '%s:%s:%s' % (action, attr_name, k)))
if rules:
return policy.AndCheck(rules)
def _process_rules_list(rules, match_rule):
"""Recursively walk a policy rule to extract a list of match entries."""
if isinstance(match_rule, policy.RuleCheck):
@ -175,8 +186,8 @@ def _build_match_rule(action, target, pluralized):
target, action):
attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name))
attr_rule = policy.RuleCheck(
'rule', '%s:%s' % (action, attribute_name))
# Build match entries for sub-attributes
if _should_validate_sub_attributes(
attribute, target[attribute_name]):
@ -184,6 +195,15 @@ def _build_match_rule(action, target, pluralized):
[attr_rule, _build_subattr_match_rule(
attribute_name, attribute,
action, target)])
attribute_value = target[attribute_name]
if isinstance(attribute_value, list):
subattr_rule = _build_list_of_subattrs_rule(
attribute_name, attribute_value, action)
if subattr_rule:
attr_rule = policy.AndCheck(
[attr_rule, subattr_rule])
match_rule = policy.AndCheck([match_rule, attr_rule])
return match_rule

View File

@ -74,7 +74,7 @@
"create_port": "",
"create_port:device_owner": "not rule:network_device or rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:mac_address": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"create_port:fixed_ips:ip_address": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:fixed_ips:subnet_id": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"create_port:port_security_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
@ -82,6 +82,8 @@
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
"create_port:allowed_address_pairs": "rule:admin_or_network_owner",
"create_port:allowed_address_pairs:mac_address": "rule:admin_or_network_owner",
"create_port:allowed_address_pairs:ip_address": "rule:admin_or_network_owner",
"get_port": "rule:context_is_advsvc or rule:admin_owner_or_network_owner",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
@ -91,7 +93,7 @@
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:device_owner": "not rule:network_device or rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:fixed_ips": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"update_port:fixed_ips:ip_address": "rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:fixed_ips:subnet_id": "rule:context_is_advsvc or rule:admin_or_network_owner or rule:shared",
"update_port:port_security_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
@ -99,6 +101,8 @@
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:context_is_advsvc or rule:admin_or_network_owner",
"update_port:allowed_address_pairs": "rule:admin_or_network_owner",
"update_port:allowed_address_pairs:mac_address": "rule:admin_or_network_owner",
"update_port:allowed_address_pairs:ip_address": "rule:admin_or_network_owner",
"update_port:data_plane_status": "rule:admin_or_data_plane_int",
"delete_port": "rule:context_is_advsvc or rule:admin_owner_or_network_owner",

View File

@ -184,7 +184,13 @@ FAKE_RESOURCES = {"%ss" % FAKE_RESOURCE_NAME:
'validate': {'type:dict':
{'sub_attr_1': {'type:string': None},
'sub_attr_2': {'type:string': None}}}
}},
},
'list_attr': {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'enforce_policy': True
}},
# special plural name
"%s" % FAKE_SPECIAL_RESOURCE_NAME.replace('y', 'ies'):
{'attr': {'allow_post': True,
@ -253,6 +259,10 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"create_fake_resource:attr": "rule:admin_or_owner",
"create_fake_resource:attr:sub_attr_1": "rule:admin_or_owner",
"create_fake_resource:attr:sub_attr_2": "rule:admin_only",
"create_fake_resource:list_attr": "rule:admin_only_or_owner",
"create_fake_resource:list_attr:admin_element": "rule:admin_only",
"create_fake_resource:list_attr:user_element": (
"rule:admin_or_owner"),
"create_fake_policy:": "rule:admin_or_owner",
}
@ -471,6 +481,23 @@ class NeutronPolicyTestCase(base.BaseTestCase):
action,
target)
def test_enforce_subattribute_as_list(self):
action = "create_" + FAKE_RESOURCE_NAME
target = {
'tenant_id': 'fake',
'list_attr': [{'user_element': 'x'}]}
result = policy.enforce(self.context,
action, target, None)
self.assertTrue(result)
def test_enforce_subattribute_as_list_forbiden(self):
action = "create_" + FAKE_RESOURCE_NAME
target = {
'tenant_id': 'fake',
'list_attr': [{'admin_element': 'x'}]}
self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,
self.context, action, target, None)
def test_retryrequest_on_notfound(self):
failure = exceptions.NetworkNotFound(net_id='whatever')
action = "create_port:mac"