diff --git a/doc/source/rbac_validation.rst b/doc/source/rbac_validation.rst index ccaf3c85..a3cd7e68 100644 --- a/doc/source/rbac_validation.rst +++ b/doc/source/rbac_validation.rst @@ -7,7 +7,7 @@ RBAC Testing Validation Overview -------- -RBAC Testing Validation is broken up into 3 stages: +RBAC testing validation is broken up into 3 stages: 1. "Expected" stage. Determine whether the test should be able to succeed or fail based on the test role defined by ``[patrole] rbac_test_role``) @@ -41,11 +41,20 @@ is initiated. .. automodule:: patrole_tempest_plugin.rbac_rule_validation :members: + :private-members: --------------------------- The Policy Authority Module --------------------------- +Module called by the "RBAC Rule Validation Module" to verify whether the test +role is allowed to execute a policy action by querying ``oslo.policy`` with +required test data. The result is used by the "RBAC Rule Validation Module" as +the `expected` result. + +This module is only called for calculating the `expected` result if +``[patrole] test_custom_requirements`` is ``False``. + Using the Policy Authority Module, policy verification is performed by: 1. Pooling together the default `in-code` policy rules. diff --git a/patrole_tempest_plugin/policy_authority.py b/patrole_tempest_plugin/policy_authority.py index af227c48..d2d07c06 100644 --- a/patrole_tempest_plugin/policy_authority.py +++ b/patrole_tempest_plugin/policy_authority.py @@ -32,37 +32,72 @@ LOG = logging.getLogger(__name__) class PolicyAuthority(RbacAuthority): - """A class for parsing policy rules into lists of allowed roles. - - RBAC testing requires that each rule in a policy file be broken up into - the roles that constitute it. This class automates that process. - - The list of roles per rule can be reverse-engineered by checking, for - each role, whether a given rule is allowed using oslo policy. - """ + """A class that uses ``oslo.policy`` for validating RBAC.""" def __init__(self, project_id, user_id, service, extra_target_data=None): - """Initialization of Rbac Policy Parser. + """Initialization of Policy Authority class. - Parses a policy file to create a dictionary, mapping policy actions to - roles. If a policy file does not exist, checks whether the policy file - is registered as a namespace under oslo.policy.policies. Nova, for - example, doesn't use a policy.json file by default; its policy is - implemented in code and registered as 'nova' under - oslo.policy.policies. + Validates whether a test role can perform a policy action by querying + ``oslo.policy`` with necessary test data. - If the policy file is not found in either place, raises an exception. + If a policy file does not exist, checks whether the policy file is + registered as a namespace under "oslo.policy.policies". Nova, for + example, doesn't use a policy file by default; its policies are + implemented in code and registered as "nova" under + "oslo.policy.policies". - Additionally, if the policy file exists in both code and as a - policy.json (for example, by creating a custom nova policy.json file), - the custom policy file over the default policy implementation is - prioritized. + If the policy file is not found in either code or in a policy file, + then an exception is raised. + + Additionally, if a custom policy file exists along with the default + policy in code implementation, the custom policy is prioritized. :param uuid project_id: project_id of object performing API call :param uuid user_id: user_id of object performing API call :param string service: service of the policy file :param dict extra_target_data: dictionary containing additional object data needed by oslo.policy to validate generic checks + + Example: + + .. code-block:: python + + # Below is the default policy implementation in code, defined in + # a service like Nova. + test_policies = [ + policy.DocumentedRuleDefault( + 'service:test_rule', + base.RULE_ADMIN_OR_OWNER, + "This is a description for a test policy", + [ + { + 'method': 'POST', + 'path': '/path/to/test/resource' + } + ]), + 'service:another_test_rule', + base.RULE_ADMIN_OR_OWNER, + "This is a description for another test policy", + [ + { + 'method': 'GET', + 'path': '/path/to/test/resource' + } + ]), + ] + + .. code-block:: yaml + + # Below is the custom override of the default policy in a YAML + # policy file. Note that the default rule is "rule:admin_or_owner" + # and the custom rule is "rule:admin_api". The `PolicyAuthority` + # class will use the "rule:admin_api" definition for this policy + # action. + "service:test_rule" : "rule:admin_api" + + # Note below that no override is provided for + # "service:another_test_rule", which means that the default policy + # rule is used: "rule:admin_or_owner". """ if extra_target_data is None: @@ -108,9 +143,10 @@ class PolicyAuthority(RbacAuthority): @classmethod def discover_policy_files(cls): - # Dynamically discover the policy file for each service in - # ``cls.available_services``. Pick the first ``candidate_path`` found - # out of the potential paths in ``CONF.patrole.custom_policy_files``. + """Dynamically discover the policy file for each service in + ``cls.available_services``. Pick the first candidate path found + out of the potential paths in ``[patrole] custom_policy_files``. + """ if not hasattr(cls, 'policy_files'): cls.policy_files = {} for service in cls.available_services: @@ -120,6 +156,11 @@ class PolicyAuthority(RbacAuthority): candidate_path % service) def allowed(self, rule_name, role): + """Checks if a given rule in a policy is allowed with given role. + + :param string rule_name: Rule to be checked using ``oslo.policy``. + :param bool is_admin: Whether admin context is used. + """ is_admin_context = self._is_admin_context(role) is_allowed = self._allowed( access=self._get_access_token(role), @@ -220,13 +261,11 @@ class PolicyAuthority(RbacAuthority): return access_token def _allowed(self, access, apply_rule, is_admin=False): - """Checks if a given rule in a policy is allowed with given access. + """Checks if a given rule in a policy is allowed with given ``access``. - Adapted from oslo_policy.shell. - - :param access: type dict: dictionary from ``_get_access_token`` - :param apply_rule: type string: rule to be checked - :param is_admin: type bool: whether admin context is used + :param dict access: Dictionary from ``_get_access_token``. + :param string apply_rule: Rule to be checked using ``oslo.policy``. + :param bool is_admin: Whether admin context is used. """ access_data = copy.copy(access['token']) access_data['roles'] = [role['name'] for role in access_data['roles']] diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py index dd5b6895..540d006a 100644 --- a/patrole_tempest_plugin/rbac_rule_validation.py +++ b/patrole_tempest_plugin/rbac_rule_validation.py @@ -38,7 +38,7 @@ RBACLOG = logging.getLogger('rbac_reporting') def action(service, rule='', admin_only=False, expected_error_code=403, extra_target_data=None): - """A decorator for verifying policy enforcement. + """A decorator for verifying OpenStack policy enforcement. A decorator which allows for positive and negative RBAC testing. Given: @@ -50,7 +50,7 @@ def action(service, rule='', admin_only=False, expected_error_code=403, API call that enforces the ``rule``. This decorator should only be applied to an instance or subclass of - `tempest.base.BaseTestCase`. + ``tempest.test.BaseTestCase``. The result from ``_is_authorized`` is used to determine the *expected* test result. The *actual* test result is determined by running the @@ -68,7 +68,7 @@ def action(service, rule='', admin_only=False, expected_error_code=403, As such, negative and positive testing can be applied using this decorator. - :param service: A OpenStack service. Examples: "nova" or "neutron". + :param service: An OpenStack service. Examples: "nova" or "neutron". :param rule: A policy action defined in a policy.json file (or in code). @@ -76,11 +76,10 @@ def action(service, rule='', admin_only=False, expected_error_code=403, Patrole currently only supports custom JSON policy files. - :param admin_only: Skips over `oslo.policy` check because the policy action - defined by `rule` is not enforced by the service's policy + :param admin_only: Skips over ``oslo.policy`` check because the policy + action defined by ``rule`` is not enforced by the service's policy enforcement engine. For example, Keystone v2 performs an admin check - for most of its endpoints. If True, `rule` is effectively - ignored. + for most of its endpoints. If True, ``rule`` is effectively ignored. :param expected_error_code: Overrides default value of 403 (Forbidden) with endpoint-specific error code. Currently only supports 403 and 404. Support for 404 is needed because some services, like Neutron, @@ -89,11 +88,11 @@ def action(service, rule='', admin_only=False, expected_error_code=403, .. warning:: A 404 should not be provided *unless* the endpoint masks a - `Forbidden` exception as a `Not Found` exception. + ``Forbidden`` exception as a ``NotFound`` exception. - :param extra_target_data: Dictionary, keyed with `oslo.policy` generic + :param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic check names, whose values are string literals that reference nested - `tempest.base.BaseTestCase` attributes. Used by `oslo.policy` for + ``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for performing matching against attributes that are sent along with the API calls. Example:: @@ -102,8 +101,7 @@ def action(service, rule='', admin_only=False, expected_error_code=403, "os_alt.auth_provider.credentials.user_id" }) - :raises NotFound: If `service` is invalid or if Tempest credentials cannot - be found. + :raises NotFound: If ``service`` is invalid. :raises Forbidden: For item (2) above. :raises RbacOverPermission: For item (3) above. @@ -112,7 +110,7 @@ def action(service, rule='', admin_only=False, expected_error_code=403, @rbac_rule_validation.action( service="nova", rule="os_compute_api:os-agents") def test_list_agents_rbac(self): - # The call to ``switch_role`` is mandatory. + # The call to `switch_role` is mandatory. self.rbac_utils.switch_role(self, toggle_rbac_role=True) self.agents_client.list_agents() """ @@ -194,20 +192,19 @@ def action(service, rule='', admin_only=False, expected_error_code=403, def _is_authorized(test_obj, service, rule, extra_target_data, admin_only): """Validates whether current RBAC role has permission to do policy action. - :param test_obj: An instance or subclass of `tempest.base.BaseTestCase`. + :param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``. :param service: The OpenStack service that enforces ``rule``. :param rule: The name of the policy action. Examples include "identity:create_user" or "os_compute_api:os-agents". - :param extra_target_data: Dictionary, keyed with `oslo.policy` generic + :param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic check names, whose values are string literals that reference nested - `tempest.base.BaseTestCase` attributes. Used by `oslo.policy` for + ``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for performing matching against attributes that are sent along with the API calls. - :param admin_only: Skips over `oslo.policy` check because the policy action - defined by `rule` is not enforced by the service's policy + :param admin_only: Skips over ``oslo.policy`` check because the policy + action defined by ``rule`` is not enforced by the service's policy enforcement engine. For example, Keystone v2 performs an admin check - for most of its endpoints. If True, `rule` is effectively - ignored. + for most of its endpoints. If True, ``rule`` is effectively ignored. :returns: True if the current RBAC role can perform the policy action, else False. @@ -268,14 +265,15 @@ def _get_exception_type(expected_error_code=403): """Dynamically calculate the expected exception to be caught. Dynamically calculate the expected exception to be caught by the test case. - Only `Forbidden` and `NotFound` exceptions are permitted. `NotFound` is - supported because Neutron, for security reasons, masks `Forbidden` - exceptions as `NotFound` exceptions. + Only ``Forbidden`` and ``NotFound`` exceptions are permitted. ``NotFound`` + is supported because Neutron, for security reasons, masks ``Forbidden`` + exceptions as ``NotFound`` exceptions. :param expected_error_code: the integer representation of the expected - exception to be caught. Must be contained in `_SUPPORTED_ERROR_CODES`. + exception to be caught. Must be contained in + ``_SUPPORTED_ERROR_CODES``. :returns: tuple of the exception type corresponding to - `expected_error_code` and a message explaining that a non-Forbidden + ``expected_error_code`` and a message explaining that a non-Forbidden exception was expected, if applicable. """ expected_exception = None @@ -304,7 +302,8 @@ def _format_extra_target_data(test_obj, extra_target_data): Before being formatted, "extra_target_data" is a dictionary that maps a policy string like "trust.trustor_user_id" to a nested list of - `tempest.base.BaseTestCase` attributes. For example, the attribute list in: + ``tempest.test.BaseTestCase`` attributes. For example, the attribute list + in: "trust.trustor_user_id": "os.auth_provider.credentials.user_id" @@ -313,14 +312,14 @@ def _format_extra_target_data(test_obj, extra_target_data): "trust.trustor_user_id": "the user_id of the `os_primary` credential" - :param test_obj: An instance or subclass of `tempest.base.BaseTestCase`. - :param extra_target_data: Dictionary, keyed with `oslo.policy` generic + :param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``. + :param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic check names, whose values are string literals that reference nested - `tempest.base.BaseTestCase` attributes. Used by `oslo.policy` for + ``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for performing matching against attributes that are sent along with the API calls. :returns: Dictionary containing additional object data needed by - `oslo.policy` to validate generic checks. + ``oslo.policy`` to validate generic checks. """ attr_value = test_obj formatted_target_data = {}