Emit warning when use 'user_id' in policy rule
User-based policy enforcement isn't supported by Nova. There still exists support for some APIs to keep backwards compatiblity, but support for them will be removed in the future. So this patch adds a warning message when people use a rule that is user-based. Partially implements blueprint user-id-based-policy-enforcement Co-Authored-By: Ed Leafe <ed@leafe.com> Change-Id: Iaa9a142ce93a8d13452f0c6318a3d0b54f6220ce
This commit is contained in:
parent
1b53f12119
commit
9d9bfea8c9
@ -14,21 +14,33 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
"""Policy Engine For Nova."""
|
"""Policy Engine For Nova."""
|
||||||
|
import copy
|
||||||
|
import re
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_policy import policy
|
from oslo_policy import policy
|
||||||
from oslo_utils import excutils
|
from oslo_utils import excutils
|
||||||
|
import six
|
||||||
|
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova.i18n import _LE
|
from nova.i18n import _LE, _LW
|
||||||
from nova import policies
|
from nova import policies
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
_ENFORCER = None
|
_ENFORCER = None
|
||||||
|
# This list is about the resources which support user based policy enforcement.
|
||||||
|
# Avoid sending deprecation warning for those resources.
|
||||||
|
USER_BASED_RESOURCES = ['os-keypairs']
|
||||||
|
# oslo_policy will read the policy configuration file again when the file
|
||||||
|
# is changed in runtime so the old policy rules will be saved to
|
||||||
|
# saved_file_rules and used to compare with new rules to determine the
|
||||||
|
# rules whether were updated.
|
||||||
|
saved_file_rules = []
|
||||||
|
KEY_EXPR = re.compile(r'%\((\w+)\)s')
|
||||||
|
|
||||||
|
|
||||||
def reset():
|
def reset():
|
||||||
@ -51,6 +63,8 @@ def init(policy_file=None, rules=None, default_rule=None, use_conf=True):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
global _ENFORCER
|
global _ENFORCER
|
||||||
|
global saved_file_rules
|
||||||
|
|
||||||
if not _ENFORCER:
|
if not _ENFORCER:
|
||||||
_ENFORCER = policy.Enforcer(CONF,
|
_ENFORCER = policy.Enforcer(CONF,
|
||||||
policy_file=policy_file,
|
policy_file=policy_file,
|
||||||
@ -58,6 +72,42 @@ def init(policy_file=None, rules=None, default_rule=None, use_conf=True):
|
|||||||
default_rule=default_rule,
|
default_rule=default_rule,
|
||||||
use_conf=use_conf)
|
use_conf=use_conf)
|
||||||
register_rules(_ENFORCER)
|
register_rules(_ENFORCER)
|
||||||
|
_ENFORCER.load_rules()
|
||||||
|
|
||||||
|
# Only the rules which are loaded from file may be changed.
|
||||||
|
current_file_rules = _ENFORCER.file_rules
|
||||||
|
current_file_rules = _serialize_rules(current_file_rules)
|
||||||
|
|
||||||
|
# Checks whether the rules are updated in the runtime
|
||||||
|
if saved_file_rules != current_file_rules:
|
||||||
|
_warning_for_deprecated_user_based_rules(current_file_rules)
|
||||||
|
saved_file_rules = copy.deepcopy(current_file_rules)
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_rules(rules):
|
||||||
|
"""Serialize all the Rule object as string which is used to compare the
|
||||||
|
rules list.
|
||||||
|
"""
|
||||||
|
result = [(rule_name, str(rule))
|
||||||
|
for rule_name, rule in six.iteritems(rules)]
|
||||||
|
return sorted(result, key=lambda rule: rule[0])
|
||||||
|
|
||||||
|
|
||||||
|
def _warning_for_deprecated_user_based_rules(rules):
|
||||||
|
"""Warning user based policy enforcement used in the rule but the rule
|
||||||
|
doesn't support it.
|
||||||
|
"""
|
||||||
|
for rule in rules:
|
||||||
|
# We will skip the warning for the resources which support user based
|
||||||
|
# policy enforcement.
|
||||||
|
if [resource for resource in USER_BASED_RESOURCES
|
||||||
|
if resource in rule[0]]:
|
||||||
|
continue
|
||||||
|
if 'user_id' in KEY_EXPR.findall(rule[1]):
|
||||||
|
LOG.warning(_LW("The user_id attribute isn't supported in the "
|
||||||
|
"rule '%s'. All the user_id based policy "
|
||||||
|
"enforcement will be removed in the "
|
||||||
|
"future."), rule[0])
|
||||||
|
|
||||||
|
|
||||||
def set_rules(rules, overwrite=True, use_conf=False):
|
def set_rules(rules, overwrite=True, use_conf=False):
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
"""Test of Policy Engine For Nova."""
|
"""Test of Policy Engine For Nova."""
|
||||||
|
|
||||||
|
import mock
|
||||||
import os.path
|
import os.path
|
||||||
|
|
||||||
from oslo_policy import policy as oslo_policy
|
from oslo_policy import policy as oslo_policy
|
||||||
@ -156,6 +157,30 @@ class PolicyTestCase(test.NoDBTestCase):
|
|||||||
policy.authorize(admin_context, lowercase_action, self.target)
|
policy.authorize(admin_context, lowercase_action, self.target)
|
||||||
policy.authorize(admin_context, uppercase_action, self.target)
|
policy.authorize(admin_context, uppercase_action, self.target)
|
||||||
|
|
||||||
|
@mock.patch.object(policy.LOG, 'warning')
|
||||||
|
def test_warning_when_deprecated_user_based_rule_used(self, mock_warning):
|
||||||
|
policy._warning_for_deprecated_user_based_rules(
|
||||||
|
[("os_compute_api:servers:index",
|
||||||
|
"project_id:%(project_id)s or user_id:%(user_id)s")])
|
||||||
|
mock_warning.assert_called_once_with(
|
||||||
|
u"The user_id attribute isn't supported in the rule "
|
||||||
|
"'%s'. All the user_id based policy enforcement will be removed "
|
||||||
|
"in the future.", "os_compute_api:servers:index")
|
||||||
|
|
||||||
|
@mock.patch.object(policy.LOG, 'warning')
|
||||||
|
def test_no_warning_for_user_based_resource(self, mock_warning):
|
||||||
|
policy._warning_for_deprecated_user_based_rules(
|
||||||
|
[("os_compute_api:os-keypairs:index",
|
||||||
|
"user_id:%(user_id)s")])
|
||||||
|
mock_warning.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(policy.LOG, 'warning')
|
||||||
|
def test_no_warning_for_no_user_based_rule(self, mock_warning):
|
||||||
|
policy._warning_for_deprecated_user_based_rules(
|
||||||
|
[("os_compute_api:servers:index",
|
||||||
|
"project_id:%(project_id)s")])
|
||||||
|
mock_warning.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
class IsAdminCheckTestCase(test.NoDBTestCase):
|
class IsAdminCheckTestCase(test.NoDBTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user