Merge "Implement Xena project personas for group_actions"

This commit is contained in:
Zuul 2021-09-15 05:23:03 +00:00 committed by Gerrit Code Review
commit c97a9362a0
4 changed files with 416 additions and 31 deletions

View File

@ -25,17 +25,48 @@ FAILOVER_REP = 'group:failover_replication'
LIST_REP = 'group:list_replication_targets'
DELETE_POLICY = 'group:delete'
deprecated_delete_group = base.CinderDeprecatedRule(
name=DELETE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_enable_replication = base.CinderDeprecatedRule(
name=ENABLE_REP,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_disable_replication = base.CinderDeprecatedRule(
name=DISABLE_REP,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_failover_replication = base.CinderDeprecatedRule(
name=FAILOVER_REP,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_list_replication = base.CinderDeprecatedRule(
name=LIST_REP,
check_str=base.RULE_ADMIN_OR_OWNER
)
# TODO(enriquetaso): update the following in Yoga.
# We're not deprecating the reset rule in Xena.
# deprecated_reset_status = base.CinderDeprecatedRule(
# name=RESET_STATUS,
# check_str=base.RULE_ADMIN_API
# )
group_actions_policies = [
policy.DocumentedRuleDefault(
name=DELETE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Delete group.",
operations=[
{
'method': 'POST',
'path': '/groups/{group_id}/action (delete)'
}
]),
],
deprecated_rule=deprecated_delete_group,
),
policy.DocumentedRuleDefault(
name=RESET_STATUS,
check_str=base.RULE_ADMIN_API,
@ -45,47 +76,56 @@ group_actions_policies = [
'method': 'POST',
'path': '/groups/{group_id}/action (reset_status)'
}
]),
]
),
policy.DocumentedRuleDefault(
name=ENABLE_REP,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Enable replication.",
operations=[
{
'method': 'POST',
'path': '/groups/{group_id}/action (enable_replication)'
}
]),
],
deprecated_rule=deprecated_enable_replication,
),
policy.DocumentedRuleDefault(
name=DISABLE_REP,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Disable replication.",
operations=[
{
'method': 'POST',
'path': '/groups/{group_id}/action (disable_replication)'
}
]),
],
deprecated_rule=deprecated_disable_replication,
),
policy.DocumentedRuleDefault(
name=FAILOVER_REP,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Fail over replication.",
operations=[
{
'method': 'POST',
'path': '/groups/{group_id}/action (failover_replication)'
}
]),
],
deprecated_rule=deprecated_failover_replication,
),
policy.DocumentedRuleDefault(
name=LIST_REP,
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="List failover replication.",
operations=[
{
'method': 'POST',
'path': '/groups/{group_id}/action (list_replication_targets)'
}
]),
],
deprecated_rule=deprecated_list_replication,
),
]

View File

@ -1074,6 +1074,7 @@ class GroupsAPITestCase(test.TestCase):
req = fakes.HTTPRequest.blank('/v3/%s/groups/%s/action' %
(fake.PROJECT_ID, self.group2.id),
version=mv.GROUP_VOLUME_RESET_STATUS)
req.environ['cinder.context'] = self.ctxt
body = {"reset_status": {
"status": fields.GroupStatus.AVAILABLE
}}

View File

@ -0,0 +1,364 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from cinder.api import microversions as mv
from cinder.api.v3 import groups
from cinder import exception
from cinder.objects import fields
from cinder.policies import group_actions as group_policies
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit.policies import base
from cinder.tests.unit import utils as test_utils
from cinder.volume import group_types
@ddt.ddt
class GroupActionPolicyTest(base.BasePolicyTest):
sysadmins = [
'legacy_admin',
'project_admin',
'system_admin',
]
non_sysadmins = [
'legacy_owner',
'project_member',
'project_reader',
'project_foo',
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_users = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_users = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_members = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_members = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
# Basic policy test is without enforcing scope (which cinder doesn't
# yet support) and deprecated rules enabled.
def setUp(self, enforce_scope=False, enforce_new_defaults=False,
*args, **kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.controller = groups.GroupsController()
self.api_path = '/v3/%s/groups' % (self.project_id)
self.api_version = mv.GROUP_REPLICATION
self.group_type = group_types.create(self.project_admin_context,
'group_type_name',
{'key3': 'value3'},
is_public=True)
# not surprisingly, to do a group action you need to get a
# group, so relax the group:get policy so that these tests
# will check the group action policy we're interested in
self.policy.set_rules({"group:get": ""},
overwrite=False)
def _create_group(self, group_status=fields.GroupStatus.AVAILABLE):
volume_type = test_utils.create_volume_type(self.project_admin_context,
name="test")
group = test_utils.create_group(self.project_admin_context,
status=group_status,
group_type_id=self.group_type.id,
volume_type_ids=[volume_type.id])
test_utils.create_volume(self.project_member_context,
group_id=group.id,
testcase_instance=self,
volume_type_id=volume_type.id)
return group.id
@ddt.data(*base.all_users)
@mock.patch('cinder.group.api.API.enable_replication')
def test_enable_group_replication_policy(self, user_id,
mock_enable_replication):
"""Test enable group replication policy."""
# FIXME: this is a very fragile approach
def fake_enable_rep(context, group):
context.authorize(group_policies.ENABLE_REP, target_obj=group)
volume_type = test_utils.create_volume_type(self.project_admin_context,
name='test_group_policy')
group = test_utils.create_group(self.project_admin_context,
status=fields.GroupStatus.AVAILABLE,
group_type_id=self.group_type.id,
volume_type_ids=[volume_type.id])
test_utils.create_volume(self.project_member_context,
group_id=group.id,
testcase_instance=self,
volume_type_id=volume_type.id)
mock_enable_replication.side_effect = fake_enable_rep
self.group_type.status = 'enabled'
rule_name = group_policies.ENABLE_REP
version = mv.GROUP_REPLICATION
url = '%s/%s/action' % (self.api_path, group.id)
req = fake_api.HTTPRequest.blank(url, version=version)
req.method = 'POST'
body = {
"enable_replication": {}
}
unauthorized_exceptions = [exception.GroupNotFound]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name,
self.controller.enable_replication,
req, id=group.id, body=body)
group.destroy()
@ddt.data(*base.all_users)
@mock.patch('cinder.group.api.API.disable_replication')
def test_disable_group_replication_policy(self, user_id,
mock_disable_replication):
"""Test disable group replication policy."""
# FIXME: this is a very fragile approach
def fake_disable_rep(context, group):
context.authorize(group_policies.DISABLE_REP, target_obj=group)
volume_type = test_utils.create_volume_type(self.project_admin_context,
name='test_group_policy')
group = test_utils.create_group(self.project_admin_context,
status=fields.GroupStatus.AVAILABLE,
group_type_id=self.group_type.id,
volume_type_ids=[volume_type.id])
test_utils.create_volume(self.project_member_context,
group_id=group.id,
testcase_instance=self,
volume_type_id=volume_type.id)
mock_disable_replication.side_effect = fake_disable_rep
rule_name = group_policies.DISABLE_REP
version = mv.GROUP_REPLICATION
url = '%s/%s/action' % (self.api_path, group.id)
req = fake_api.HTTPRequest.blank(url, version=version)
req.method = 'POST'
body = {
"disable_replication": {}
}
unauthorized_exceptions = [exception.GroupNotFound]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name,
self.controller.disable_replication,
req, id=group.id, body=body)
group.destroy()
@ddt.data(*base.all_users)
def test_reset_status_group_policy(self, user_id):
"""Test reset status of group policy."""
rule_name = group_policies.RESET_STATUS
group_id = self._create_group(group_status=fields.GroupStatus.ERROR)
url = '%s/%s/action' % (self.api_path, group_id)
version = mv.GROUP_VOLUME_RESET_STATUS
req = fake_api.HTTPRequest.blank(url, version=version)
req.method = 'POST'
body = {
"reset_status": {
"status": "available"
}
}
unauthorized_exceptions = [exception.GroupNotFound]
self.common_policy_check(user_id,
self.sysadmins,
self.non_sysadmins,
unauthorized_exceptions,
rule_name,
self.controller.reset_status,
req,
id=group_id,
body=body)
@ddt.data(*base.all_users)
def test_delete_group_policy(self, user_id):
"""Test delete group policy."""
volume_type = test_utils.create_volume_type(self.project_admin_context,
name='test_group_policy')
group_1 = test_utils.create_group(self.project_admin_context,
status=fields.GroupStatus.AVAILABLE,
group_type_id=self.group_type.id,
volume_type_ids=[volume_type.id])
rule_name = group_policies.DELETE_POLICY
url = '%s/%s' % (self.api_path, group_1.id)
req = fake_api.HTTPRequest.blank(url, version=mv.GROUP_VOLUME)
req.method = 'POST'
body = {
"delete": {
"delete-volumes": "false"
}
}
unauthorized_exceptions = [exception.GroupNotFound]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name, self.controller.delete_group,
req, id=group_1.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.group.api.API.failover_replication')
def test_fail_over_replication_group_policy(self, user_id,
mock_failover_replication):
"""Test fail over replication group policy."""
# FIXME: this is a very fragile approach
def fake_failover_rep(context, group,
allow_attached_volume=False,
secondary_backend_id=None):
context.authorize(group_policies.FAILOVER_REP, target_obj=group)
volume_type = test_utils.create_volume_type(self.project_admin_context,
name='test_group_policy')
group_2 = test_utils.create_group(self.project_admin_context,
status=fields.GroupStatus.AVAILABLE,
group_type_id=self.group_type.id,
volume_type_ids=[volume_type.id])
mock_failover_replication.side_effect = fake_failover_rep
rule_name = group_policies.FAILOVER_REP
url = '%s/%s' % (self.api_path, group_2.id)
req = fake_api.HTTPRequest.blank(url, version=mv.GROUP_REPLICATION)
req.method = 'POST'
body = {
"failover_replication": {
"allow_attached_volume": "true",
"secondary_backend_id": "vendor-id-1"
}
}
unauthorized_exceptions = [exception.GroupNotFound]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name,
self.controller.failover_replication,
req, id=group_2.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.group.api.API.list_replication_targets')
def test_list_replication_targets_group_policy(self, user_id,
mock_list_targets):
"""Test list replication targets for a group policy."""
# FIXME: this is a very fragile approach
def fake_list_targets(context, group):
context.authorize(group_policies.LIST_REP, target_obj=group)
volume_type = test_utils.create_volume_type(self.project_admin_context,
name='test_group_policy')
group_2 = test_utils.create_group(self.project_admin_context,
status=fields.GroupStatus.AVAILABLE,
group_type_id=self.group_type.id,
volume_type_ids=[volume_type.id])
mock_list_targets.side_effect = fake_list_targets
rule_name = group_policies.LIST_REP
url = '%s/%s/action' % (self.api_path, group_2.id)
req = fake_api.HTTPRequest.blank(url, version=mv.GROUP_REPLICATION)
req.method = 'POST'
body = {"list_replication_targets": {}}
unauthorized_exceptions = [exception.GroupNotFound]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name,
self.controller.list_replication_targets,
req, id=group_2.id, body=body)
group_2.destroy()
class GroupActionPolicySecureRbacTest(GroupActionPolicyTest):
sysadmins = [
'legacy_admin',
'system_admin',
'project_admin',
]
non_sysadmins = [
'legacy_owner',
'project_member',
'system_member',
'system_reader',
'system_foo',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
authorized_users = [
'legacy_admin',
'system_admin',
'project_admin',
'project_member',
]
unauthorized_users = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
authorized_members = authorized_users
unauthorized_members = unauthorized_users
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)

View File

@ -57,26 +57,6 @@
# POST /group_snapshots/{g_snapshot_id}/action (reset_status)
"group:reset_group_snapshot_status": ""
# Reset status of group.
# POST /groups/{group_id}/action (reset_status)
"group:reset_status": ""
# Enable replication.
# POST /groups/{group_id}/action (enable_replication)
"group:enable_replication": ""
# Disable replication.
# POST /groups/{group_id}/action (disable_replication)
"group:disable_replication": ""
# Fail over replication.
# POST /groups/{group_id}/action (failover_replication)
"group:failover_replication": ""
# List failover replication.
# POST /groups/{group_id}/action (list_replication_targets)
"group:list_replication_targets": ""
# List all services.
# GET /os-services
"volume_extension:services:index": ""