Browse Source

Reload files in policy_dirs on primary file change

It was determined that rules from policy files located in the directory
specified in the policy_dirs option (/etc/<config_dir>/policy.d by
default) are not re-applied after the rules from the primary policy file
is re-applied due to a change.

This change introduces additional behavior to make sure the rules from
policy_dirs are reapplied if there is a change to the primary policy
file.

Change-Id: I8a6f8e971d881365c41ea409966723319d5b239a
Closes-Bug: #1880959
Related-Bug: #1880847
(cherry picked from commit 75677a3110)
(cherry picked from commit c8c138e69d)
tags/2.3.4^0
Dmitrii Shcherbakov 2 months ago
parent
commit
5904564bf1
3 changed files with 85 additions and 7 deletions
  1. +34
    -7
      oslo_policy/policy.py
  2. +43
    -0
      oslo_policy/tests/test_policy.py
  3. +8
    -0
      releasenotes/notes/bug-1880959-8f1370a59759d40d.yaml

+ 34
- 7
oslo_policy/policy.py View File

@@ -553,6 +553,8 @@ class Enforcer(object):
if force_reload:
self.use_conf = force_reload

policy_file_rules_changed = False

if self.use_conf:
if not self.policy_path:
try:
@@ -564,18 +566,30 @@ class Enforcer(object):
self._informed_no_policy_file = True

if self.policy_path:
self._load_policy_file(self.policy_path, force_reload,
overwrite=self.overwrite)
# If the policy file rules have changed any policy.d rules
# also need to be reapplied on top of that change.
policy_file_rules_changed = self._load_policy_file(
self.policy_path,
force_reload,
overwrite=self.overwrite
)

force_reload_policy_dir = force_reload
if policy_file_rules_changed:
force_reload_policy_dir = True

for path in self.conf.oslo_policy.policy_dirs:
try:
path = self._get_policy_path(path)
except cfg.ConfigFilesNotFoundError:
continue
if (force_reload or self._is_directory_updated(
self._policy_dir_mtimes, path)):
self._walk_through_policy_directory(path,
self._load_policy_file,
force_reload, False)
if (self._is_directory_updated(self._policy_dir_mtimes, path)
or force_reload_policy_dir):
self._walk_through_policy_directory(
path,
self._load_policy_file,
force_reload_policy_dir, False
)

for default in self.registered_rules.values():
if default.deprecated_for_removal:
@@ -761,6 +775,8 @@ class Enforcer(object):
# is in the cache
mtime = 0
if os.path.exists(path):
if not os.path.isdir(path):
raise ValueError('{} is not a directory'.format(path))
# Make a list of all the files
files = [path] + [os.path.join(path, file) for file in
os.listdir(path)]
@@ -799,14 +815,25 @@ class Enforcer(object):
self.file_rules[name] = RuleDefault(name, check_str)

def _load_policy_file(self, path, force_reload, overwrite=True):
"""Load policy rules from the specified policy file.

:param str path: A path of the policy file to load rules from.
:param bool force_reload: Forcefully reload the policy file content.
:param bool overwrite: Replace policy rules instead of updating them.
:return: A bool indicating whether rules have been changed or not.
:rtype: bool
"""
rules_changed = False
reloaded, data = _cache_handler.read_cached_file(
self._file_cache, path, force_reload=force_reload)
if reloaded or not self.rules:
rules = Rules.load(data, self.default_rule)
self.set_rules(rules, overwrite=overwrite, use_conf=True)
rules_changed = True
self._record_file_rules(data, overwrite)
self._loaded_files.append(path)
LOG.debug('Reloaded policy file: %(path)s', {'path': path})
return rules_changed

def _get_policy_path(self, path):
"""Locate the policy YAML/JSON data file/path.


+ 43
- 0
oslo_policy/tests/test_policy.py View File

@@ -296,6 +296,48 @@ class EnforcerTest(base.PolicyBaseTestCase):
os.path.join('policy.d', 'b.conf'),
])

def test_load_directory_after_file_update(self):
self.create_config_file(
os.path.join('policy.d', 'a.conf'), POLICY_A_CONTENTS)
self.enforcer.load_rules(True)
self.assertIsNotNone(self.enforcer.rules)
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeA', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
new_policy_json_contents = jsonutils.dumps({
"default": "rule:admin",
"admin": "is_admin:True",
"foo": "rule:bar",
})
# Modify the policy.json file and then validate that the rules
# from the policy directory are re-applied on top of the
# new rules from the file.
self.create_config_file('policy.json', new_policy_json_contents)
policy_file_path = self.get_config_file_fullname('policy.json')
# Force the mtime change since the unit test may write to this file
# too fast for mtime to actually change.
stinfo = os.stat(policy_file_path)
os.utime(policy_file_path, (stinfo.st_atime + 42,
stinfo.st_mtime + 42))

self.enforcer.load_rules()

self.assertIsNotNone(self.enforcer.rules)
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeA', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.assertEqual('rule:bar', loaded_rules['foo'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
'policy.json',
os.path.join('policy.d', 'a.conf'),
])

def test_load_directory_opts_registered(self):
self._test_scenario_with_opts_registered(self.test_load_directory)

@@ -421,6 +463,7 @@ class EnforcerTest(base.PolicyBaseTestCase):
[os.path.join('policy.d', 'a.conf')],
group='oslo_policy')
self.assertRaises(ValueError, self.enforcer.load_rules, True)
self.assertRaises(ValueError, self.enforcer.load_rules, False)

@mock.patch('oslo_policy.policy.Enforcer.check_rules')
def test_load_rules_twice(self, mock_check_rules):


+ 8
- 0
releasenotes/notes/bug-1880959-8f1370a59759d40d.yaml View File

@@ -0,0 +1,8 @@
---
fixes:
- |
[`bug 1880959 <https://bugs.launchpad.net/keystone/+bug/1880959>`_]
The behavior of policy file reloading from policy directories was fixed.
Previously the rules from policy files located in the directories
specified in the ``policy_dirs`` option were not reapplied after the rules
from the primary policy file have been reapplied due to a change.

Loading…
Cancel
Save