Consolidate `assert_XXX_enabled` type calls to managers

There are multiple implementations of the various assert_XXX_enabled
methods. These have been consolidated to rely on the appropriate
manager objects (e.g. domain uses assignment_api, user now uses
identity_api).

``keystone.token.core.validate_auth_info`` has been deprecated
with no plans to be superseded as the new ``assert_XXXX_enabled``
methods now cover the functionality.

This is another step in the process to deprecate the functionality
from the token_api (and move relevant logic either the
token provider or other relevant locations).

Change-Id: I450daaca946e9695ff30e058adecef6ef0b9058a
bp: non-persistent-tokens
This commit is contained in:
Morgan Fainberg 2014-07-15 17:55:28 -07:00
parent 49d897872d
commit ac31133fba
7 changed files with 97 additions and 20 deletions

View File

@ -83,6 +83,27 @@ class Manager(manager.Manager):
ret['domain_id'])
return ret
def assert_domain_enabled(self, domain_id, domain=None):
"""Assert the Domain is enabled.
:raise AssertionError if domain is disabled.
"""
if domain is None:
domain = self.get_domain(domain_id)
if not domain.get('enabled', True):
raise AssertionError(_('Domain is disabled: %s') % domain_id)
def assert_project_enabled(self, project_id, project=None):
"""Assert the project is enabled and its associated domain is enabled.
:raise AssertionError if the project or domain is disabled.
"""
if project is None:
project = self.get_project(project_id)
self.assert_domain_enabled(domain_id=project['domain_id'])
if not project.get('enabled', True):
raise AssertionError(_('Project is disabled: %s') % project_id)
@notifications.disabled(_PROJECT, public=False)
def _disable_project(self, tenant_id):
return self.token_api.delete_tokens_for_users(

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
from keystoneclient.common import cms
import six
@ -149,16 +151,24 @@ class AuthInfo(object):
def _assert_project_is_enabled(self, project_ref):
# ensure the project is enabled
if not project_ref.get('enabled', True):
msg = _('Project is disabled: %s') % project_ref['id']
LOG.warning(msg)
raise exception.Unauthorized(msg)
try:
self.assignment_api.assert_project_enabled(
project_id=project_ref['id'],
project=project_ref)
except AssertionError as e:
LOG.warning(e)
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _assert_domain_is_enabled(self, domain_ref):
if not domain_ref.get('enabled'):
msg = _('Domain is disabled: %s') % (domain_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
try:
self.assignment_api.assert_domain_enabled(
domain_id=domain_ref['id'],
domain=domain_ref)
except AssertionError as e:
LOG.warning(e)
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _lookup_domain(self, domain_info):
domain_id = domain_info.get('id')

View File

@ -12,6 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
import six
from keystone import auth
from keystone.common import dependency
from keystone import exception
@ -37,16 +41,24 @@ class UserAuthInfo(object):
self.user_ref = None
def _assert_domain_is_enabled(self, domain_ref):
if not domain_ref.get('enabled'):
msg = _('Domain is disabled: %s') % (domain_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
try:
self.assignment_api.assert_domain_enabled(
domain_id=domain_ref['id'],
domain=domain_ref)
except AssertionError as e:
LOG.warning(e)
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _assert_user_is_enabled(self, user_ref):
if not user_ref.get('enabled', True):
msg = _('User is disabled: %s') % (user_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
try:
self.identity_api.assert_user_enabled(
user_id=user_ref['id'],
user=user_ref)
except AssertionError as e:
LOG.warning(e)
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _lookup_domain(self, domain_info):
domain_id = domain_info.get('id')

View File

@ -33,6 +33,7 @@ Glance to list images needed to perform the requested task.
"""
import abc
import sys
import uuid
import six
@ -46,7 +47,6 @@ from keystone.common import wsgi
from keystone import exception
from keystone.i18n import _
from keystone.openstack.common import jsonutils
from keystone import token
@dependency.requires('assignment_api', 'catalog_api', 'credential_api',
@ -125,7 +125,16 @@ class Ec2ControllerCommon(object):
metadata_ref['trustee_user_id'] = user_ref['id']
# Validate that the auth info is valid and nothing is disabled
token.validate_auth_info(self, user_ref, tenant_ref)
try:
self.identity_api.assert_user_enabled(
user_id=user_ref['id'], user=user_ref)
self.assignment_api.assert_domain_enabled(
domain_id=user_ref['domain_id'])
self.assignment_api.assert_project_enabled(
project_id=tenant_ref['id'], project=tenant_ref)
except AssertionError as e:
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
roles = metadata_ref.get('roles', [])
if not roles:

View File

@ -538,6 +538,17 @@ class Manager(manager.Manager):
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
def assert_user_enabled(self, user_id, user=None):
"""Assert the user and the user's domain are enabled.
:raise AssertionError if the user or the user's domain is disabled.
"""
if user is None:
user = self.get_user(user_id)
self.assignment_api.assert_domain_enabled(user['domain_id'])
if not user.get('enabled', True):
raise AssertionError(_('User is disabled: %s') % user_id)
@domains_configured
@exception_translated('user')
def get_user_by_name(self, user_name, domain_id):

View File

@ -13,6 +13,7 @@
# under the License.
import datetime
import sys
from keystoneclient.common import cms
import six
@ -26,7 +27,6 @@ from keystone.i18n import _
from keystone.openstack.common import jsonutils
from keystone.openstack.common import log
from keystone.openstack.common import timeutils
from keystone.token import core
from keystone.token import provider
@ -99,7 +99,18 @@ class Auth(controller.V2Controller):
context, auth)
user_ref, tenant_ref, metadata_ref, expiry, bind = auth_info
core.validate_auth_info(self, user_ref, tenant_ref)
# Validate that the auth info is valid and nothing is disabled
try:
self.identity_api.assert_user_enabled(
user_id=user_ref['id'], user=user_ref)
self.assignment_api.assert_domain_enabled(
domain_id=user_ref['domain_id'])
if tenant_ref:
self.assignment_api.assert_project_enabled(
project_id=tenant_ref['id'], project=tenant_ref)
except AssertionError as e:
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
# NOTE(morganfainberg): Make sure the data is in correct form since it
# might be consumed external to Keystone and this is a v2.0 controller.
# The user_ref is encoded into the auth_token_data which is returned as

View File

@ -49,6 +49,9 @@ def default_expire_time():
return provider.default_expire_time()
@versionutils.deprecated(as_of=versionutils.deprecated.JUNO,
what='keystone.token.core.validate_auth_info',
remove_in=+1)
def validate_auth_info(self, user_ref, tenant_ref):
"""Validate user and tenant auth info.