fix checking if enforcer was loaded

enforcer for policy can be loaded from a single file or from multiple
files (policy_dir). so checking for policy_file is not good enough.

If a policy is loaded it will have some rules, if not loaded then,
oslo_policy rejects all acccess, but it is not saved to the
_ENFORCER object, which is holds the objects used for enforcing
policies. So checking for existance of rules is a better check.

Some refactoring for better logging
added tests: test_nonexisting_policy_file_load

Change-Id: Id1f65058014ef5b14449b502d6741da9d34767b3
Closes-Bug: 1804174
This commit is contained in:
Sumit Jamgade 2018-11-19 17:01:44 +01:00 committed by Akihiro Motoki
parent d3f9ca8373
commit 96912aea91
2 changed files with 28 additions and 11 deletions

View File

@ -35,7 +35,6 @@ def _get_policy_conf(policy_file, policy_dirs=None):
# Passing [] is required. Otherwise oslo.config looks up sys.argv.
conf([])
policy_opts.set_defaults(conf)
policy_file = os.path.join(_BASE_PATH, policy_file)
conf.set_default('policy_file', policy_file, 'oslo_policy')
# Policy Enforcer has been updated to take in a policy directory
# as a config option. However, the default value in is set to
@ -43,30 +42,39 @@ def _get_policy_conf(policy_file, policy_dirs=None):
# value to empty list for now.
if policy_dirs is None:
policy_dirs = []
policy_dirs = [os.path.join(_BASE_PATH, policy_dir)
for policy_dir in policy_dirs]
conf.set_default('policy_dirs', policy_dirs, 'oslo_policy')
return conf
def _get_policy_file_with_full_path(service):
policy_files = getattr(settings, 'POLICY_FILES', {})
policy_file = os.path.join(_BASE_PATH, policy_files[service])
policy_dirs = getattr(settings, 'POLICY_DIRS', {}).get(service, [])
policy_dirs = [os.path.join(_BASE_PATH, policy_dir)
for policy_dir in policy_dirs]
return policy_file, policy_dirs
def _get_enforcer():
global _ENFORCER
if not _ENFORCER:
_ENFORCER = {}
policy_files = getattr(settings, 'POLICY_FILES', {})
policy_dirs = getattr(settings, 'POLICY_DIRS', {})
for service in policy_files:
conf = _get_policy_conf(policy_file=policy_files[service],
policy_dirs=policy_dirs.get(service, []))
for service in policy_files.keys():
policy_file, policy_dirs = _get_policy_file_with_full_path(service)
conf = _get_policy_conf(policy_file, policy_dirs)
enforcer = policy.Enforcer(conf)
# Ensure enforcer.policy_path is populated.
enforcer.load_rules()
if os.path.isfile(enforcer.policy_path):
# Ensure enforcer.rules is populated.
if enforcer.rules:
LOG.debug("adding enforcer for service: %s", service)
_ENFORCER[service] = enforcer
else:
LOG.warning("policy file for service: %s not found at %s",
(service, enforcer.policy_path))
locations = policy_file
if policy_dirs:
locations += ' and files under %s' % policy_dirs
LOG.warning("No policy rules for service '%s' in %s",
service, locations)
return _ENFORCER

View File

@ -28,6 +28,15 @@ class PolicyLoaderTestCase(test.TestCase):
self.assertIn('identity', enforcer)
self.assertIn('compute', enforcer)
def test_nonexisting_policy_file_load(self):
policy_files = {
'dinosaur': 'no_godzilla.json',
}
policy.reset()
with self.settings(POLICY_FILES=policy_files):
enforcer = policy._get_enforcer()
self.assertEqual(0, len(enforcer))
def test_policy_reset(self):
policy._get_enforcer()
self.assertEqual(2, len(policy._ENFORCER))