Add RBAC enforcement to l7rules v2 API

This patch adds policies and enforcement to the Octavia v2 API for l7rules.

Change-Id: I2050ef70c26bc59d3777842ea3de9900d4e282fe
Partial-Bug: #1690481
This commit is contained in:
Michael Johnson 2017-06-20 16:16:45 -07:00
parent accf9456cc
commit f4a16a842b
6 changed files with 429 additions and 2 deletions

View File

@ -45,6 +45,13 @@ class L7RuleController(base.BaseController):
"""Gets a single l7rule's details."""
context = pecan.request.context.get('octavia_context')
db_l7rule = self._get_db_l7rule(context.session, id)
# Check that the user is authorized to show this l7rule
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='get_one')
target = {'project_id': db_l7rule.project_id}
context.policy.authorize(action, target)
result = self._convert_db_to_type(db_l7rule,
l7rule_types.L7RuleResponse)
return l7rule_types.L7RuleRootResponse(rule=result)
@ -55,6 +62,15 @@ class L7RuleController(base.BaseController):
"""Lists all l7rules of a l7policy."""
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
l7policy = self._get_db_l7policy(context.session, self.l7policy_id)
# Check that the user is authorized to list members for this l7rule
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='get_all')
target = {'project_id': l7policy.project_id}
context.policy.authorize(action, target)
db_l7rules, links = self.repositories.l7rule.get_all(
context.session, show_deleted=False, l7policy_id=self.l7policy_id,
pagination_helper=pcontext.get(constants.PAGINATION_HELPER))
@ -143,6 +159,12 @@ class L7RuleController(base.BaseController):
self.l7policy_id)
self._check_l7policy_max_rules(context.session)
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='post')
target = {'project_id': l7rule.project_id}
context.policy.authorize(action, target)
lock_session = db_api.get_session(autocommit=False)
l7rule_dict = db_prepare.create_l7rule(
l7rule.to_dict(render_unsets=True), self.l7policy_id)
@ -178,6 +200,13 @@ class L7RuleController(base.BaseController):
new_l7rule = db_l7rule.to_dict()
new_l7rule.update(l7rule.to_dict())
new_l7rule = data_models.L7Rule.from_dict(new_l7rule)
# Check that the user is authorized to update this l7rule
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='put')
target = {'project_id': db_l7rule.project_id}
context.policy.authorize(action, target)
try:
validate.l7rule_data(new_l7rule)
except Exception as e:
@ -209,6 +238,13 @@ class L7RuleController(base.BaseController):
"""Deletes a l7rule."""
context = pecan.request.context.get('octavia_context')
db_l7rule = self._get_db_l7rule(context.session, id)
# Check that the user is authorized to update this member
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='delete')
target = {'project_id': db_l7rule.project_id}
context.policy.authorize(action, target)
self._test_lb_listener_policy_statuses(context.session)
self.repositories.l7rule.update(

View File

@ -434,3 +434,4 @@ RBAC_POOL = '{}:pool:'.format(LOADBALANCER_API)
RBAC_MEMBER = '{}:member:'.format(LOADBALANCER_API)
RBAC_HEALTHMONITOR = '{}:healthmonitor:'.format(LOADBALANCER_API)
RBAC_L7POLICY = '{}:l7policy:'.format(LOADBALANCER_API)
RBAC_L7RULE = '{}:l7rule:'.format(LOADBALANCER_API)

View File

@ -16,6 +16,7 @@ import itertools
from octavia.policies import base
from octavia.policies import healthmonitor
from octavia.policies import l7policy
from octavia.policies import l7rule
from octavia.policies import listener
from octavia.policies import loadbalancer
from octavia.policies import member
@ -27,6 +28,7 @@ def list_rules():
base.list_rules(),
healthmonitor.list_rules(),
l7policy.list_rules(),
l7rule.list_rules(),
listener.list_rules(),
loadbalancer.list_rules(),
member.list_rules(),

View File

@ -0,0 +1,62 @@
# Copyright 2017 Rackspace, US Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from octavia.common import constants
from oslo_policy import policy
rules = [
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7RULE,
action='get_all'),
constants.RULE_API_READ,
"List L7 Rules",
[{'method': 'GET',
'path': '/v2.0/lbaas/l7policies/{l7policy_id}/rules'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7RULE,
action='post'),
constants.RULE_API_WRITE,
"Create a L7 Rule",
[{'method': 'POST',
'path': '/v2.0/lbaas/l7policies/{l7policy_id}/rules'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7RULE,
action='get_one'),
constants.RULE_API_READ,
"Show L7 Rule details",
[{'method': 'GET',
'path': '/v2.0/lbaas/l7policies/{l7policy_id}/rules/{l7rule_id}'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7RULE,
action='put'),
constants.RULE_API_WRITE,
"Update a L7 Rule",
[{'method': 'PUT',
'path': '/v2.0/lbaas/l7policies/{l7policy_id}/rules/{l7rule_id}'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7RULE,
action='delete'),
constants.RULE_API_WRITE,
"Remove a L7 Rule",
[{'method': 'DELETE',
'path': '/v2.0/lbaas/l7policies/{l7policy_id}/rules/{l7rule_id}'}]
),
]
def list_rules():
return rules

View File

@ -261,12 +261,13 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
return response.json
def create_l7rule(self, l7policy_id, type, compare_type,
value, **optionals):
value, status=None, **optionals):
req_dict = {'type': type, 'compare_type': compare_type, 'value': value}
req_dict.update(optionals)
body = {'rule': req_dict}
path = self.L7RULES_PATH.format(l7policy_id=l7policy_id)
response = self.post(path, body)
status = {'status': status} if status else {}
response = self.post(path, body, **status)
return response.json
def create_quota(self, project_id=-1, lb_quota=None, listener_quota=None,

View File

@ -12,9 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.common import constants
import octavia.common.context
from octavia.tests.functional.api.v2 import base
@ -28,6 +32,7 @@ class TestL7Rule(base.BaseAPITest):
super(TestL7Rule, self).setUp()
self.lb = self.create_load_balancer(uuidutils.generate_uuid())
self.lb_id = self.lb.get('loadbalancer').get('id')
self.project_id = self.lb.get('loadbalancer').get('project_id')
self.set_lb_status(self.lb_id)
self.listener = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb_id=self.lb_id)
@ -50,6 +55,52 @@ class TestL7Rule(base.BaseAPITest):
l7rule_id=l7rule.get('id'))).json.get(self.root_tag)
self.assertEqual(l7rule, response)
def test_get_authorized(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.get(self.l7rule_path.format(
l7rule_id=l7rule.get('id'))).json.get(self.root_tag)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(l7rule, response)
def test_get_not_authorized(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
response = self.get(self.l7rule_path.format(
l7rule_id=l7rule.get('id')), status=401).json
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response)
def test_get_hides_deleted(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
@ -96,6 +147,71 @@ class TestL7Rule(base.BaseAPITest):
self.assertIn((api_l7r_b.get('id'), api_l7r_b.get('type')),
rule_id_types)
def test_get_all_authorized(self):
api_l7r_a = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
api_l7r_b = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
rules = self.get(
self.l7rules_path).json.get(self.root_tag_list)
self.conf.config(auth_strategy=auth_strategy)
self.assertIsInstance(rules, list)
self.assertEqual(2, len(rules))
rule_id_types = [(r.get('id'), r.get('type')) for r in rules]
self.assertIn((api_l7r_a.get('id'), api_l7r_a.get('type')),
rule_id_types)
self.assertIn((api_l7r_b.get('id'), api_l7r_b.get('type')),
rule_id_types)
def test_get_all_not_authorized(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_COOKIE,
constants.L7RULE_COMPARE_TYPE_CONTAINS, 'some-value',
key='some-cookie').get(self.root_tag)
self.set_lb_status(self.lb_id)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
rules = self.get(self.l7rules_path, status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, rules.json)
def test_get_all_sorted(self):
self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
@ -201,6 +317,64 @@ class TestL7Rule(base.BaseAPITest):
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_rule_authorized(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com').get(self.root_tag)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(constants.L7RULE_TYPE_HOST_NAME,
api_l7rule.get('type'))
self.assertEqual(constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
api_l7rule.get('compare_type'))
self.assertEqual('www.example.com', api_l7rule.get('value'))
self.assertIsNone(api_l7rule.get('key'))
self.assertFalse(api_l7rule.get('invert'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_CREATE,
l7rule_op_status=constants.OFFLINE)
def test_create_rule_not_authorized(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_HOST_NAME,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO,
'www.example.com', status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, api_l7rule)
def test_create_path_rule(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
@ -358,6 +532,74 @@ class TestL7Rule(base.BaseAPITest):
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_UPDATE)
def test_update_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'value': '/images'}
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule)).json.get(self.root_tag)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual('/api', response.get('value'))
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_UPDATE)
def test_update_not_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
new_l7rule = {'value': '/images'}
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
response = self.put(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id')),
self._build_body(new_l7rule), status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.ACTIVE,
listener_prov_status=constants.ACTIVE,
l7policy_prov_status=constants.ACTIVE,
l7rule_prov_status=constants.ACTIVE)
def test_bad_update(self):
l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
@ -424,6 +666,89 @@ class TestL7Rule(base.BaseAPITest):
l7rule_prov_status=constants.PENDING_DELETE)
self.set_lb_status(self.lb_id)
def test_delete_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
api_l7rule['provisioning_status'] = constants.ACTIVE
api_l7rule['operating_status'] = constants.ONLINE
api_l7rule.pop('updated_at')
response = self.get(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id'))).json.get(self.root_tag)
response.pop('updated_at')
self.assertEqual(api_l7rule, response)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.delete(
self.l7rule_path.format(l7rule_id=api_l7rule.get('id')))
self.conf.config(auth_strategy=auth_strategy)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.PENDING_UPDATE,
listener_prov_status=constants.PENDING_UPDATE,
l7policy_prov_status=constants.PENDING_UPDATE,
l7rule_prov_status=constants.PENDING_DELETE)
self.set_lb_status(self.lb_id)
def test_delete_not_authorized(self):
api_l7rule = self.create_l7rule(
self.l7policy_id, constants.L7RULE_TYPE_PATH,
constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
'/api').get(self.root_tag)
self.set_lb_status(self.lb_id)
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
api_l7rule['provisioning_status'] = constants.ACTIVE
api_l7rule['operating_status'] = constants.ONLINE
api_l7rule.pop('updated_at')
response = self.get(self.l7rule_path.format(
l7rule_id=api_l7rule.get('id'))).json.get(self.root_tag)
response.pop('updated_at')
self.assertEqual(api_l7rule, response)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
self.delete(
self.l7rule_path.format(l7rule_id=api_l7rule.get('id')),
status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assert_correct_status(
lb_id=self.lb_id, listener_id=self.listener_id,
l7policy_id=self.l7policy_id, l7rule_id=api_l7rule.get('id'),
lb_prov_status=constants.ACTIVE,
listener_prov_status=constants.ACTIVE,
l7policy_prov_status=constants.ACTIVE,
l7rule_prov_status=constants.ACTIVE)
def test_bad_delete(self):
self.delete(self.l7rule_path.format(
l7rule_id=uuidutils.generate_uuid()), status=404)