Add JSON output option to sample generator

Add JSON output option to the oslopolicy-sample-generator command
to generate JSON output.

Change-Id: Iba9d5b0c8b86bddee5cbbefe3fbc81a113bc88eb
Closes-Bug: #1607968
Signed-off-by: Sami Makki <mail@samimakki.fr>
This commit is contained in:
Sami Makki 2017-08-09 16:43:31 +02:00
parent cc30bf7466
commit f874f26e44
2 changed files with 151 additions and 14 deletions

View File

@ -31,6 +31,10 @@ RULE_OPTS = [
required=True,
help='Option namespace(s) under "oslo.policy.policies" in '
'which to query for options.'),
cfg.StrOpt('format',
help='Desired format for the output.',
default='yaml',
choices=['json', 'yaml']),
]
ENFORCER_OPTS = [
@ -109,12 +113,15 @@ def _format_rule_default_yaml(default, include_help=True):
text = ('"%(name)s": "%(check_str)s"\n' %
{'name': default.name,
'check_str': default.check_str})
op = ""
if hasattr(default, 'operations'):
for operation in default.operations:
op += ('# %(method)s %(path)s\n' %
{'method': operation['method'], 'path': operation['path']})
if include_help:
op = ""
if hasattr(default, 'operations'):
for operation in default.operations:
op += ('# %(method)s %(path)s\n' %
{'method': operation['method'],
'path': operation['path']})
text = ('%(help)s\n%(op)s#%(text)s\n' %
{'help': _format_help_text(default.description),
'op': op,
@ -123,7 +130,19 @@ def _format_rule_default_yaml(default, include_help=True):
return text
def _sort_and_format_by_section(policies, include_help=True):
def _format_rule_default_json(default):
"""Create a json node from policy.RuleDefault or policy.DocumentedRuleDefault.
:param default: A policy.RuleDefault or policy.DocumentedRuleDefault object
:returns: A string containing a json representation of the RuleDefault
"""
return ('"%(name)s": "%(check_str)s"' %
{'name': default.name,
'check_str': default.check_str})
def _sort_and_format_by_section(policies, output_format='yaml',
include_help=True):
"""Generate a list of policy section texts
The text for a section will be created and returned one at a time. The
@ -134,15 +153,20 @@ def _sort_and_format_by_section(policies, include_help=True):
:param policies: A dict of {section1: [rule_default_1, rule_default_2],
section2: [rule_default_3]}
:param output_format: The format of the file to output to.
"""
for section in sorted(policies.keys()):
rule_defaults = policies[section]
for rule_default in rule_defaults:
yield _format_rule_default_yaml(rule_default,
include_help=include_help)
if output_format == 'yaml':
yield _format_rule_default_yaml(rule_default,
include_help=include_help)
elif output_format == 'json':
yield _format_rule_default_json(rule_default)
def _generate_sample(namespaces, output_file=None, include_help=True):
def _generate_sample(namespaces, output_file=None, output_format='yaml',
include_help=True):
"""Generate a sample policy file.
List all of the policies available via the namespace specified in the
@ -152,6 +176,7 @@ def _generate_sample(namespaces, output_file=None, include_help=True):
'oslo.policy.policies'. Stevedore will look here for
policy options.
:param output_file: The path of a file to output to. stdout used if None.
:param output_format: The format of the file to output to.
:param include_help: True, generates a sample-policy file with help text
along with rules in which everything is commented out.
False, generates a sample-policy file with only rules.
@ -161,9 +186,18 @@ def _generate_sample(namespaces, output_file=None, include_help=True):
output_file = (open(output_file, 'w') if output_file
else sys.stdout)
for section in _sort_and_format_by_section(policies,
sections_text = []
for section in _sort_and_format_by_section(policies, output_format,
include_help=include_help):
output_file.write(section)
sections_text.append(section)
if output_format == 'yaml':
output_file.writelines(sections_text)
elif output_format == 'json':
output_file.writelines((
'{\n ',
',\n '.join(sections_text),
'\n}\n'))
def _generate_policy(namespace, output_file=None):
@ -221,7 +255,7 @@ def generate_sample(args=None):
conf.register_cli_opts(GENERATOR_OPTS + RULE_OPTS)
conf.register_opts(GENERATOR_OPTS + RULE_OPTS)
conf(args)
_generate_sample(conf.namespace, conf.output_file)
_generate_sample(conf.namespace, conf.output_file, conf.format)
def generate_policy(args=None):

View File

@ -44,9 +44,9 @@ OPTS = {'base_rules': [policy.RuleDefault('admin', 'is_admin:True',
}
class GenerateSampleTestCase(base.PolicyBaseTestCase):
class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(GenerateSampleTestCase, self).setUp()
super(GenerateSampleYAMLTestCase, self).setUp()
self.enforcer = policy.Enforcer(self.conf, policy_file='policy.yaml')
def _capture_stdout(self):
@ -195,6 +195,109 @@ class GenerateSampleTestCase(base.PolicyBaseTestCase):
self.assertEqual(expected, written_policy)
class GenerateSampleJSONTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(GenerateSampleJSONTestCase, self).setUp()
self.enforcer = policy.Enforcer(self.conf, policy_file='policy.json')
def _capture_stdout(self):
self.useFixture(fixtures.MonkeyPatch('sys.stdout', moves.StringIO()))
return sys.stdout
def test_generate_loadable_json(self):
extensions = []
for name, opts in OPTS.items():
ext = stevedore.extension.Extension(name=name, entry_point=None,
plugin=None, obj=opts)
extensions.append(ext)
test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
extensions=extensions, namespace=['base_rules', 'rules'])
output_file = self.get_config_file_fullname('policy.json')
with mock.patch('stevedore.named.NamedExtensionManager',
return_value=test_mgr) as mock_ext_mgr:
# generate sample-policy file with only rules
generator._generate_sample(['base_rules', 'rules'], output_file,
output_format='json',
include_help=False)
mock_ext_mgr.assert_called_once_with(
'oslo.policy.policies', names=['base_rules', 'rules'],
on_load_failure_callback=generator.on_load_failure_callback,
invoke_on_load=True)
self.enforcer.load_rules()
self.assertIn('owner', self.enforcer.rules)
self.assertIn('admin', self.enforcer.rules)
self.assertIn('admin_or_owner', self.enforcer.rules)
self.assertEqual('project_id:%(project_id)s',
str(self.enforcer.rules['owner']))
self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
self.assertEqual('(rule:admin or rule:owner)',
str(self.enforcer.rules['admin_or_owner']))
def test_expected_content(self):
extensions = []
for name, opts in OPTS.items():
ext = stevedore.extension.Extension(name=name, entry_point=None,
plugin=None, obj=opts)
extensions.append(ext)
test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
extensions=extensions, namespace=['base_rules', 'rules'])
expected = '''{
"admin": "is_admin:True",
"owner": "project_id:%(project_id)s",
"shared": "field:networks:shared=True",
"admin_or_owner": "rule:admin or rule:owner"
}
'''
output_file = self.get_config_file_fullname('policy.json')
with mock.patch('stevedore.named.NamedExtensionManager',
return_value=test_mgr) as mock_ext_mgr:
generator._generate_sample(['base_rules', 'rules'],
output_file=output_file,
output_format='json')
mock_ext_mgr.assert_called_once_with(
'oslo.policy.policies', names=['base_rules', 'rules'],
on_load_failure_callback=generator.on_load_failure_callback,
invoke_on_load=True)
with open(output_file, 'r') as written_file:
written_policy = written_file.read()
self.assertEqual(expected, written_policy)
def test_expected_content_stdout(self):
extensions = []
for name, opts in OPTS.items():
ext = stevedore.extension.Extension(name=name, entry_point=None,
plugin=None, obj=opts)
extensions.append(ext)
test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
extensions=extensions, namespace=['base_rules', 'rules'])
expected = '''{
"admin": "is_admin:True",
"owner": "project_id:%(project_id)s",
"shared": "field:networks:shared=True",
"admin_or_owner": "rule:admin or rule:owner"
}
'''
stdout = self._capture_stdout()
with mock.patch('stevedore.named.NamedExtensionManager',
return_value=test_mgr) as mock_ext_mgr:
generator._generate_sample(['base_rules', 'rules'],
output_file=None,
output_format='json')
mock_ext_mgr.assert_called_once_with(
'oslo.policy.policies', names=['base_rules', 'rules'],
on_load_failure_callback=generator.on_load_failure_callback,
invoke_on_load=True)
self.assertEqual(expected, stdout.getvalue())
class GeneratorRaiseErrorTestCase(testtools.TestCase):
def test_generator_raises_error(self):
"""Verifies that errors from extension manager are not suppressed."""