Rules in policy directory files can be deleted.

Policy directory files can only add new rules or
update existing rules in cache, but cannot return back
loaded rules in memory to their default value.
This incorrect behavior was fixed in the patch.

Member "_loaded_files" of class Enforcer should keep
list of loaded policy config files paths.
In fact if the same file is changed many times
then the same file path is added many times.
If a file is deleted it's path not deleted from "_loaded_files".
The member is very misleading and is not used in code.
So this member was deleted in the patch because of
above mentioned resons.

Change-Id: I9ede38d8cf2ae968d3d8c0b1240bd6a51e6aa931
Closes-Bug: 1943584
This commit is contained in:
Mitya_Eremeev 2021-09-14 15:37:31 +03:00
parent c7fd9f4fcd
commit 949289e094
3 changed files with 77 additions and 62 deletions

View File

@ -545,7 +545,6 @@ class Enforcer(object):
self.use_conf = use_conf
self._need_check_rule = True
self.overwrite = overwrite
self._loaded_files = []
self._policy_dir_mtimes = {}
self._file_cache = {}
self._informed_no_policy_file = False
@ -586,7 +585,6 @@ class Enforcer(object):
self.set_rules({})
self.default_rule = None
self.policy_path = None
self._loaded_files = []
self._policy_dir_mtimes = {}
self._file_cache.clear()
self.registered_rules = {}
@ -627,22 +625,48 @@ class Enforcer(object):
overwrite=self.overwrite
)
force_reload_policy_dir = force_reload
force_reload_policy_dirs = force_reload
if policy_file_rules_changed:
force_reload_policy_dir = True
force_reload_policy_dirs = True
existing_policy_dirs = []
for path in self.conf.oslo_policy.policy_dirs:
try:
path = self._get_policy_path(path)
absolute_path = self._get_policy_path(path)
existing_policy_dirs.append(absolute_path)
except cfg.ConfigFilesNotFoundError:
continue
if (self._is_directory_updated(self._policy_dir_mtimes, path)
or force_reload_policy_dir):
# If change was made in any policy directory or main policy
# file then all policy directories and main file are
# re-calculated from scratch. We don't have separate rule sets
# for every policy folder, we only have the only rule set in
# RAM for all rule configs (self.rules). So it's the only way
# to be consistent.
if self._is_directory_updated(self._policy_dir_mtimes,
absolute_path):
force_reload_policy_dirs = True
if force_reload_policy_dirs and existing_policy_dirs:
# Here we realize that some policy folders or main policy file
# were changed and we need to recalculate all rules from
# scratch.
# If policy_file_rules_changed is True then we know:
# 1. all rules were already reset.
# 2. rules from main policy file were already applied.
# Otherwise main policy file was not changed and rules were not
# reset and. So we reset rules and force to re-calculate
# rules in main policy file. And after that we apply rules
# from every policy directory.
if self.policy_path:
if not policy_file_rules_changed:
self._load_policy_file(path=self.policy_path,
force_reload=True,
overwrite=self.overwrite
)
else:
self.rules = Rules(default_rule=self.default_rule)
for path in existing_policy_dirs:
self._walk_through_policy_directory(
path,
self._load_policy_file,
force_reload_policy_dir, False
)
path, self._load_policy_file, True, False)
for default in self.registered_rules.values():
if default.deprecated_for_removal:
@ -917,7 +941,6 @@ class Enforcer(object):
self.set_rules(rules, overwrite=overwrite, use_conf=True)
rules_changed = True
self._record_file_rules(data, overwrite)
self._loaded_files.append(path)
LOG.debug('Reloaded policy file: %(path)s', {'path': path})
return rules_changed

View File

@ -238,13 +238,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
super(EnforcerTest, self).setUp()
self.create_config_file('policy.json', POLICY_JSON_CONTENTS)
def check_loaded_files(self, filenames):
self.assertEqual(
[self.get_config_file_fullname(n)
for n in filenames],
self.enforcer._loaded_files
)
def _test_scenario_with_opts_registered(self, scenario, *args, **kwargs):
# This test registers some rules, calls the scenario and then checks
# the registered rules. The scenario should be a method which loads
@ -291,11 +284,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeB', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
os.path.join('policy.d', 'b.conf'),
])
def test_load_directory_after_file_update(self):
self.create_config_file(
@ -305,10 +293,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeA', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
new_policy_json_contents = jsonutils.dumps({
"default": "rule:admin",
"admin": "is_admin:True",
@ -332,12 +316,41 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertEqual('role:fakeA', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.assertEqual('rule:bar', loaded_rules['foo'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
def test_load_directory_after_file_is_emptied(self):
def dict_rules(enforcer_rules):
"""Converts enforcer rules to dictionary.
:param enforcer_rules: enforcer rules represented as a class Rules
:return: enforcer rules represented as a dictionary
"""
return jsonutils.loads(str(enforcer_rules))
self.assertEqual(self.enforcer.rules, {})
self.enforcer.load_rules()
main_policy_file_rules = jsonutils.loads(POLICY_JSON_CONTENTS)
self.assertEqual(main_policy_file_rules,
dict_rules(self.enforcer.rules))
folder_policy_file = os.path.join('policy.d', 'a.conf')
self.create_config_file(folder_policy_file, POLICY_A_CONTENTS)
self.enforcer.load_rules()
expected_rules = main_policy_file_rules.copy()
expected_rules.update(jsonutils.loads(POLICY_A_CONTENTS))
self.assertEqual(expected_rules, dict_rules(self.enforcer.rules))
self.create_config_file(folder_policy_file, '{}')
# Force the mtime change since the unit test may write to this file
# too fast for mtime to actually change.
absolute_folder_policy_file_path = self.get_config_file_fullname(
folder_policy_file)
stinfo = os.stat(absolute_folder_policy_file_path)
os.utime(absolute_folder_policy_file_path,
(stinfo.st_atime + 42, stinfo.st_mtime + 42))
self.enforcer.load_rules()
self.assertEqual(main_policy_file_rules,
dict_rules(self.enforcer.rules))
def test_load_directory_opts_registered(self):
self._test_scenario_with_opts_registered(self.test_load_directory)
@ -364,11 +377,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
os.path.join('policy.d', 'a.conf'),
])
def test_load_directory_caching_with_files_updated_opts_registered(self):
self._test_scenario_with_opts_registered(
@ -392,10 +400,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
def test_load_directory_caching_with_files_same_but_overwrite_false(self):
self.test_load_directory_caching_with_files_same(overwrite=False)
@ -453,12 +457,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeC', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
os.path.join('policy.d', 'b.conf'),
os.path.join('policy.2.d', 'fake.conf'),
])
def test_load_multiple_directories_opts_registered(self):
self._test_scenario_with_opts_registered(
@ -474,8 +472,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertIsNotNone(self.enforcer.rules)
self.assertIn('default', self.enforcer.rules)
self.assertIn('admin', self.enforcer.rules)
self.check_loaded_files(
['policy.json', os.path.join('policy.d', 'a.conf')])
def test_load_non_existed_directory_opts_registered(self):
self._test_scenario_with_opts_registered(
@ -1011,13 +1007,6 @@ class EnforcerNoPolicyFileTest(base.PolicyBaseTestCase):
def setUp(self):
super(EnforcerNoPolicyFileTest, self).setUp()
def check_loaded_files(self, filenames):
self.assertEqual(
[self.get_config_file_fullname(n)
for n in filenames],
self.enforcer._loaded_files
)
def test_load_rules(self):
# Check that loading rules with no policy file does not error
self.enforcer.load_rules(True)
@ -1043,10 +1032,6 @@ class EnforcerNoPolicyFileTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeB', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.d/a.conf',
'policy.d/b.conf',
])
class CheckFunctionTestCase(base.PolicyBaseTestCase):

View File

@ -0,0 +1,7 @@
---
fixes:
- |
[`bug 1943584 <https://bugs.launchpad.net/oslo.policy/+bug/1943584>`_]
If file in policy directory was emptied, rules were not re-calculated. The
only workaround was to restart an application. Now rules are re-calculated
"on the fly", without app restart.