Add RBAC enforcement to quotas v2 API

This patch adds policies and enforcement to the Octavia v2 API for quotas.

Change-Id: I5f2fa38973fce595ea3ec03cdff924336e0e71c8
Partial-Bug: #1690481
This commit is contained in:
Michael Johnson 2017-06-20 18:51:17 -07:00
parent f4a16a842b
commit 335c00ac18
9 changed files with 697 additions and 24 deletions

View File

@ -525,6 +525,7 @@ function add_load-balancer_roles {
openstack role create load-balancer_global_observer
openstack role create load-balancer_member
openstack role create load-balancer_admin
openstack role create load-balancer_quota_admin
openstack role add --user demo --project demo load-balancer_member
}

View File

@ -10,21 +10,24 @@ the load-balancer API:
.. glossary::
role:load-balancer_observer
User has access to load-balancer read-only APIs
User has access to load-balancer read-only APIs.
role:load-balancer_global_observer
User has access to load-balancer read-only APIs including resources
owned by others.
role:load-balancer_member
User has access to load-balancer read and write APIs
User has access to load-balancer read and write APIs.
role:load-balancer_quota_admin
User is considered an admin for quota APIs only.
role:load-balancer_admin
User is considered an admin for all load-balnacer APIs including
resources owned by others.
role:admin
User is admin to all APIs
User is admin to all APIs.
.. note::

View File

@ -4,5 +4,8 @@
"load-balancer:read": "rule:admin_or_owner",
"load-balancer:read-global": "is_admin:True",
"load-balancer:write": "rule:admin_or_owner"
"load-balancer:write": "rule:admin_or_owner",
"load-balancer:read-quota": "rule:admin_or_owner",
"load-balancer:read-quota-global": "is_admin:True",
"load-balancer:write-quota": "is_admin:True"
}

View File

@ -35,27 +35,47 @@ class QuotasController(base.BaseController):
def get(self, project_id):
"""Get a single project's quota details."""
context = pecan.request.context.get('octavia_context')
# Check that the user is authorized to show this quota
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_one')
target = {'project_id': project_id}
context.policy.authorize(action, target)
db_quotas = self._get_db_quotas(context.session, project_id)
return self._convert_db_to_type(db_quotas, quota_types.QuotaResponse)
@wsme_pecan.wsexpose(quota_types.QuotaAllResponse,
ignore_extra_args=True)
def get_all(self, tenant_id=None, project_id=None):
def get_all(self, project_id=None):
"""List all non-default quotas."""
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
if context.is_admin or CONF.auth_strategy == constants.NOAUTH:
if project_id or tenant_id:
project_id = {'project_id': project_id or tenant_id}
else:
project_id = {}
# Check that the user is authorized to list quotas under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check if user is authorized to list quota under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
project_id = {'project_id': context.project_id}
query_filter = {'project_id': project_id}
db_quotas, links = self.repositories.quotas.get_all(
context.session,
pagination_helper=pcontext.get(constants.PAGINATION_HELPER),
**project_id)
**query_filter)
quotas = quota_types.QuotaAllResponse.from_data_model(db_quotas)
quotas.quotas_links = links
return quotas
@ -66,15 +86,14 @@ class QuotasController(base.BaseController):
"""Update any or all quotas for a project."""
context = pecan.request.context.get('octavia_context')
new_project_id = context.project_id
if context.is_admin or CONF.auth_strategy == constants.NOAUTH:
if project_id:
new_project_id = project_id
if not new_project_id:
if not project_id:
raise exceptions.MissingAPIProjectID()
project_id = new_project_id
# Check that the user is authorized to update this quota
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='put')
target = {'project_id': project_id}
context.policy.authorize(action, target)
quotas_dict = quotas.to_dict()
self.repositories.quotas.update(context.session, project_id,
@ -86,7 +105,16 @@ class QuotasController(base.BaseController):
def delete(self, project_id):
"""Reset a project's quotas to the default values."""
context = pecan.request.context.get('octavia_context')
project_id = context.project_id or project_id
if not project_id:
raise exceptions.MissingAPIProjectID()
# Check that the user is authorized to delete this quota
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='delete')
target = {'project_id': project_id}
context.policy.authorize(action, target)
self.repositories.quotas.delete(context.session, project_id)
db_quotas = self._get_db_quotas(context.session, project_id)
return self._convert_db_to_type(db_quotas, quota_types.QuotaResponse)
@ -108,6 +136,15 @@ class QuotasDefaultController(base.BaseController):
def get(self):
"""Get a project's default quota details."""
context = pecan.request.context.get('octavia_context')
project_id = context.project_id
quotas = self._get_default_quotas(project_id)
if not self.project_id:
raise exceptions.MissingAPIProjectID()
# Check that the user is authorized to see quota defaults
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_defaults')
target = {'project_id': self.project_id}
context.policy.authorize(action, target)
quotas = self._get_default_quotas(self.project_id)
return self._convert_db_to_type(quotas, quota_types.QuotaResponse)

View File

@ -427,7 +427,9 @@ LOADBALANCER_API = 'os_load-balancer_api'
RULE_API_READ = 'rule:load-balancer:read'
RULE_API_READ_GLOBAL = 'rule:load-balancer:read-global'
RULE_API_WRITE = 'rule:load-balancer:write'
RULE_ANY = '@'
RULE_API_READ_QUOTA = 'rule:load-balancer:read-quota'
RULE_API_READ_QUOTA_GLOBAL = 'rule:load-balancer:read-quota-global'
RULE_API_WRITE_QUOTA = 'rule:load-balancer:write-quota'
RBAC_LOADBALANCER = '{}:loadbalancer:'.format(LOADBALANCER_API)
RBAC_LISTENER = '{}:listener:'.format(LOADBALANCER_API)
RBAC_POOL = '{}:pool:'.format(LOADBALANCER_API)
@ -435,3 +437,4 @@ RBAC_MEMBER = '{}:member:'.format(LOADBALANCER_API)
RBAC_HEALTHMONITOR = '{}:healthmonitor:'.format(LOADBALANCER_API)
RBAC_L7POLICY = '{}:l7policy:'.format(LOADBALANCER_API)
RBAC_L7RULE = '{}:l7rule:'.format(LOADBALANCER_API)
RBAC_QUOTA = '{}:quota:'.format(LOADBALANCER_API)

View File

@ -21,6 +21,7 @@ from octavia.policies import listener
from octavia.policies import loadbalancer
from octavia.policies import member
from octavia.policies import pool
from octavia.policies import quota
def list_rules():
@ -33,4 +34,5 @@ def list_rules():
loadbalancer.list_rules(),
member.list_rules(),
pool.list_rules(),
quota.list_rules(),
)

View File

@ -63,6 +63,21 @@ rules = [
policy.RuleDefault('load-balancer:write',
'rule:load-balancer:member_and_owner or is_admin:True'),
policy.RuleDefault('load-balancer:read-quota',
'rule:load-balancer:observer_and_owner or '
'rule:load-balancer:global_observer or '
'rule:load-balancer:member_and_owner or '
'role:load-balancer_quota_admin or '
'is_admin:True'),
policy.RuleDefault('load-balancer:read-quota-global',
'rule:load-balancer:global_observer or '
'role:load-balancer_quota_admin or '
'is_admin:True'),
policy.RuleDefault('load-balancer:write-quota',
'role:load-balancer_quota_admin or is_admin:True'),
]

68
octavia/policies/quota.py Normal file
View File

@ -0,0 +1,68 @@
# Copyright 2017 Rackspace, US Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from octavia.common import constants
from oslo_policy import policy
rules = [
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_QUOTA,
action='get_all'),
constants.RULE_API_READ_QUOTA,
"List Quotas",
[{'method': 'GET', 'path': '/v2.0/lbaas/quotas'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_QUOTA,
action='get_all-global'),
constants.RULE_API_READ_QUOTA_GLOBAL,
"List Quotas including resources owned by others",
[{'method': 'GET', 'path': '/v2.0/lbaas/quotas'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_QUOTA,
action='get_one'),
constants.RULE_API_READ_QUOTA,
"Show Quota details",
[{'method': 'GET',
'path': '/v2.0/lbaas/quotas/{project_id}'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_QUOTA,
action='put'),
constants.RULE_API_WRITE_QUOTA,
"Update a Quota",
[{'method': 'PUT',
'path': '/v2.0/lbaas/quotas/{project_id}'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_QUOTA,
action='delete'),
constants.RULE_API_WRITE_QUOTA,
"Remove a Quota",
[{'method': 'DELETE',
'path': '/v2.0/lbaas/quotas/{project_id}'}]
),
policy.DocumentedRuleDefault(
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_QUOTA,
action='get_defaults'),
constants.RULE_API_READ_QUOTA,
"Show Default Quota for a Project",
[{'method': 'GET',
'path': '/v2.0/lbaas/quotas/{project_id}/default'}]
),
]
def list_rules():
return rules

View File

@ -100,6 +100,98 @@ class TestQuotas(base.BaseAPITest):
expected = {'quotas': [quota1, quota2], 'quotas_links': []}
self.assertEqual(expected, quota_list)
def test_get_all_not_Authorized(self):
project_id1 = uuidutils.generate_uuid()
project_id2 = uuidutils.generate_uuid()
quota_path1 = self.QUOTA_PATH.format(project_id=project_id1)
quota1 = {'load_balancer': constants.QUOTA_UNLIMITED, 'listener': 30,
'pool': 30, 'health_monitor': 30, 'member': 30}
body1 = {'quota': quota1}
self.put(quota_path1, body1, status=202)
quota_path2 = self.QUOTA_PATH.format(project_id=project_id2)
quota2 = {'load_balancer': 50, 'listener': 50, 'pool': 50,
'health_monitor': 50, 'member': 50}
body2 = {'quota': quota2}
self.put(quota_path2, body2, status=202)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
response = self.get(self.QUOTAS_PATH, status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
def test_get_all_not_Authorized_no_role(self):
project_id1 = uuidutils.generate_uuid()
quota_path1 = self.QUOTA_PATH.format(project_id=project_id1)
quota1 = {'load_balancer': constants.QUOTA_UNLIMITED, 'listener': 30,
'pool': 30, 'health_monitor': 30, 'member': 30}
body1 = {'quota': quota1}
self.put(quota_path1, body1, status=202)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project_id1):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': [],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.get(self.QUOTAS_PATH, status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
def test_get_all_not_Authorized_bogus_role(self):
project_id1 = uuidutils.generate_uuid()
project_id2 = uuidutils.generate_uuid()
quota_path1 = self.QUOTA_PATH.format(project_id=project_id1)
quota1 = {'load_balancer': constants.QUOTA_UNLIMITED, 'listener': 30,
'pool': 30, 'health_monitor': 30, 'member': 30}
body1 = {'quota': quota1}
self.put(quota_path1, body1, status=202)
quota_path2 = self.QUOTA_PATH.format(project_id=project_id2)
quota2 = {'load_balancer': 50, 'listener': 50, 'pool': 50,
'health_monitor': 50, 'member': 50}
body2 = {'quota': quota2}
self.put(quota_path2, body2, status=202)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_bogus'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.get(self.QUOTAS_PATH, status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
def test_get_all_admin(self):
project_id1 = uuidutils.generate_uuid()
project_id2 = uuidutils.generate_uuid()
@ -124,6 +216,98 @@ class TestQuotas(base.BaseAPITest):
self.assertIn((quota3.get('load_balancer'), quota3.get('member')),
quota_lb_member_quotas)
def test_get_all_non_admin_global_observer(self):
project_id1 = uuidutils.generate_uuid()
project_id2 = uuidutils.generate_uuid()
project_id3 = uuidutils.generate_uuid()
quota1 = self.create_quota(
project_id=project_id1, lb_quota=1, member_quota=1
).get(self.root_tag)
quota2 = self.create_quota(
project_id=project_id2, lb_quota=2, member_quota=2
).get(self.root_tag)
quota3 = self.create_quota(
project_id=project_id3, lb_quota=3, member_quota=3
).get(self.root_tag)
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_global_observer'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(self.QUOTAS_PATH)
quotas = quotas.json.get(self.root_tag_list)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(3, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
self.assertIn((quota1.get('load_balancer'), quota1.get('member')),
quota_lb_member_quotas)
self.assertIn((quota2.get('load_balancer'), quota2.get('member')),
quota_lb_member_quotas)
self.assertIn((quota3.get('load_balancer'), quota3.get('member')),
quota_lb_member_quotas)
def test_get_all_quota_admin(self):
project_id1 = uuidutils.generate_uuid()
project_id2 = uuidutils.generate_uuid()
project_id3 = uuidutils.generate_uuid()
quota1 = self.create_quota(
project_id=project_id1, lb_quota=1, member_quota=1
).get(self.root_tag)
quota2 = self.create_quota(
project_id=project_id2, lb_quota=2, member_quota=2
).get(self.root_tag)
quota3 = self.create_quota(
project_id=project_id3, lb_quota=3, member_quota=3
).get(self.root_tag)
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_quota_admin'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(self.QUOTAS_PATH)
quotas = quotas.json.get(self.root_tag_list)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(3, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
self.assertIn((quota1.get('load_balancer'), quota1.get('member')),
quota_lb_member_quotas)
self.assertIn((quota2.get('load_balancer'), quota2.get('member')),
quota_lb_member_quotas)
self.assertIn((quota3.get('load_balancer'), quota3.get('member')),
quota_lb_member_quotas)
def test_get_all_non_admin(self):
project1_id = uuidutils.generate_uuid()
project2_id = uuidutils.generate_uuid()
@ -142,7 +326,68 @@ class TestQuotas(base.BaseAPITest):
self.conf.config(auth_strategy=constants.KEYSTONE)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project3_id):
quotas = self.get(self.QUOTAS_PATH).json.get(self.root_tag_list)
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project3_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(self.QUOTAS_PATH)
quotas = quotas.json.get(self.root_tag_list)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(1, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
self.assertIn((quota3.get('load_balancer'), quota3.get('member')),
quota_lb_member_quotas)
def test_get_all_non_admin_observer(self):
project1_id = uuidutils.generate_uuid()
project2_id = uuidutils.generate_uuid()
project3_id = uuidutils.generate_uuid()
self.create_quota(
project_id=project1_id, lb_quota=1, member_quota=1
).get(self.root_tag)
self.create_quota(
project_id=project2_id, lb_quota=2, member_quota=2
).get(self.root_tag)
quota3 = self.create_quota(
project_id=project3_id, lb_quota=3, member_quota=3
).get(self.root_tag)
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.KEYSTONE)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project3_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_observer'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project3_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(self.QUOTAS_PATH)
quotas = quotas.json.get(self.root_tag_list)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(1, len(quotas))
@ -170,6 +415,133 @@ class TestQuotas(base.BaseAPITest):
).json.get(self.root_tag)
self._assert_quotas_equal(quotas, quota2)
def test_get_Authorized_member(self):
self._test_get_Authorized('load-balancer_member')
def test_get_Authorized_observer(self):
self._test_get_Authorized('load-balancer_observer')
def test_get_Authorized_global_observer(self):
self._test_get_Authorized('load-balancer_global_observer')
def test_get_Authorized_quota_admin(self):
self._test_get_Authorized('load-balancer_quota_admin')
def _test_get_Authorized(self, role):
project1_id = uuidutils.generate_uuid()
quota1 = self.create_quota(
project_id=project1_id, lb_quota=1, member_quota=1
).get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project1_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': [role],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project1_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(
self.QUOTA_PATH.format(project_id=project1_id)
).json.get(self.root_tag)
self.conf.config(auth_strategy=auth_strategy)
self._assert_quotas_equal(quotas, quota1)
def test_get_not_Authorized(self):
project1_id = uuidutils.generate_uuid()
self.create_quota(
project_id=project1_id, lb_quota=1, member_quota=1
).get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
quotas = self.get(self.QUOTA_PATH.format(project_id=project1_id),
status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, quotas.json)
def test_get_not_Authorized_bogus_role(self):
project1_id = uuidutils.generate_uuid()
self.create_quota(
project_id=project1_id, lb_quota=1, member_quota=1
).get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project1_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer:bogus'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project1_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(
self.QUOTA_PATH.format(project_id=project1_id),
status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, quotas.json)
def test_get_not_Authorized_no_role(self):
project1_id = uuidutils.generate_uuid()
self.create_quota(
project_id=project1_id, lb_quota=1, member_quota=1
).get(self.root_tag)
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project1_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': [],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project1_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
quotas = self.get(
self.QUOTA_PATH.format(project_id=project1_id),
status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, quotas.json)
def test_get_all_sorted(self):
project1_id = uuidutils.generate_uuid()
project2_id = uuidutils.generate_uuid()
@ -250,6 +622,45 @@ class TestQuotas(base.BaseAPITest):
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'])
def test_get_default_quotas_Authorized(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.get(self.QUOTA_DEFAULT_PATH.format(
project_id=self.project_id))
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'])
self.conf.config(auth_strategy=auth_strategy)
def test_get_default_quotas_not_Authorized(self):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
response = self.get(self.QUOTA_DEFAULT_PATH.format(
project_id=self.project_id), status=401)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
self.conf.config(auth_strategy=auth_strategy)
def test_custom_quotas(self):
quota_path = self.QUOTA_PATH.format(project_id=self.project_id)
body = {'quota': {'load_balancer': 30, 'listener': 30, 'pool': 30,
@ -259,6 +670,66 @@ class TestQuotas(base.BaseAPITest):
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'], expected=body['quota'])
def test_custom_quotas_quota_admin(self):
quota_path = self.QUOTA_PATH.format(project_id=self.project_id)
body = {'quota': {'load_balancer': 30, 'listener': 30, 'pool': 30,
'health_monitor': 30, 'member': 30}}
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_quota_admin'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.put(quota_path, body, status=202)
self.conf.config(auth_strategy=auth_strategy)
response = self.get(quota_path)
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'], expected=body['quota'])
def test_custom_quotas_not_Authorized_member(self):
quota_path = self.QUOTA_PATH.format(project_id=self.project_id)
body = {'quota': {'load_balancer': 30, 'listener': 30, 'pool': 30,
'health_monitor': 30, 'member': 30}}
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self.put(quota_path, body, status=401)
self.conf.config(auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
def test_custom_partial_quotas(self):
quota_path = self.QUOTA_PATH.format(project_id=self.project_id)
body = {'quota': {'load_balancer': 30, 'listener': None, 'pool': 30,
@ -300,6 +771,76 @@ class TestQuotas(base.BaseAPITest):
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'])
def test_delete_custom_quotas_admin(self):
quota_path = self.QUOTA_PATH.format(project_id=self.project_id)
body = {'quota': {'load_balancer': 30, 'listener': 30, 'pool': 30,
'health_monitor': 30, 'member': 30}}
self.put(quota_path, body, status=202)
response = self.get(quota_path)
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'], expected=body['quota'])
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_quota_admin'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.delete(quota_path, status=202)
self.conf.config(auth_strategy=auth_strategy)
response = self.get(quota_path)
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'])
def test_delete_quotas_not_Authorized_member(self):
quota_path = self.QUOTA_PATH.format(project_id=self.project_id)
body = {'quota': {'load_balancer': 30, 'listener': 30, 'pool': 30,
'health_monitor': 30, 'member': 30}}
self.put(quota_path, body, status=202)
response = self.get(quota_path)
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'], expected=body['quota'])
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.get('auth_strategy')
self.conf.config(auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
self.project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': self.project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
self.delete(quota_path, status=401)
self.conf.config(auth_strategy=auth_strategy)
response = self.get(quota_path)
quota_dict = response.json
self._assert_quotas_equal(quota_dict['quota'], expected=body['quota'])
def test_delete_non_existent_custom_quotas(self):
quota_path = self.QUOTA_PATH.format(project_id='bogus')
self.delete(quota_path, status=404)