diff --git a/oslo_policy/generator.py b/oslo_policy/generator.py index 1a09ec0e..0ca7d1d0 100644 --- a/oslo_policy/generator.py +++ b/oslo_policy/generator.py @@ -31,6 +31,10 @@ RULE_OPTS = [ required=True, help='Option namespace(s) under "oslo.policy.policies" in ' 'which to query for options.'), + cfg.StrOpt('format', + help='Desired format for the output.', + default='yaml', + choices=['json', 'yaml']), ] ENFORCER_OPTS = [ @@ -109,12 +113,15 @@ def _format_rule_default_yaml(default, include_help=True): text = ('"%(name)s": "%(check_str)s"\n' % {'name': default.name, 'check_str': default.check_str}) - op = "" - if hasattr(default, 'operations'): - for operation in default.operations: - op += ('# %(method)s %(path)s\n' % - {'method': operation['method'], 'path': operation['path']}) + if include_help: + op = "" + if hasattr(default, 'operations'): + for operation in default.operations: + op += ('# %(method)s %(path)s\n' % + {'method': operation['method'], + 'path': operation['path']}) + text = ('%(help)s\n%(op)s#%(text)s\n' % {'help': _format_help_text(default.description), 'op': op, @@ -123,7 +130,19 @@ def _format_rule_default_yaml(default, include_help=True): return text -def _sort_and_format_by_section(policies, include_help=True): +def _format_rule_default_json(default): + """Create a json node from policy.RuleDefault or policy.DocumentedRuleDefault. + + :param default: A policy.RuleDefault or policy.DocumentedRuleDefault object + :returns: A string containing a json representation of the RuleDefault + """ + return ('"%(name)s": "%(check_str)s"' % + {'name': default.name, + 'check_str': default.check_str}) + + +def _sort_and_format_by_section(policies, output_format='yaml', + include_help=True): """Generate a list of policy section texts The text for a section will be created and returned one at a time. The @@ -134,15 +153,20 @@ def _sort_and_format_by_section(policies, include_help=True): :param policies: A dict of {section1: [rule_default_1, rule_default_2], section2: [rule_default_3]} + :param output_format: The format of the file to output to. """ for section in sorted(policies.keys()): rule_defaults = policies[section] for rule_default in rule_defaults: - yield _format_rule_default_yaml(rule_default, - include_help=include_help) + if output_format == 'yaml': + yield _format_rule_default_yaml(rule_default, + include_help=include_help) + elif output_format == 'json': + yield _format_rule_default_json(rule_default) -def _generate_sample(namespaces, output_file=None, include_help=True): +def _generate_sample(namespaces, output_file=None, output_format='yaml', + include_help=True): """Generate a sample policy file. List all of the policies available via the namespace specified in the @@ -152,6 +176,7 @@ def _generate_sample(namespaces, output_file=None, include_help=True): 'oslo.policy.policies'. Stevedore will look here for policy options. :param output_file: The path of a file to output to. stdout used if None. + :param output_format: The format of the file to output to. :param include_help: True, generates a sample-policy file with help text along with rules in which everything is commented out. False, generates a sample-policy file with only rules. @@ -161,9 +186,18 @@ def _generate_sample(namespaces, output_file=None, include_help=True): output_file = (open(output_file, 'w') if output_file else sys.stdout) - for section in _sort_and_format_by_section(policies, + sections_text = [] + for section in _sort_and_format_by_section(policies, output_format, include_help=include_help): - output_file.write(section) + sections_text.append(section) + + if output_format == 'yaml': + output_file.writelines(sections_text) + elif output_format == 'json': + output_file.writelines(( + '{\n ', + ',\n '.join(sections_text), + '\n}\n')) def _generate_policy(namespace, output_file=None): @@ -221,7 +255,7 @@ def generate_sample(args=None): conf.register_cli_opts(GENERATOR_OPTS + RULE_OPTS) conf.register_opts(GENERATOR_OPTS + RULE_OPTS) conf(args) - _generate_sample(conf.namespace, conf.output_file) + _generate_sample(conf.namespace, conf.output_file, conf.format) def generate_policy(args=None): diff --git a/oslo_policy/tests/test_generator.py b/oslo_policy/tests/test_generator.py index 7452f3fc..97a4f594 100644 --- a/oslo_policy/tests/test_generator.py +++ b/oslo_policy/tests/test_generator.py @@ -44,9 +44,9 @@ OPTS = {'base_rules': [policy.RuleDefault('admin', 'is_admin:True', } -class GenerateSampleTestCase(base.PolicyBaseTestCase): +class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase): def setUp(self): - super(GenerateSampleTestCase, self).setUp() + super(GenerateSampleYAMLTestCase, self).setUp() self.enforcer = policy.Enforcer(self.conf, policy_file='policy.yaml') def _capture_stdout(self): @@ -195,6 +195,109 @@ class GenerateSampleTestCase(base.PolicyBaseTestCase): self.assertEqual(expected, written_policy) +class GenerateSampleJSONTestCase(base.PolicyBaseTestCase): + def setUp(self): + super(GenerateSampleJSONTestCase, self).setUp() + self.enforcer = policy.Enforcer(self.conf, policy_file='policy.json') + + def _capture_stdout(self): + self.useFixture(fixtures.MonkeyPatch('sys.stdout', moves.StringIO())) + return sys.stdout + + def test_generate_loadable_json(self): + extensions = [] + for name, opts in OPTS.items(): + ext = stevedore.extension.Extension(name=name, entry_point=None, + plugin=None, obj=opts) + extensions.append(ext) + test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( + extensions=extensions, namespace=['base_rules', 'rules']) + + output_file = self.get_config_file_fullname('policy.json') + with mock.patch('stevedore.named.NamedExtensionManager', + return_value=test_mgr) as mock_ext_mgr: + # generate sample-policy file with only rules + generator._generate_sample(['base_rules', 'rules'], output_file, + output_format='json', + include_help=False) + mock_ext_mgr.assert_called_once_with( + 'oslo.policy.policies', names=['base_rules', 'rules'], + on_load_failure_callback=generator.on_load_failure_callback, + invoke_on_load=True) + + self.enforcer.load_rules() + + self.assertIn('owner', self.enforcer.rules) + self.assertIn('admin', self.enforcer.rules) + self.assertIn('admin_or_owner', self.enforcer.rules) + self.assertEqual('project_id:%(project_id)s', + str(self.enforcer.rules['owner'])) + self.assertEqual('is_admin:True', str(self.enforcer.rules['admin'])) + self.assertEqual('(rule:admin or rule:owner)', + str(self.enforcer.rules['admin_or_owner'])) + + def test_expected_content(self): + extensions = [] + for name, opts in OPTS.items(): + ext = stevedore.extension.Extension(name=name, entry_point=None, + plugin=None, obj=opts) + extensions.append(ext) + test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( + extensions=extensions, namespace=['base_rules', 'rules']) + + expected = '''{ + "admin": "is_admin:True", + "owner": "project_id:%(project_id)s", + "shared": "field:networks:shared=True", + "admin_or_owner": "rule:admin or rule:owner" +} +''' + output_file = self.get_config_file_fullname('policy.json') + with mock.patch('stevedore.named.NamedExtensionManager', + return_value=test_mgr) as mock_ext_mgr: + generator._generate_sample(['base_rules', 'rules'], + output_file=output_file, + output_format='json') + mock_ext_mgr.assert_called_once_with( + 'oslo.policy.policies', names=['base_rules', 'rules'], + on_load_failure_callback=generator.on_load_failure_callback, + invoke_on_load=True) + + with open(output_file, 'r') as written_file: + written_policy = written_file.read() + + self.assertEqual(expected, written_policy) + + def test_expected_content_stdout(self): + extensions = [] + for name, opts in OPTS.items(): + ext = stevedore.extension.Extension(name=name, entry_point=None, + plugin=None, obj=opts) + extensions.append(ext) + test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( + extensions=extensions, namespace=['base_rules', 'rules']) + + expected = '''{ + "admin": "is_admin:True", + "owner": "project_id:%(project_id)s", + "shared": "field:networks:shared=True", + "admin_or_owner": "rule:admin or rule:owner" +} +''' + stdout = self._capture_stdout() + with mock.patch('stevedore.named.NamedExtensionManager', + return_value=test_mgr) as mock_ext_mgr: + generator._generate_sample(['base_rules', 'rules'], + output_file=None, + output_format='json') + mock_ext_mgr.assert_called_once_with( + 'oslo.policy.policies', names=['base_rules', 'rules'], + on_load_failure_callback=generator.on_load_failure_callback, + invoke_on_load=True) + + self.assertEqual(expected, stdout.getvalue()) + + class GeneratorRaiseErrorTestCase(testtools.TestCase): def test_generator_raises_error(self): """Verifies that errors from extension manager are not suppressed."""