Add default policy in code for the provider resource

Leverage oslo.policy to register default policies in the
code. Administrator only need to update the specified
policy in the config file.

Change-Id: I8ec94b4a3e34257031af3eb7607806b66c6b832e
Partial-Implements: blueprint policy-in-code
This commit is contained in:
chenying 2017-09-27 17:54:26 +08:00
parent 0c5ef4aacd
commit 7900222b5d
5 changed files with 105 additions and 23 deletions

View File

@ -1,11 +1,4 @@
{
"provider:get": "rule:admin_or_owner",
"provider:get_all": "rule:admin_or_owner",
"provider:checkpoint_get": "rule:admin_or_owner",
"provider:checkpoint_get_all": "rule:admin_or_owner",
"provider:checkpoint_create": "rule:admin_or_owner",
"provider:checkpoint_delete": "rule:admin_or_owner",
"trigger:create": "",
"trigger:delete": "rule:admin_or_owner",
"trigger:update": "rule:admin_or_owner",

View File

@ -26,7 +26,7 @@ from karbor import exception
from karbor.i18n import _
from karbor import objects
import karbor.policy
from karbor.policies import providers as provider_policy
from karbor.services.protection import api as protection_api
from karbor import utils
@ -62,15 +62,6 @@ CONF.register_opts(query_checkpoint_filters_opts)
LOG = logging.getLogger(__name__)
def check_policy(context, action):
target = {
'project_id': context.project_id,
'user_id': context.user_id,
}
_action = 'provider:%s' % action
karbor.policy.enforce(context, _action, target)
class ProviderViewBuilder(common.ViewBuilder):
"""Model a server API response as a python dictionary."""
@ -230,7 +221,7 @@ class ProvidersController(wsgi.Controller):
def _get_all(self, context, marker=None, limit=None, sort_keys=None,
sort_dirs=None, filters=None, offset=None):
check_policy(context, 'get_all')
context.can(provider_policy.GET_ALL_POLICY)
if filters is None:
filters = {}
@ -272,7 +263,7 @@ class ProvidersController(wsgi.Controller):
raise exc.HTTPBadRequest(explanation=msg)
try:
check_policy(context, 'get')
context.can(provider_policy.GET_POLICY)
except exception.PolicyNotAuthorized:
# raise ProviderNotFound instead to make sure karbor behaves
# as it used to
@ -314,7 +305,7 @@ class ProvidersController(wsgi.Controller):
def _checkpoints_get_all(self, context, provider_id, marker=None,
limit=None, sort_keys=None, sort_dirs=None,
filters=None, offset=None):
check_policy(context, 'checkpoint_get_all')
context.can(provider_policy.CHECKPOINT_GET_ALL_POLICY)
if filters is None:
filters = {}
@ -352,7 +343,7 @@ class ProvidersController(wsgi.Controller):
LOG.debug('Create checkpoint request '
'body: %s provider_id:%s', body, provider_id)
check_policy(context, 'checkpoint_create')
context.can(provider_policy.CHECKPOINT_CREATE_POLICY)
checkpoint = body['checkpoint']
LOG.debug('Create checkpoint request checkpoint: %s',
checkpoint)
@ -457,7 +448,7 @@ class ProvidersController(wsgi.Controller):
raise exc.HTTPBadRequest(explanation=msg)
try:
check_policy(context, 'checkpoint_get')
context.can(provider_policy.CHECKPOINT_GET_POLICY)
except exception.PolicyNotAuthorized:
# raise CheckpointNotFound instead to make sure karbor behaves
# as it used to
@ -487,7 +478,7 @@ class ProvidersController(wsgi.Controller):
msg = _("Invalid checkpoint id provided.")
raise exc.HTTPBadRequest(explanation=msg)
check_policy(context, 'checkpoint_delete')
context.can(provider_policy.CHECKPOINT_DELETE_POLICY)
self.protection_api.delete(context,
provider_id,
checkpoint_id)

View File

@ -17,6 +17,7 @@ import itertools
from karbor.policies import base
from karbor.policies import plans
from karbor.policies import protectables
from karbor.policies import providers
from karbor.policies import restores
@ -26,4 +27,5 @@ def list_rules():
plans.list_rules(),
restores.list_rules(),
protectables.list_rules(),
providers.list_rules(),
)

View File

@ -0,0 +1,94 @@
# Copyright (c) 2017 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from karbor.policies import base
GET_POLICY = 'provider:get'
GET_ALL_POLICY = 'provider:get_all'
CHECKPOINT_GET_POLICY = 'provider:checkpoint_get'
CHECKPOINT_GET_ALL_POLICY = 'provider:checkpoint_get_all'
CHECKPOINT_CREATE_POLICY = 'provider:checkpoint_create'
CHECKPOINT_DELETE_POLICY = 'provider:checkpoint_delete'
providers_policies = [
policy.DocumentedRuleDefault(
name=GET_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description="""Show a protection provider.""",
operations=[
{
'method': 'GET',
'path': '/providers/{provider_id}'
}
]),
policy.DocumentedRuleDefault(
name=GET_ALL_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description="""List protection providers.""",
operations=[
{
'method': 'GET',
'path': '/providers'
}
]),
policy.DocumentedRuleDefault(
name=CHECKPOINT_GET_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description="""Show a checkpoint.""",
operations=[
{
'method': 'GET',
'path': '/providers/{provider_id}/checkpoints/{checkpoint_id}'
}
]),
policy.DocumentedRuleDefault(
name=CHECKPOINT_GET_ALL_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description="""List checkpoints.""",
operations=[
{
'method': 'GET',
'path': '/providers/{provider_id}/checkpoints'
}
]),
policy.DocumentedRuleDefault(
name=CHECKPOINT_CREATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description="""Create checkpoint.""",
operations=[
{
'method': 'POST',
'path': '/providers/{provider_id}/checkpoints'
}
]),
policy.DocumentedRuleDefault(
name=CHECKPOINT_DELETE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description="""Delete checkpoint.""",
operations=[
{
'method': 'DELETE',
'path': '/providers/{provider_id}/checkpoints/{checkpoint_id}'
}
]),
]
def list_rules():
return providers_policies

View File

@ -29,6 +29,8 @@ class ProvidersApiTest(base.TestCase):
super(ProvidersApiTest, self).setUp()
self.controller = providers.ProvidersController()
self.ctxt = context.RequestContext('demo', 'fakeproject', True)
self.mock_policy_check = self.mock_object(
context.RequestContext, 'can')
@mock.patch(
'karbor.api.v1.providers.ProvidersController._get_all')