Modify policy parser to combine custom and default policy files.

Currently, the rbac policy parser file tries to:
  1) Read the custom policy file if it exists
  2) Otherwise check if the default policy file exists in code

The problem with this approach is:
  - What if the custom policy file does not specify all policy actions?
    This is problematic when it comes to validating the policy action:
    is it defined or not?
  - This also holds true for default policy files which may not define
    all the policy actions enforced by the service explicitly.

This patch partially fixes this issue by 1) using all the
default policy actions defined in code, if they exist and 2)
overwriting any default policy actions with the custom
policy actions provided by the user in a custom policy file.

The end result is that the Patrole framework uses as many policy actions
as possible for reference, while using as many custom-defined policy
actions as possible. This patch, therefore, makes it more feasible to
throw an exception if a policy action is invalid.

Change-Id: Idb6b8a99170fd32097940d5b23182f5e43956548
Depends-On: I7feb522b2ea5f56e48982169c7ebbb2ec2ef2cb3
This commit is contained in:
Felipe Monteiro 2017-03-23 22:49:06 +00:00
parent d4a4aa674b
commit ae2ebab27c
4 changed files with 157 additions and 28 deletions

View File

@ -14,16 +14,19 @@
# under the License.
import copy
import json
import os
from oslo_config import cfg
from oslo_log import log as logging
from oslo_policy import generator
from oslo_policy import policy
import stevedore
from tempest.common import credentials_factory as credentials
from patrole_tempest_plugin import rbac_exceptions
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -59,6 +62,7 @@ class RbacPolicyParser(object):
:param service: type string
:param path: type string
"""
# First check if the service is valid
service = service.lower().strip() if service else None
self.admin_mgr = credentials.AdminManager()
@ -69,32 +73,11 @@ class RbacPolicyParser(object):
LOG.debug(str(service) + " is NOT a valid service.")
raise rbac_exceptions.RbacInvalidService
# Use default path if no path provided
if path is None:
self.path = os.path.join('/etc', service, 'policy.json')
else:
self.path = path
policy_data = "{}"
# Check whether policy file exists.
if os.path.isfile(self.path):
policy_data = open(self.path, 'r').read()
# Otherwise use oslo_policy to fetch the rules for provided service.
else:
policy_generator = generator._get_policies_dict([service])
if policy_generator and service in policy_generator:
policy_data = "{\n"
for r in policy_generator[service]:
policy_data = policy_data + r.__str__() + ",\n"
policy_data = policy_data[:-2] + "\n}"
# Otherwise raise an exception.
else:
raise rbac_exceptions.RbacParsingException(
'Policy file for service: {0}, {1} not found.'
.format(service, self.path))
self.rules = policy.Rules.load(policy_data, "default")
# Use default path in /etc/<service_name/policy.json if no path
# is provided.
self.path = path or os.path.join('/etc', service, 'policy.json')
self.rules = policy.Rules.load(self._get_policy_data(service),
'default')
self.tenant_id = tenant_id
self.user_id = user_id
@ -106,6 +89,61 @@ class RbacPolicyParser(object):
is_admin=is_admin_context)
return is_allowed
def _get_policy_data(self, service):
file_policy_data = {}
mgr_policy_data = {}
policy_data = {}
# Check whether policy file exists.
if os.path.isfile(self.path):
with open(self.path, 'r') as policy_file:
file_policy_data = policy_file.read()
try:
file_policy_data = json.loads(file_policy_data)
except ValueError:
pass
# Check whether policy actions are defined in code. Nova and Keystone,
# for example, define their default policy actions in code.
mgr = stevedore.named.NamedExtensionManager(
'oslo.policy.policies',
names=[service],
on_load_failure_callback=None,
invoke_on_load=True,
warn_on_missing_entrypoint=False)
if mgr:
policy_generator = {policy.name: policy.obj for policy in mgr}
if policy_generator and service in policy_generator:
for rule in policy_generator[service]:
mgr_policy_data[rule.name] = str(rule.check)
# If data from both file and code exist, combine both together.
if file_policy_data and mgr_policy_data:
# Add the policy actions from code first.
for action, rule in mgr_policy_data.items():
policy_data[action] = rule
# Overwrite with any custom policy actions defined in policy.json.
for action, rule in file_policy_data.items():
policy_data[action] = rule
elif file_policy_data:
policy_data = file_policy_data
elif mgr_policy_data:
policy_data = mgr_policy_data
else:
error_message = 'Policy file for {0} service neither found in '\
'code nor at {1}.'.format(service, self.path)
raise rbac_exceptions.RbacParsingException(error_message)
try:
policy_data = json.dumps(policy_data)
except ValueError:
error_message = 'Policy file for {0} service is invalid.'.format(
service)
raise rbac_exceptions.RbacParsingException(error_message)
return policy_data
def _is_admin_context(self, role):
"""Checks whether a role has admin context.

View File

@ -10,5 +10,5 @@
"policy_action_3": "rule:zero_rule",
"policy_action_4": "rule:prime_rule",
"policy_action_5": "rule:all_rule",
"policy_action_6": "role:eight",
"policy_action_6": "role:eight"
}

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import os
@ -75,6 +76,11 @@ class RbacPolicyTest(base.TestCase):
identity_services_v3_client.list_services.return_value = \
services
def _get_fake_policy_rule(self, name, rule):
fake_rule = mock.Mock(check=rule)
fake_rule.name = name
return fake_rule
@mock.patch.object(rbac_policy_parser, 'LOG', autospec=True)
def test_custom_policy(self, m_log):
default_roles = ['zero', 'one', 'two', 'three', 'four',
@ -294,3 +300,87 @@ class RbacPolicyTest(base.TestCase):
parser.allowed, mock.sentinel.rule, None)
self.assertIn(expected_message, str(e))
m_log.debug.assert_called_once_with(expected_message)
@mock.patch.object(rbac_policy_parser, 'stevedore', autospec=True)
def test_get_policy_data_from_file_and_from_code(self, mock_stevedore):
fake_policy_rules = [
self._get_fake_policy_rule('code_policy_action_1',
'rule:code_rule_1'),
self._get_fake_policy_rule('code_policy_action_2',
'rule:code_rule_2'),
self._get_fake_policy_rule('code_policy_action_3',
'rule:code_rule_3'),
]
mock_manager = mock.Mock(obj=fake_policy_rules)
mock_manager.configure_mock(name='fake_service')
mock_stevedore.named.NamedExtensionManager.return_value = [
mock_manager
]
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
parser = rbac_policy_parser.RbacPolicyParser(
test_tenant_id, test_user_id, "test", self.tenant_policy_file)
policy_data = parser._get_policy_data('fake_service')
self.assertIsInstance(policy_data, str)
actual_policy_data = json.loads(policy_data)
expected_policy_data = {
"code_policy_action_1": "rule:code_rule_1",
"code_policy_action_2": "rule:code_rule_2",
"code_policy_action_3": "rule:code_rule_3",
"rule1": "tenant_id:%(network:tenant_id)s",
"rule2": "tenant_id:%(tenant_id)s",
"rule3": "project_id:%(project_id)s",
"rule4": "user_id:%(user_id)s",
"admin_tenant_rule": "role:admin and tenant_id:%(tenant_id)s",
"admin_user_rule": "role:admin and user_id:%(user_id)s"
}
self.assertEqual(expected_policy_data, actual_policy_data)
@mock.patch.object(rbac_policy_parser, 'stevedore', autospec=True)
def test_get_policy_data_from_file_and_from_code_with_overwrite(
self, mock_stevedore):
# The custom policy file should overwrite default rules rule1 and rule2
# that are defined in code.
fake_policy_rules = [
self._get_fake_policy_rule('rule1', 'rule:code_rule_1'),
self._get_fake_policy_rule('rule2', 'rule:code_rule_2'),
self._get_fake_policy_rule('code_policy_action_3',
'rule:code_rule_3'),
]
mock_manager = mock.Mock(obj=fake_policy_rules)
mock_manager.configure_mock(name='fake_service')
mock_stevedore.named.NamedExtensionManager.return_value = [
mock_manager
]
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
parser = rbac_policy_parser.RbacPolicyParser(
test_tenant_id, test_user_id, "test", self.tenant_policy_file)
policy_data = parser._get_policy_data('fake_service')
self.assertIsInstance(policy_data, str)
actual_policy_data = json.loads(policy_data)
expected_policy_data = {
"code_policy_action_3": "rule:code_rule_3",
"rule1": "tenant_id:%(network:tenant_id)s",
"rule2": "tenant_id:%(tenant_id)s",
"rule3": "project_id:%(project_id)s",
"rule4": "user_id:%(user_id)s",
"admin_tenant_rule": "role:admin and tenant_id:%(tenant_id)s",
"admin_user_rule": "role:admin and user_id:%(user_id)s"
}
self.assertEqual(expected_policy_data, actual_policy_data)

View File

@ -7,3 +7,4 @@ urllib3>=1.15.1 # MIT
oslo.log>=3.11.0 # Apache-2.0
oslo.config>=3.22.0 # Apache-2.0
tempest>=14.0.0 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0