6f63f09735
The positional decorator results in poorly maintainable code in a misguided effort to emulate python3's key-word-arg only notation and functionality. This patch removes keystonemiddleware's dependance on the positional decorator. Change-Id: I1be3b19d08daf18babac66f274787862c6d77a93
100 lines
3.6 KiB
Python
100 lines
3.6 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import uuid
|
|
|
|
import fixtures
|
|
from keystoneauth1 import fixture as client_fixtures
|
|
from oslo_log import log as logging
|
|
from oslo_utils import timeutils
|
|
|
|
from keystonemiddleware import auth_token
|
|
from keystonemiddleware.auth_token import _exceptions
|
|
|
|
|
|
_LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class AuthTokenFixture(fixtures.Fixture):
|
|
"""Overrides what keystonemiddleware will return to the app behind it."""
|
|
|
|
def setUp(self):
|
|
super(AuthTokenFixture, self).setUp()
|
|
# Ensure that the initialized token data is cleaned up
|
|
self._token_data = {}
|
|
self.addCleanup(self._token_data.clear)
|
|
_LOG.info('Using Testing AuthTokenFixture...')
|
|
self.useFixture(fixtures.MockPatchObject(
|
|
auth_token.AuthProtocol,
|
|
'fetch_token',
|
|
self.fetch_token))
|
|
|
|
@property
|
|
def tokens(self):
|
|
return self._token_data.keys()
|
|
|
|
def add_token_data(self, token_id=None, expires=None,
|
|
user_id=None, user_name=None,
|
|
user_domain_id=None, user_domain_name=None,
|
|
project_id=None, project_name=None,
|
|
project_domain_id=None, project_domain_name=None,
|
|
role_list=None, is_v2=False):
|
|
"""Add token data to the auth_token fixture."""
|
|
if not token_id:
|
|
token_id = uuid.uuid4().hex
|
|
|
|
if not role_list:
|
|
role_list = []
|
|
|
|
if is_v2:
|
|
token = client_fixtures.V2Token(
|
|
token_id=token_id, expires=expires, tenant_id=project_id,
|
|
tenant_name=project_name, user_id=user_id, user_name=user_name)
|
|
else:
|
|
token = client_fixtures.V3Token(
|
|
expires=expires, user_id=user_id, user_name=user_name,
|
|
user_domain_id=user_domain_id, project_id=project_id,
|
|
project_name=project_name,
|
|
project_domain_id=project_domain_id,
|
|
user_domain_name=user_domain_name,
|
|
project_domain_name=project_domain_name)
|
|
for role in role_list:
|
|
token.add_role(name=role)
|
|
self.add_token(token, token_id=token_id)
|
|
|
|
def add_token(self, token_data, token_id=None):
|
|
"""Add an existing token to the middleware.
|
|
|
|
:param token_data: token data to add to the fixture
|
|
:type token_data: dict
|
|
:param token_id: the token ID to add this token as
|
|
:type token_id: str
|
|
:returns: The token_id that the token was added as.
|
|
:rtype: str
|
|
"""
|
|
if not token_id:
|
|
token_id = uuid.uuid4().hex
|
|
self._token_data[token_id] = token_data
|
|
return token_id
|
|
|
|
def fetch_token(self, token, **kwargs):
|
|
"""Low level replacement of fetch_token for AuthProtocol."""
|
|
token_data = self._token_data.get(token, {})
|
|
if token_data:
|
|
self._assert_token_not_expired(token_data.expires)
|
|
return token_data
|
|
raise _exceptions.InvalidToken()
|
|
|
|
def _assert_token_not_expired(self, token_expires):
|
|
if timeutils.utcnow() > timeutils.normalize_time(token_expires):
|
|
raise _exceptions.InvalidToken()
|