Make assert_admin work with a request

Continue pushing a request object around; fix the v2
assert_admin method to work with a request.

Change-Id: I83063178b04c5e401d1f1a6bb9bce63a4a38910e
Jamie Lennox 2016-07-04 16:15:41 +10:00
parent d32910b29d
commit b66693ed23
9 changed files with 47 additions and 44 deletions


@ -65,7 +65,7 @@ class TenantAssignment(controller.V2Controller):
@controller.v2_deprecated
def get_project_users(self, request, tenant_id, **kw):
self.assert_admin(request.context_dict)
self.assert_admin(request)
user_refs = []
user_ids = self.assignment_api.list_user_ids_for_project(tenant_id)
for user_id in user_ids:
@ -88,13 +88,13 @@ class Role(controller.V2Controller):
@controller.v2_deprecated
def get_role(self, request, role_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
return {'role': self.role_api.get_role(role_id)}
@controller.v2_deprecated
def create_role(self, request, role):
role = self._normalize_dict(role)
self.assert_admin(request.context_dict)
self.assert_admin(request)
if 'name' not in role or not role['name']:
msg = _('Name field is required and cannot be empty')
@ -115,13 +115,13 @@ class Role(controller.V2Controller):
@controller.v2_deprecated
def delete_role(self, request, role_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
initiator = notifications._get_request_audit_info(request.context_dict)
self.role_api.delete_role(role_id, initiator)
@controller.v2_deprecated
def get_roles(self, request):
self.assert_admin(request.context_dict)
self.assert_admin(request)
return {'roles': self.role_api.list_roles()}
@ -138,7 +138,7 @@ class RoleAssignmentV2(controller.V2Controller):
not implementing them in hopes that the idea will die off.
"""
self.assert_admin(request.context_dict)
self.assert_admin(request)
# NOTE(davechen): Router without project id is defined,
# but we don't plan on implementing this.
if tenant_id is None:
@ -157,7 +157,7 @@ class RoleAssignmentV2(controller.V2Controller):
not implementing them in hopes that the idea will die off.
"""
self.assert_admin(request.context_dict)
self.assert_admin(request)
if tenant_id is None:
raise exception.NotImplemented(
message=_('User roles not supported: tenant_id required'))
@ -176,7 +176,7 @@ class RoleAssignmentV2(controller.V2Controller):
not implementing them in hopes that the idea will die off.
"""
self.assert_admin(request.context_dict)
self.assert_admin(request)
if tenant_id is None:
raise exception.NotImplemented(
message=_('User roles not supported: tenant_id required'))
@ -197,7 +197,7 @@ class RoleAssignmentV2(controller.V2Controller):
up the appropriate data when we need to delete them.
"""
self.assert_admin(request.context_dict)
self.assert_admin(request)
tenants = self.assignment_api.list_projects_for_user(user_id)
o = []
for tenant in tenants:
@ -224,7 +224,7 @@ class RoleAssignmentV2(controller.V2Controller):
a role.
"""
self.assert_admin(request.context_dict)
self.assert_admin(request)
# TODO(termie): for now we're ignoring the actual role
tenant_id = role.get('tenantId')
role_id = role.get('roleId')
@ -247,7 +247,7 @@ class RoleAssignmentV2(controller.V2Controller):
we remove the user from the tenant.
"""
self.assert_admin(request.context_dict)
self.assert_admin(request)
# TODO(termie): for now we're ignoring the actual role
role_ref_ref = urllib.parse.parse_qs(role_ref_id)
tenant_id = role_ref_ref.get('tenantId')[0]


@ -37,25 +37,25 @@ class Service(controller.V2Controller):
@controller.v2_deprecated
def get_services(self, request):
self.assert_admin(request.context_dict)
self.assert_admin(request)
service_list = self.catalog_api.list_services()
return {'OS-KSADM:services': service_list}
@controller.v2_deprecated
def get_service(self, request, service_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
service_ref = self.catalog_api.get_service(service_id)
return {'OS-KSADM:service': service_ref}
@controller.v2_deprecated
def delete_service(self, request, service_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
initiator = notifications._get_request_audit_info(request.context_dict)
self.catalog_api.delete_service(service_id, initiator)
@controller.v2_deprecated
def create_service(self, request, OS_KSADM_service):
self.assert_admin(request.context_dict)
self.assert_admin(request)
service_id = uuid.uuid4().hex
service_ref = OS_KSADM_service.copy()
service_ref['id'] = service_id
@ -71,7 +71,7 @@ class Endpoint(controller.V2Controller):
@controller.v2_deprecated
def get_endpoints(self, request):
"""Merge matching v3 endpoint refs into legacy refs."""
self.assert_admin(request.context_dict)
self.assert_admin(request)
legacy_endpoints = {}
v3_endpoints = {}
for endpoint in self.catalog_api.list_endpoints():
@ -131,7 +131,7 @@ class Endpoint(controller.V2Controller):
@controller.v2_deprecated
def create_endpoint(self, request, endpoint):
"""Create three v3 endpoint refs based on a legacy ref."""
self.assert_admin(request.context_dict)
self.assert_admin(request)
# according to the v2 spec publicurl is mandatory
self._require_attribute(endpoint, 'publicurl')
@ -186,7 +186,7 @@ class Endpoint(controller.V2Controller):
@controller.v2_deprecated
def delete_endpoint(self, request, endpoint_id):
"""Delete up to three v3 endpoint refs based on a legacy ref ID."""
self.assert_admin(request.context_dict)
self.assert_admin(request)
initiator = notifications._get_request_audit_info(request.context_dict)
deleted_at_least_one = False


@ -272,7 +272,7 @@ class Application(BaseApplication):
def _normalize_dict(self, d):
return {self._normalize_arg(k): v for (k, v) in d.items()}
def assert_admin(self, context):
def assert_admin(self, request):
"""Ensure the user is an admin.
:raises keystone.exception.Unauthorized: if a token could not be
@ -282,10 +282,10 @@ class Application(BaseApplication):
does not have the admin role
"""
if not context['is_admin']:
user_token_ref = utils.get_token_ref(context)
if not request.context_dict['is_admin']:
user_token_ref = utils.get_token_ref(request.context_dict)
validate_token_bind(context, user_token_ref)
validate_token_bind(request.context_dict, user_token_ref)
creds = copy.deepcopy(user_token_ref.metadata)
try:
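Review bot: The docstring of `assert_admin` is not updated for the new parameter: it still lists only the `:raises` clauses, so a reader cannot tell that the method now expects a `keystone.common.request.Request` rather than the context dict it took before. Add `:param request: the keystone.common.request.Request being authorised; the visible part of the body reads only request.context_dict` to the docstring after the summary line. Because `assert_admin` is a public method on `Application`, any caller still passing a plain dict now fails with an AttributeError on `.context_dict` instead of an authorisation error — it still fails closed, but the traceback is less clear than the old KeyError-free path, which is worth stating in the docstring so stragglers are recognised quickly. The body of `assert_admin` continues past the end of this hunk.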


@ -286,27 +286,27 @@ class Ec2Controller(Ec2ControllerCommon, controller.V2Controller):
@controller.v2_ec2_deprecated
def get_credential(self, request, user_id, credential_id):
if not self._is_admin(request.context_dict):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
return super(Ec2Controller, self).get_credential(user_id,
credential_id)
@controller.v2_ec2_deprecated
def get_credentials(self, request, user_id):
if not self._is_admin(request.context_dict):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
return super(Ec2Controller, self).get_credentials(user_id)
@controller.v2_ec2_deprecated
def create_credential(self, request, user_id, tenant_id):
if not self._is_admin(request.context_dict):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
return super(Ec2Controller, self).create_credential(
request.context_dict, user_id, tenant_id)
@controller.v2_ec2_deprecated
def delete_credential(self, request, user_id, credential_id):
if not self._is_admin(request.context_dict):
if not self._is_admin(request):
self._assert_identity(request.context_dict, user_id)
self._assert_owner(user_id, credential_id)
return super(Ec2Controller, self).delete_credential(user_id,
@ -325,7 +325,7 @@ class Ec2Controller(Ec2ControllerCommon, controller.V2Controller):
if token_ref.user_id != user_id:
raise exception.Forbidden(_('Token belongs to another user'))
def _is_admin(self, context):
def _is_admin(self, request):
"""Wrap admin assertion error return statement.
:param context: standard context
@ -335,7 +335,7 @@ class Ec2Controller(Ec2ControllerCommon, controller.V2Controller):
try:
# NOTE(morganfainberg): policy_api is required for assert_admin
# to properly perform policy enforcement.
self.assert_admin(context)
self.assert_admin(request)
return True
except (exception.Forbidden, exception.Unauthorized):
return False
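Review bot: The `_is_admin` docstring is now stale: the signature was renamed to `request` but the docstring still says `:param context: standard context`, which documents a parameter that no longer exists. Change that line to `:param request: the request whose context_dict is checked for admin rights`. The rest of the docstring (including the return description, if any) sits between the two hunks and is not visible here. Note that `_is_admin` only converts Forbidden and Unauthorized into False, so a caller that still passes a dict will propagate an AttributeError rather than return False — fail-closed, but the docstring should say that only the two policy exceptions are swallowed.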


@ -35,7 +35,7 @@ class User(controller.V2Controller):
@controller.v2_deprecated
def get_user(self, request, user_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
ref = self.identity_api.get_user(user_id)
return {'user': self.v3_to_v2_user(ref)}
@ -46,14 +46,14 @@ class User(controller.V2Controller):
if 'name' in request.params:
return self.get_user_by_name(request, request.params['name'])
self.assert_admin(request.context_dict)
self.assert_admin(request)
user_list = self.identity_api.list_users(
CONF.identity.default_domain_id)
return {'users': self.v3_to_v2_user(user_list)}
@controller.v2_deprecated
def get_user_by_name(self, request, user_name):
self.assert_admin(request.context_dict)
self.assert_admin(request)
ref = self.identity_api.get_user_by_name(
user_name, CONF.identity.default_domain_id)
return {'user': self.v3_to_v2_user(ref)}
@ -64,7 +64,7 @@ class User(controller.V2Controller):
user = self._normalize_OSKSADM_password_on_request(user)
user = self.normalize_username_in_request(user)
user = self._normalize_dict(user)
self.assert_admin(request.context_dict)
self.assert_admin(request)
if 'name' not in user or not user['name']:
msg = _('Name field is required and cannot be empty')
@ -96,7 +96,7 @@ class User(controller.V2Controller):
def update_user(self, request, user_id, user):
# NOTE(termie): this is really more of a patch than a put
user = self.normalize_username_in_request(user)
self.assert_admin(request.context_dict)
self.assert_admin(request)
if 'enabled' in user and not isinstance(user['enabled'], bool):
msg = _('Enabled field should be a boolean')
@ -168,7 +168,7 @@ class User(controller.V2Controller):
@controller.v2_deprecated
def delete_user(self, request, user_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
initiator = notifications._get_request_audit_info(request.context_dict)
self.identity_api.delete_user(user_id, initiator)


@ -38,7 +38,7 @@ class Tenant(controller.V2Controller):
@controller.v2_deprecated
def get_all_projects(self, request, **kw):
"""Get a list of all tenants for an admin user."""
self.assert_admin(request.context_dict)
self.assert_admin(request)
name = request.params.get('name')
if name:
@ -70,7 +70,7 @@ class Tenant(controller.V2Controller):
@controller.v2_deprecated
def get_project(self, request, tenant_id):
# TODO(termie): this stuff should probably be moved to middleware
self.assert_admin(request.context_dict)
self.assert_admin(request)
ref = self.resource_api.get_project(tenant_id)
self._assert_not_is_domain_project(tenant_id, ref)
return {'tenant': self.v3_to_v2_project(ref)}
@ -96,7 +96,7 @@ class Tenant(controller.V2Controller):
'allowed in v2.')
raise exception.ValidationError(message=msg)
self.assert_admin(request.context_dict)
self.assert_admin(request)
self.resource_api.ensure_default_domain_exists()
@ -110,7 +110,7 @@ class Tenant(controller.V2Controller):
@controller.v2_deprecated
def update_project(self, request, tenant_id, tenant):
self.assert_admin(request.context_dict)
self.assert_admin(request)
self._assert_not_is_domain_project(tenant_id)
# Remove domain_id and is_domain if specified - a v2 api caller
# should not be specifying that
@ -124,7 +124,7 @@ class Tenant(controller.V2Controller):
@controller.v2_deprecated
def delete_project(self, request, tenant_id):
self.assert_admin(request.context_dict)
self.assert_admin(request)
self._assert_not_is_domain_project(tenant_id)
initiator = notifications._get_request_audit_info(request.context_dict)
self.resource_api.delete_project(tenant_id, initiator)


@ -17,6 +17,7 @@ import uuid
from keystoneclient.contrib.ec2 import utils as ec2_utils
from six.moves import http_client
from keystone.common import request
from keystone.common import utils
from keystone.contrib.ec2 import controllers
from keystone import exception
@ -258,8 +259,10 @@ class V2CredentialEc2Controller(unit.TestCase):
is raised but not caught if the user is not an admin.
"""
# make a non-admin user
context = {'is_admin': False, 'token_id': uuid.uuid4().hex}
req = request.Request.blank('/')
req.context_dict['is_admin'] = False
req.context_dict['token_id'] = uuid.uuid4().hex
# check if user is admin
# no exceptions should be raised
self.controller._is_admin(context)
self.controller._is_admin(req)
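Review bot: The rewritten test discards the result of `self.controller._is_admin(req)`, so it only proves that no exception escapes, not that a non-admin request is actually rejected; the comment "no exceptions should be raised" describes the weaker check. Since `req.context_dict['is_admin']` is explicitly set to False, pin the fail-closed result with `self.assertFalse(self.controller._is_admin(req))`. The test docstring above (partly outside the hunk) talks about an exception being raised but not caught, which appears to contradict the "no exceptions" comment — worth reconciling in the same change.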


@ -453,7 +453,7 @@ class Auth(controller.V2Controller):
def delete_token(self, request, token_id):
"""Delete a token, effectively invalidating it for authz."""
# TODO(termie): this stuff should probably be moved to middleware
self.assert_admin(request.context_dict)
self.assert_admin(request)
self.token_provider_api.revoke_token(token_id)
@controller.v2_deprecated
@ -478,7 +478,7 @@ class Auth(controller.V2Controller):
@controller.v2_deprecated
def endpoints(self, request, token_id):
"""Return a list of endpoints available to the token."""
self.assert_admin(request.context_dict)
self.assert_admin(request)
token_ref = self._get_token_ref(token_id)


@ -218,7 +218,7 @@ class TrustV3(controller.V3Controller):
def list_trusts(self, request):
trusts = []
if not request.params:
self.assert_admin(request.context_dict)
self.assert_admin(request)
trusts += self.trust_api.list_trusts()
if 'trustor_user_id' in request.params:
user_id = request.params['trustor_user_id']