Refactor the RBAC auth enforcement a bit

Change-Id: Idb0e2fc8560b242e80c2d217384fade5d82f4551
This commit is contained in:
Adam Harwell 2017-06-21 13:11:40 -07:00
parent 335c00ac18
commit 3ce659e9a5
10 changed files with 108 additions and 269 deletions

View File

@ -18,6 +18,7 @@ from oslo_config import cfg
from pecan import rest
from stevedore import driver as stevedore_driver
from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
from octavia.db import repositories
@ -27,6 +28,7 @@ LOG = logging.getLogger(__name__)
class BaseController(rest.RestController):
RBAC_TYPE = None
def __init__(self):
super(BaseController, self).__init__()
@ -146,3 +148,29 @@ class BaseController(rest.RestController):
if db_quotas.member is None:
db_quotas.member = CONF.quotas.default_member_quota
return db_quotas
def _auth_get_all(self, context, project_id):
# Check authorization to list objects under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=self.RBAC_TYPE, action=constants.RBAC_GET_ALL_GLOBAL)
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check authorization to list objects under this project
self._auth_validate_action(context, project_id,
constants.RBAC_GET_ALL)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
return query_filter
def _auth_validate_action(self, context, project_id, action):
# Check that the user is authorized to do an action in this object
action = '{rbac_obj}{action}'.format(
rbac_obj=self.RBAC_TYPE, action=action)
target = {'project_id': project_id}
context.policy.authorize(action, target)

View File

@ -36,6 +36,7 @@ LOG = logging.getLogger(__name__)
class HealthMonitorController(base.BaseController):
RBAC_TYPE = constants.RBAC_HEALTHMONITOR
def __init__(self):
super(HealthMonitorController, self).__init__()
@ -59,11 +60,8 @@ class HealthMonitorController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_hm = self._get_db_hm(context.session, id)
# Check that the user is authorized to show this health monitor
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_HEALTHMONITOR, action='get_one')
target = {'project_id': db_hm.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_hm.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(
db_hm, hm_types.HealthMonitorResponse)
@ -76,25 +74,7 @@ class HealthMonitorController(base.BaseController):
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
# Check if user is authorized to list health mons under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_HEALTHMONITOR, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check that the user is authorized to list lbs under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_HEALTHMONITOR, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
query_filter = self._auth_get_all(context, project_id)
db_hm, links = self.repositories.health_monitor.get_all(
context.session, show_deleted=False,
@ -184,11 +164,8 @@ class HealthMonitorController(base.BaseController):
pool = self._get_db_pool(context.session, health_monitor.pool_id)
health_monitor.project_id = pool.project_id
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_HEALTHMONITOR, action='post')
target = {'project_id': health_monitor.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, health_monitor.project_id,
constants.RBAC_POST)
lock_session = db_api.get_session(autocommit=False)
if self.repositories.check_quota_met(
@ -226,11 +203,8 @@ class HealthMonitorController(base.BaseController):
health_monitor = health_monitor_.healthmonitor
db_hm = self._get_db_hm(context.session, id)
# Check that the user is authorized to update this health monitor
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_HEALTHMONITOR, action='put')
target = {'project_id': db_hm.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_hm.project_id,
constants.RBAC_PUT)
self._test_lb_and_listener_and_pool_statuses(context.session, db_hm)
@ -262,11 +236,8 @@ class HealthMonitorController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_hm = self._get_db_hm(context.session, id)
# Check that the user is authorized to delete this health monitor
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_HEALTHMONITOR, action='delete')
target = {'project_id': db_hm.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_hm.project_id,
constants.RBAC_DELETE)
self._test_lb_and_listener_and_pool_statuses(context.session, db_hm)

View File

@ -37,6 +37,7 @@ LOG = logging.getLogger(__name__)
class L7PolicyController(base.BaseController):
RBAC_TYPE = constants.RBAC_L7POLICY
def __init__(self):
super(L7PolicyController, self).__init__()
@ -48,11 +49,8 @@ class L7PolicyController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_l7policy = self._get_db_l7policy(context.session, id)
# Check that the user is authorized to show this l7policy
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7POLICY, action='get_one')
target = {'project_id': db_l7policy.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_l7policy.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(db_l7policy,
l7policy_types.L7PolicyResponse)
@ -65,25 +63,7 @@ class L7PolicyController(base.BaseController):
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
# Check if user is authorized to list l7policies under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7POLICY, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check if user is authorized to list l7policies under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7POLICY, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
query_filter = self._auth_get_all(context, project_id)
db_l7policies, links = self.repositories.l7policy.get_all(
context.session, show_deleted=False,
@ -171,11 +151,8 @@ class L7PolicyController(base.BaseController):
load_balancer_id = listener.load_balancer_id
l7policy.project_id = listener.project_id
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7POLICY, action='post')
target = {'project_id': l7policy.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, l7policy.project_id,
constants.RBAC_POST)
lock_session = db_api.get_session(autocommit=False)
if self.repositories.check_quota_met(
@ -242,11 +219,8 @@ class L7PolicyController(base.BaseController):
load_balancer_id, listener_id = self._get_listener_and_loadbalancer_id(
db_l7policy)
# Check that the user is authorized to update this l7policy
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7POLICY, action='put')
target = {'project_id': db_l7policy.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_l7policy.project_id,
constants.RBAC_PUT)
self._test_lb_and_listener_statuses(context.session,
lb_id=load_balancer_id,
@ -282,11 +256,8 @@ class L7PolicyController(base.BaseController):
load_balancer_id, listener_id = self._get_listener_and_loadbalancer_id(
db_l7policy)
# Check that the user is authorized to delete this pool
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7POLICY, action='delete')
target = {'project_id': db_l7policy.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_l7policy.project_id,
constants.RBAC_DELETE)
self._test_lb_and_listener_statuses(context.session,
lb_id=load_balancer_id,

View File

@ -34,6 +34,7 @@ LOG = logging.getLogger(__name__)
class L7RuleController(base.BaseController):
RBAC_TYPE = constants.RBAC_L7RULE
def __init__(self, l7policy_id):
super(L7RuleController, self).__init__()
@ -46,11 +47,8 @@ class L7RuleController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_l7rule = self._get_db_l7rule(context.session, id)
# Check that the user is authorized to show this l7rule
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='get_one')
target = {'project_id': db_l7rule.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_l7rule.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(db_l7rule,
l7rule_types.L7RuleResponse)
@ -159,11 +157,8 @@ class L7RuleController(base.BaseController):
self.l7policy_id)
self._check_l7policy_max_rules(context.session)
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='post')
target = {'project_id': l7rule.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, l7rule.project_id,
constants.RBAC_POST)
lock_session = db_api.get_session(autocommit=False)
l7rule_dict = db_prepare.create_l7rule(
@ -201,11 +196,8 @@ class L7RuleController(base.BaseController):
new_l7rule.update(l7rule.to_dict())
new_l7rule = data_models.L7Rule.from_dict(new_l7rule)
# Check that the user is authorized to update this l7rule
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='put')
target = {'project_id': db_l7rule.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_l7rule.project_id,
constants.RBAC_PUT)
try:
validate.l7rule_data(new_l7rule)
@ -239,11 +231,8 @@ class L7RuleController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_l7rule = self._get_db_l7rule(context.session, id)
# Check that the user is authorized to update this member
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_L7RULE, action='delete')
target = {'project_id': db_l7rule.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_l7rule.project_id,
constants.RBAC_DELETE)
self._test_lb_listener_policy_statuses(context.session)

View File

@ -37,6 +37,7 @@ LOG = logging.getLogger(__name__)
class ListenersController(base.BaseController):
RBAC_TYPE = constants.RBAC_LISTENER
def __init__(self):
super(ListenersController, self).__init__()
@ -61,11 +62,8 @@ class ListenersController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_listener = self._get_db_listener(context.session, id)
# Check that the user is authorized to show this listener
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LISTENER, action='get_one')
target = {'project_id': db_listener.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_listener.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(db_listener,
listener_types.ListenerResponse)
@ -78,25 +76,7 @@ class ListenersController(base.BaseController):
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
# Check that the user is authorized to list lbs under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LISTENER, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check that the user is authorized to list lbs under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LISTENER, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
query_filter = self._auth_get_all(context, project_id)
db_listeners, links = self.repositories.listener.get_all(
context.session, show_deleted=False,
@ -211,11 +191,8 @@ class ListenersController(base.BaseController):
listener.project_id = self._get_lb_project_id(
context.session, load_balancer_id)
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LISTENER, action='post')
target = {'project_id': listener.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, listener.project_id,
constants.RBAC_POST)
lock_session = db_api.get_session(autocommit=False)
if self.repositories.check_quota_met(
@ -285,11 +262,8 @@ class ListenersController(base.BaseController):
db_listener = self._get_db_listener(context.session, id)
load_balancer_id = db_listener.load_balancer_id
# Check that the user is authorized to update this listener
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LISTENER, action='put')
target = {'project_id': db_listener.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_listener.project_id,
constants.RBAC_PUT)
# TODO(rm_work): Do we need something like this? What do we do on an
# empty body for a PUT?
@ -327,11 +301,8 @@ class ListenersController(base.BaseController):
db_listener = self._get_db_listener(context.session, id)
load_balancer_id = db_listener.load_balancer_id
# Check that the user is authorized to delete this listener
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LISTENER, action='delete')
target = {'project_id': db_listener.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_listener.project_id,
constants.RBAC_DELETE)
self._test_lb_and_listener_statuses(
context.session, load_balancer_id,

View File

@ -41,6 +41,7 @@ LOG = logging.getLogger(__name__)
class LoadBalancersController(base.BaseController):
RBAC_TYPE = constants.RBAC_LOADBALANCER
def __init__(self):
super(LoadBalancersController, self).__init__()
@ -52,11 +53,8 @@ class LoadBalancersController(base.BaseController):
context = pecan.request.context.get('octavia_context')
load_balancer = self._get_db_lb(context.session, id)
# Check that the user is authorized to show this lb
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LOADBALANCER, action='get_one')
target = {'project_id': load_balancer.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, load_balancer.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(
load_balancer, lb_types.LoadBalancerResponse)
@ -69,25 +67,7 @@ class LoadBalancersController(base.BaseController):
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
# Check that the user is authorized to list lbs under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LOADBALANCER, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check that the user is authorized to list lbs under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LOADBALANCER, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
query_filter = self._auth_get_all(context, project_id)
load_balancers, links = self.repositories.load_balancer.get_all(
context.session, show_deleted=False,
@ -189,11 +169,8 @@ class LoadBalancersController(base.BaseController):
raise exceptions.ValidationException(detail=_(
"Missing project ID in request where one is required."))
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LOADBALANCER, action='post')
target = {'project_id': load_balancer.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, load_balancer.project_id,
constants.RBAC_POST)
self._validate_vip_request_object(load_balancer)
@ -364,11 +341,8 @@ class LoadBalancersController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_lb = self._get_db_lb(context.session, id)
# Check that the user is authorized to update this lb
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LOADBALANCER, action='put')
target = {'project_id': db_lb.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_lb.project_id,
constants.RBAC_PUT)
self._test_lb_status(context.session, id)
try:
@ -389,11 +363,8 @@ class LoadBalancersController(base.BaseController):
cascade = strutils.bool_from_string(cascade)
db_lb = self._get_db_lb(context.session, id)
# Check that the user is authorized to delete this lb
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_LOADBALANCER, action='delete')
target = {'project_id': db_lb.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_lb.project_id,
constants.RBAC_DELETE)
with db_api.get_lock_session() as lock_session:
self._test_lb_status(lock_session, id,

View File

@ -35,6 +35,7 @@ LOG = logging.getLogger(__name__)
class MembersController(base.BaseController):
RBAC_TYPE = constants.RBAC_MEMBER
def __init__(self, pool_id):
super(MembersController, self).__init__()
@ -47,11 +48,8 @@ class MembersController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_member = self._get_db_member(context.session, id)
# Check that the user is authorized to show this member
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_MEMBER, action='get_one')
target = {'project_id': db_member.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_member.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(db_member,
member_types.MemberResponse)
@ -173,11 +171,8 @@ class MembersController(base.BaseController):
member.project_id = self._get_lb_project_id(context.session,
pool.load_balancer_id)
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_MEMBER, action='post')
target = {'project_id': member.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, member.project_id,
constants.RBAC_POST)
lock_session = db_api.get_session(autocommit=False)
if self.repositories.check_quota_met(
@ -218,11 +213,8 @@ class MembersController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_member = self._get_db_member(context.session, id)
# Check that the user is authorized to update this member
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_MEMBER, action='put')
target = {'project_id': db_member.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_member.project_id,
constants.RBAC_PUT)
self._test_lb_and_listener_and_pool_statuses(context.session,
member=db_member)
@ -253,11 +245,8 @@ class MembersController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_member = self._get_db_member(context.session, id)
# Check that the user is authorized to update this member
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_MEMBER, action='delete')
target = {'project_id': db_member.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_member.project_id,
constants.RBAC_DELETE)
self._test_lb_and_listener_and_pool_statuses(context.session,
member=db_member)

View File

@ -39,6 +39,7 @@ LOG = logging.getLogger(__name__)
class PoolsController(base.BaseController):
RBAC_TYPE = constants.RBAC_POOL
def __init__(self):
super(PoolsController, self).__init__()
@ -50,11 +51,8 @@ class PoolsController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_pool = self._get_db_pool(context.session, id)
# Check that the user is authorized to show this pool
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_POOL, action='get_one')
target = {'project_id': db_pool.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_pool.project_id,
constants.RBAC_GET_ONE)
result = self._convert_db_to_type(db_pool, pool_types.PoolResponse)
return pool_types.PoolRootResponse(pool=result)
@ -66,25 +64,7 @@ class PoolsController(base.BaseController):
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
# Check that the user is authorized to list pools under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_POOL, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check that the user is authorized to list lbs under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_POOL, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
query_filter = self._auth_get_all(context, project_id)
db_pools, links = self.repositories.pool.get_all(
context.session, show_deleted=False,
@ -187,11 +167,8 @@ class PoolsController(base.BaseController):
"loadbalancer_id, listener_id")
raise exceptions.ValidationException(detail=msg)
# Check that the user is authorized to create under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_POOL, action='post')
target = {'project_id': pool.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, pool.project_id,
constants.RBAC_POST)
lock_session = db_api.get_session(autocommit=False)
if self.repositories.check_quota_met(
@ -273,11 +250,8 @@ class PoolsController(base.BaseController):
context = pecan.request.context.get('octavia_context')
db_pool = self._get_db_pool(context.session, id)
# Check that the user is authorized to update this pool
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_POOL, action='put')
target = {'project_id': db_pool.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_pool.project_id,
constants.RBAC_PUT)
self._test_lb_and_listener_statuses(
context.session, lb_id=db_pool.load_balancer_id,
@ -311,11 +285,8 @@ class PoolsController(base.BaseController):
raise exceptions.PoolInUseByL7Policy(
id=db_pool.id, l7policy_id=db_pool.l7policies[0].id)
# Check that the user is authorized to delete this pool
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_POOL, action='delete')
target = {'project_id': db_pool.project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, db_pool.project_id,
constants.RBAC_DELETE)
self._test_lb_and_listener_statuses(
context.session, lb_id=db_pool.load_balancer_id,

View File

@ -27,6 +27,7 @@ CONF.import_group('quotas', 'octavia.common.config')
class QuotasController(base.BaseController):
RBAC_TYPE = constants.RBAC_QUOTA
def __init__(self):
super(QuotasController, self).__init__()
@ -36,11 +37,7 @@ class QuotasController(base.BaseController):
"""Get a single project's quota details."""
context = pecan.request.context.get('octavia_context')
# Check that the user is authorized to show this quota
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_one')
target = {'project_id': project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, project_id, constants.RBAC_GET_ONE)
db_quotas = self._get_db_quotas(context.session, project_id)
return self._convert_db_to_type(db_quotas, quota_types.QuotaResponse)
@ -52,25 +49,7 @@ class QuotasController(base.BaseController):
pcontext = pecan.request.context
context = pcontext.get('octavia_context')
# Check that the user is authorized to list quotas under all projects
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_all-global')
target = {'project_id': project_id}
if not context.policy.authorize(action, target, do_raise=False):
# Not a global observer or admin
if project_id is None:
project_id = context.project_id
# Check if user is authorized to list quota under this project
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='get_all')
target = {'project_id': project_id}
context.policy.authorize(action, target)
if project_id is None:
query_filter = {}
else:
query_filter = {'project_id': project_id}
query_filter = self._auth_get_all(context, project_id)
db_quotas, links = self.repositories.quotas.get_all(
context.session,
@ -89,11 +68,7 @@ class QuotasController(base.BaseController):
if not project_id:
raise exceptions.MissingAPIProjectID()
# Check that the user is authorized to update this quota
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='put')
target = {'project_id': project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, project_id, constants.RBAC_PUT)
quotas_dict = quotas.to_dict()
self.repositories.quotas.update(context.session, project_id,
@ -109,11 +84,7 @@ class QuotasController(base.BaseController):
if not project_id:
raise exceptions.MissingAPIProjectID()
# Check that the user is authorized to delete this quota
action = '{rbac_obj}{action}'.format(
rbac_obj=constants.RBAC_QUOTA, action='delete')
target = {'project_id': project_id}
context.policy.authorize(action, target)
self._auth_validate_action(context, project_id, constants.RBAC_DELETE)
self.repositories.quotas.delete(context.session, project_id)
db_quotas = self._get_db_quotas(context.session, project_id)

View File

@ -438,3 +438,10 @@ RBAC_HEALTHMONITOR = '{}:healthmonitor:'.format(LOADBALANCER_API)
RBAC_L7POLICY = '{}:l7policy:'.format(LOADBALANCER_API)
RBAC_L7RULE = '{}:l7rule:'.format(LOADBALANCER_API)
RBAC_QUOTA = '{}:quota:'.format(LOADBALANCER_API)
RBAC_POST = 'post'
RBAC_PUT = 'put'
RBAC_DELETE = 'delete'
RBAC_GET_ONE = 'get_one'
RBAC_GET_ALL = 'get_all'
RBAC_GET_ALL_GLOBAL = 'get_all-global'