Merge "Adds nova-policy-check cmd"
This commit is contained in:
commit
c52920aced
174
nova/cmd/policy_check.py
Normal file
174
nova/cmd/policy_check.py
Normal file
@ -0,0 +1,174 @@
|
||||
# Copyright 2016 Cloudbase Solutions Srl
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
CLI interface for nova policy rule commands.
|
||||
"""
|
||||
|
||||
import functools
|
||||
import os
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from nova.cmd import common as cmd_common
|
||||
import nova.conf
|
||||
from nova import config
|
||||
from nova import context as nova_context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.i18n import _
|
||||
from nova import policies
|
||||
from nova import version
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
|
||||
cli_opts = [
|
||||
cfg.ListOpt(
|
||||
'os-roles',
|
||||
metavar='<auth-roles>',
|
||||
default=os.environ.get('OS_ROLES'),
|
||||
help=_('Defaults to env[OS_ROLES].')),
|
||||
cfg.StrOpt(
|
||||
'os-user-id',
|
||||
metavar='<auth-user-id>',
|
||||
default=os.environ.get('OS_USER_ID'),
|
||||
help=_('Defaults to env[OS_USER_ID].')),
|
||||
cfg.StrOpt(
|
||||
'os-tenant-id',
|
||||
metavar='<auth-tenant-id>',
|
||||
default=os.environ.get('OS_TENANT_ID'),
|
||||
help=_('Defaults to env[OS_TENANT_ID].')),
|
||||
]
|
||||
|
||||
|
||||
class PolicyCommands(object):
|
||||
"""Commands for policy rules."""
|
||||
|
||||
_ACCEPTABLE_TARGETS = [
|
||||
'project_id', 'user_id', 'quota_class', 'availability_zone',
|
||||
'instance_id']
|
||||
|
||||
@cmd_common.args('--api-name', dest='api_name', metavar='<API name>',
|
||||
help='Will return only passing policy rules containing '
|
||||
'the given API name.')
|
||||
@cmd_common.args('--target', nargs='+', dest='target', metavar='<Target>',
|
||||
help='Will return only passing policy rules for the '
|
||||
'given target. The available targets are %s. When '
|
||||
'"instance_id" is used, the other targets will be '
|
||||
'overwritten.' % ','.join(_ACCEPTABLE_TARGETS))
|
||||
def check(self, api_name=None, target=None):
|
||||
"""Prints all passing policy rules for the given user.
|
||||
|
||||
:param api_name: If None, all passing policy rules will be printed,
|
||||
otherwise, only passing policies that contain the
|
||||
given api_name in their names.
|
||||
:param target: The target against which the policy rule authorization
|
||||
will be tested. If None, the given user will be
|
||||
considered as the target.
|
||||
"""
|
||||
context = self._get_context()
|
||||
api_name = api_name or ''
|
||||
target = self._get_target(context, target)
|
||||
|
||||
allowed_operations = self._filter_rules(context, api_name, target)
|
||||
|
||||
if allowed_operations:
|
||||
print('\n'.join(allowed_operations))
|
||||
return 0
|
||||
else:
|
||||
print('No rules matched or allowed')
|
||||
return 1
|
||||
|
||||
def _get_context(self):
|
||||
return nova_context.RequestContext(
|
||||
roles=CONF.os_roles,
|
||||
user_id=CONF.os_user_id,
|
||||
project_id=CONF.os_tenant_id)
|
||||
|
||||
def _get_target(self, context, target):
|
||||
"""Processes and validates the CLI given target and adapts it for
|
||||
policy authorization.
|
||||
|
||||
:returns: None if the given target is None, otherwise returns a proper
|
||||
authorization target.
|
||||
:raises nova.exception.InvalidAttribute: if a key in the given target
|
||||
is not an acceptable.
|
||||
:raises nova.exception.InstanceNotFound: if 'instance_id' is given, and
|
||||
there is no instance match the id.
|
||||
"""
|
||||
if not target:
|
||||
return None
|
||||
|
||||
new_target = {}
|
||||
for t in target:
|
||||
key, value = t.split('=')
|
||||
if key not in self._ACCEPTABLE_TARGETS:
|
||||
raise exception.InvalidAttribute(attr=key)
|
||||
new_target[key] = value
|
||||
|
||||
# if the target is an instance_id, return an instance instead.
|
||||
instance_id = new_target.get('instance_id')
|
||||
if instance_id:
|
||||
admin_ctxt = nova_context.get_admin_context()
|
||||
instance = db.instance_get_by_uuid(admin_ctxt, instance_id)
|
||||
new_target = {'user_id': instance['user_id'],
|
||||
'project_id': instance['project_id']}
|
||||
|
||||
return new_target
|
||||
|
||||
def _filter_rules(self, context, api_name, target):
|
||||
all_rules = policies.list_rules()
|
||||
return [rule.name for rule in all_rules if api_name in rule.name and
|
||||
context.can(rule.name, target, fatal=False)]
|
||||
|
||||
|
||||
CATEGORIES = {
|
||||
'policy': PolicyCommands,
|
||||
}
|
||||
|
||||
|
||||
add_command_parsers = functools.partial(cmd_common.add_command_parsers,
|
||||
categories=CATEGORIES)
|
||||
|
||||
|
||||
category_opt = cfg.SubCommandOpt('category',
|
||||
title='Command categories',
|
||||
help='Available categories',
|
||||
handler=add_command_parsers)
|
||||
|
||||
|
||||
def main():
|
||||
"""Parse options and call the appropriate class/method."""
|
||||
CONF.register_cli_opts(cli_opts)
|
||||
CONF.register_cli_opt(category_opt)
|
||||
config.parse_args(sys.argv)
|
||||
|
||||
if CONF.category.name == "version":
|
||||
print(version.version_string_with_package())
|
||||
return 0
|
||||
|
||||
if CONF.category.name == "bash-completion":
|
||||
cmd_common.print_bash_completion(CATEGORIES)
|
||||
return 0
|
||||
|
||||
try:
|
||||
fn, fn_args, fn_kwargs = cmd_common.get_action_fn()
|
||||
ret = fn(*fn_args, **fn_kwargs)
|
||||
return(ret)
|
||||
except Exception as ex:
|
||||
print(_("error: %s") % ex)
|
||||
return 1
|
198
nova/tests/unit/cmd/test_policy_check.py
Normal file
198
nova/tests/unit/cmd/test_policy_check.py
Normal file
@ -0,0 +1,198 @@
|
||||
# Copyright 2016 Cloudbase Solutions Srl
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Unit tests for the nova-policy-check CLI interfaces.
|
||||
"""
|
||||
|
||||
import mock
|
||||
|
||||
from nova.cmd import policy_check
|
||||
import nova.conf
|
||||
from nova import context as nova_context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.policies import base as base_policies
|
||||
from nova.policies import instance_actions as ia_policies
|
||||
from nova import test
|
||||
from nova.tests import fixtures
|
||||
from nova.tests.unit import fake_instance
|
||||
from nova.tests.unit import policy_fixture
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
|
||||
class TestPolicyCheck(test.NoDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPolicyCheck, self).setUp()
|
||||
self.output = self.useFixture(fixtures.OutputStreamCapture())
|
||||
self.policy = self.useFixture(policy_fixture.RealPolicyFixture())
|
||||
self.cmd = policy_check.PolicyCommands()
|
||||
|
||||
@mock.patch.object(policy_check.PolicyCommands, '_filter_rules')
|
||||
@mock.patch.object(policy_check.PolicyCommands, '_get_target')
|
||||
@mock.patch.object(policy_check.PolicyCommands, '_get_context')
|
||||
def test_check(self, mock_get_context, mock_get_target,
|
||||
mock_filter_rules):
|
||||
fake_rules = ['fake:rule', 'faux:roule']
|
||||
mock_filter_rules.return_value = fake_rules
|
||||
|
||||
self.cmd.check(target=mock.sentinel.target)
|
||||
|
||||
mock_get_context.assert_called_once_with()
|
||||
mock_get_target.assert_called_once_with(mock_get_context.return_value,
|
||||
mock.sentinel.target)
|
||||
mock_filter_rules.assert_called_once_with(
|
||||
mock_get_context.return_value, '', mock_get_target.return_value)
|
||||
self.assertEqual('\n'.join(fake_rules), self.output.stdout)
|
||||
|
||||
@mock.patch.object(nova_context, 'RequestContext')
|
||||
@mock.patch.object(policy_check, 'CONF')
|
||||
def test_get_context(self, mock_CONF, mock_RequestContext):
|
||||
context = self.cmd._get_context()
|
||||
|
||||
self.assertEqual(mock_RequestContext.return_value, context)
|
||||
mock_RequestContext.assert_called_once_with(
|
||||
roles=mock_CONF.os_roles,
|
||||
user_id=mock_CONF.os_user_id,
|
||||
project_id=mock_CONF.os_tenant_id)
|
||||
|
||||
def test_get_target_none(self):
|
||||
target = self.cmd._get_target(mock.sentinel.context, None)
|
||||
self.assertIsNone(target)
|
||||
|
||||
def test_get_target_invalid_attribute(self):
|
||||
self.assertRaises(exception.InvalidAttribute, self.cmd._get_target,
|
||||
mock.sentinel.context, ['nope=nada'])
|
||||
|
||||
def test_get_target(self):
|
||||
expected_target = {
|
||||
'project_id': 'fake-proj',
|
||||
'user_id': 'fake-user',
|
||||
'quota_class': 'fake-quota-class',
|
||||
'availability_zone': 'fake-az',
|
||||
}
|
||||
given_target = ['='.join([key, val])
|
||||
for key, val in expected_target.items()]
|
||||
|
||||
actual_target = self.cmd._get_target(mock.sentinel.context,
|
||||
given_target)
|
||||
self.assertDictEqual(expected_target, actual_target)
|
||||
|
||||
@mock.patch.object(nova_context, 'get_admin_context')
|
||||
@mock.patch.object(db, 'instance_get_by_uuid')
|
||||
def test_get_target_instance(self, mock_instance_get,
|
||||
mock_get_admin_context):
|
||||
admin_context = nova_context.RequestContext(is_admin=True)
|
||||
mock_get_admin_context.return_value = admin_context
|
||||
given_target = ['instance_id=fake_id']
|
||||
mock_instance_get.return_value = fake_instance.fake_db_instance()
|
||||
target = self.cmd._get_target(mock.sentinel.context,
|
||||
given_target)
|
||||
self.assertEqual(target,
|
||||
{'user_id': 'fake-user', 'project_id': 'fake-project'})
|
||||
mock_instance_get.assert_called_once_with(admin_context,
|
||||
'fake_id')
|
||||
|
||||
def _check_filter_rules(self, context=None, target=None,
|
||||
expected_rules=None):
|
||||
context = context or nova_context.get_admin_context()
|
||||
expected_rules = expected_rules or [
|
||||
r.name for r in ia_policies.list_rules()]
|
||||
|
||||
passing_rules = self.cmd._filter_rules(
|
||||
context, 'os-instance-actions', target)
|
||||
self.assertEqual(set(expected_rules), set(passing_rules))
|
||||
|
||||
def test_filter_rules_non_admin(self):
|
||||
context = nova_context.RequestContext()
|
||||
rule_conditions = [base_policies.RULE_ANY,
|
||||
base_policies.RULE_ADMIN_OR_OWNER]
|
||||
expected_rules = [r.name for r in ia_policies.list_rules() if
|
||||
r.check_str in rule_conditions]
|
||||
self._check_filter_rules(context, expected_rules=expected_rules)
|
||||
|
||||
def test_filter_rules_admin(self):
|
||||
self._check_filter_rules()
|
||||
|
||||
def test_filter_rules_instance_non_admin(self):
|
||||
db_context = nova_context.RequestContext(user_id='fake-user',
|
||||
project_id='fake-project')
|
||||
instance = fake_instance.fake_instance_obj(db_context)
|
||||
context = nova_context.RequestContext()
|
||||
expected_rules = [r.name for r in ia_policies.list_rules() if
|
||||
r.check_str == base_policies.RULE_ANY]
|
||||
self._check_filter_rules(context, instance, expected_rules)
|
||||
|
||||
def test_filter_rules_instance_admin(self):
|
||||
db_context = nova_context.RequestContext(user_id='fake-user',
|
||||
project_id='fake-project')
|
||||
instance = fake_instance.fake_instance_obj(db_context)
|
||||
self._check_filter_rules(target=instance)
|
||||
|
||||
def test_filter_rules_instance_owner(self):
|
||||
db_context = nova_context.RequestContext(user_id='fake-user',
|
||||
project_id='fake-project')
|
||||
instance = fake_instance.fake_instance_obj(db_context)
|
||||
rule_conditions = [base_policies.RULE_ANY,
|
||||
base_policies.RULE_ADMIN_OR_OWNER]
|
||||
expected_rules = [r.name for r in ia_policies.list_rules() if
|
||||
r.check_str in rule_conditions]
|
||||
self._check_filter_rules(db_context, instance, expected_rules)
|
||||
|
||||
@mock.patch.object(policy_check.config, 'parse_args')
|
||||
@mock.patch.object(policy_check, 'CONF')
|
||||
def _check_main(self, mock_CONF, mock_parse_args,
|
||||
category_name='check', expected_return_value=0):
|
||||
mock_CONF.category.name = category_name
|
||||
return_value = policy_check.main()
|
||||
|
||||
self.assertEqual(expected_return_value, return_value)
|
||||
mock_CONF.register_cli_opts.assert_called_once_with(
|
||||
policy_check.cli_opts)
|
||||
mock_CONF.register_cli_opt.assert_called_once_with(
|
||||
policy_check.category_opt)
|
||||
|
||||
@mock.patch.object(policy_check.version, 'version_string_with_package',
|
||||
return_value="x.x.x")
|
||||
def test_main_version(self, mock_version_string):
|
||||
self._check_main(category_name='version')
|
||||
self.assertEqual("x.x.x", self.output.stdout)
|
||||
|
||||
@mock.patch.object(policy_check.cmd_common, 'print_bash_completion')
|
||||
def test_main_bash_completion(self, mock_print_bash):
|
||||
self._check_main(category_name='bash-completion')
|
||||
mock_print_bash.assert_called_once_with(policy_check.CATEGORIES)
|
||||
|
||||
@mock.patch.object(policy_check.cmd_common, 'get_action_fn')
|
||||
def test_main(self, mock_get_action_fn):
|
||||
mock_fn = mock.Mock()
|
||||
mock_fn_args = [mock.sentinel.arg]
|
||||
mock_fn_kwargs = {'key': mock.sentinel.value}
|
||||
mock_get_action_fn.return_value = (mock_fn, mock_fn_args,
|
||||
mock_fn_kwargs)
|
||||
|
||||
self._check_main(expected_return_value=mock_fn.return_value)
|
||||
mock_fn.assert_called_once_with(mock.sentinel.arg,
|
||||
key=mock.sentinel.value)
|
||||
|
||||
@mock.patch.object(policy_check.cmd_common, 'get_action_fn')
|
||||
def test_main_error(self, mock_get_action_fn):
|
||||
mock_fn = mock.Mock(side_effect=Exception)
|
||||
mock_get_action_fn.return_value = (mock_fn, [], {})
|
||||
|
||||
self._check_main(expected_return_value=1)
|
||||
self.assertIn("error: ", self.output.stdout)
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- The nova-policy command line is implemented as a tool to experience the
|
||||
under-development feature policy discovery. User can input the credentials
|
||||
infomation and the instance info, the tool will return a list of API which
|
||||
can be allowed to invoke. There isn't any contract for the interface of
|
||||
the tool due to the feature still under-development.
|
@ -62,6 +62,7 @@ console_scripts =
|
||||
nova-manage = nova.cmd.manage:main
|
||||
nova-network = nova.cmd.network:main
|
||||
nova-novncproxy = nova.cmd.novncproxy:main
|
||||
nova-policy = nova.cmd.policy_check:main
|
||||
nova-rootwrap = oslo_rootwrap.cmd:main
|
||||
nova-rootwrap-daemon = oslo_rootwrap.cmd:daemon
|
||||
nova-scheduler = nova.cmd.scheduler:main
|
||||
|
Loading…
Reference in New Issue
Block a user