Refactor is_admin

Consolidate duplicate policy checking code. Using the more complete
logic from check_protection also allows assert_admin to work with more
complicated admin_required rules.

Change-Id: I49c7c9d2732675b87c58c9460671dfe13969fd34
This commit is contained in:
Adam Young 2016-10-17 14:27:33 -04:00 committed by Matthew Edmonds
parent 7d8f2fcfb9
commit 40c118d6ba

View File

@ -15,8 +15,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_log import log
from oslo_utils import strutils
@ -128,28 +126,7 @@ def assert_admin(app, request):
does not have the admin role
"""
request.assert_authenticated()
if not request.context.is_admin:
user_token_ref = get_token_ref(request.context_dict)
creds = copy.deepcopy(user_token_ref.metadata)
try:
creds['user_id'] = user_token_ref.user_id
except exception.UnexpectedError:
LOG.debug('Invalid user')
raise exception.Unauthorized(_('Invalid user'))
if user_token_ref.project_scoped:
creds['tenant_id'] = user_token_ref.project_id
else:
LOG.debug('Invalid tenant')
raise exception.Unauthorized(_('Invalid tenant'))
creds['roles'] = user_token_ref.role_names
# Accept either is_admin or the admin role
app.policy_api.enforce(creds, 'admin_required', {})
check_policy(app, request, 'admin_required', input_attr={})
def _build_policy_check_credentials(action, context, kwargs):
@ -220,33 +197,46 @@ def check_protection(controller, request, prep_info, target_attr=None,
they can be referenced by policy rules.
"""
check_policy(controller, request,
prep_info['f_name'],
prep_info.get('filter_attr'),
prep_info.get('input_attr'),
target_attr,
*args, **kwargs)
def check_policy(controller, request, f_name,
filter_attr=None, input_attr=None, target_attr=None,
*args, **kwargs):
# Makes the arguments from check protection explicit.
request.assert_authenticated()
if request.context.is_admin:
LOG.warning('RBAC: Bypassing authorization')
return
else:
action = 'identity:%s' % prep_info['f_name']
# TODO(henry-nash) need to log the target attributes as well
creds = _build_policy_check_credentials(
action, request.context_dict, prep_info['input_attr'])
# Build the dict the policy engine will check against from both the
# parameters passed into the call we are protecting (which was
# stored in the prep_info by protected()), plus the target
# attributes provided.
policy_dict = {}
_handle_member_from_driver(controller, policy_dict, **kwargs)
_handle_subject_token_id(controller, request, policy_dict)
if target_attr:
policy_dict = {'target': target_attr}
policy_dict.update(prep_info['input_attr'])
if 'filter_attr' in prep_info:
policy_dict.update(prep_info['filter_attr'])
action = 'identity:%s' % f_name
# TODO(henry-nash) need to log the target attributes as well
creds = _build_policy_check_credentials(
action, request.context_dict, input_attr)
# Build the dict the policy engine will check against from both the
# parameters passed into the call we are protecting plus the target
# attributes provided.
policy_dict = {}
_handle_member_from_driver(controller, policy_dict, **kwargs)
_handle_subject_token_id(controller, request, policy_dict)
for key in kwargs:
policy_dict[key] = kwargs[key]
controller.policy_api.enforce(creds,
action,
utils.flatten_dict(policy_dict))
if target_attr:
policy_dict = {'target': target_attr}
if input_attr:
policy_dict.update(input_attr)
if filter_attr:
policy_dict.update(filter_attr)
for key in kwargs:
policy_dict[key] = kwargs[key]
controller.policy_api.enforce(creds,
action,
utils.flatten_dict(policy_dict))
LOG.debug('RBAC: Authorization granted')