Extract revocations to file

Extract the revocations client and test code to their own file.

Change-Id: Ib5955cf4efea6b95d29b6ec722eabcf755688e65
Implements: bp refactor-extract-module
This commit is contained in:
Jamie Lennox 2015-02-19 15:43:23 +11:00
parent 249d9ddb8e
commit f950931484
5 changed files with 118 additions and 94 deletions

View File

@ -174,7 +174,6 @@ keystone.token_auth
import contextlib
import datetime
import logging
import os
from keystoneclient import access
from keystoneclient import adapter
@ -194,6 +193,7 @@ from six.moves import urllib
from keystonemiddleware import _memcache_crypt as memcache_crypt
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.i18n import _, _LC, _LE, _LI, _LW
from keystonemiddleware.openstack.common import memorycache
@ -851,11 +851,11 @@ class AuthProtocol(object):
revocation_cache_timeout = datetime.timedelta(
seconds=self._conf_get('revocation_cache_time'))
self._revocations = _Revocations(revocation_cache_timeout,
self._signing_directory,
self._identity_server,
self._cms_verify,
self._LOG)
self._revocations = _revocations.Revocations(revocation_cache_timeout,
self._signing_directory,
self._identity_server,
self._cms_verify,
self._LOG)
self._check_revocations_for_cached = self._conf_get(
'check_revocations_for_cached')
@ -1696,89 +1696,6 @@ class _V3RequestStrategy(_RequestStrategy):
_REQUEST_STRATEGIES = [_V3RequestStrategy, _V2RequestStrategy]
class _Revocations(object):
_FILE_NAME = 'revoked.pem'
def __init__(self, timeout, signing_directory, identity_server,
cms_verify, log=_LOG):
self._cache_timeout = timeout
self._signing_directory = signing_directory
self._identity_server = identity_server
self._cms_verify = cms_verify
self._log = log
self._fetched_time_prop = None
self._list_prop = None
@property
def _fetched_time(self):
if not self._fetched_time_prop:
# If the fetched list has been written to disk, use its
# modification time.
file_path = self._signing_directory.calc_path(self._FILE_NAME)
if os.path.exists(file_path):
mtime = os.path.getmtime(file_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
# Otherwise the list will need to be fetched.
else:
fetched_time = datetime.datetime.min
self._fetched_time_prop = fetched_time
return self._fetched_time_prop
@_fetched_time.setter
def _fetched_time(self, value):
self._fetched_time_prop = value
def _fetch(self):
revocation_list_data = self._identity_server.fetch_revocation_list()
return self._cms_verify(revocation_list_data)
@property
def _list(self):
timeout = self._fetched_time + self._cache_timeout
list_is_current = timeutils.utcnow() < timeout
if list_is_current:
# Load the list from disk if required
if not self._list_prop:
self._list_prop = jsonutils.loads(
self._signing_directory.read_file(self._FILE_NAME))
else:
self._list = self._fetch()
return self._list_prop
@_list.setter
def _list(self, value):
"""Save a revocation list to memory and to disk.
:param value: A json-encoded revocation list
"""
self._list_prop = jsonutils.loads(value)
self._fetched_time = timeutils.utcnow()
self._signing_directory.write_file(self._FILE_NAME, value)
def _is_revoked(self, token_id):
"""Indicate whether the token_id appears in the revocation list."""
revoked_tokens = self._list.get('revoked', None)
if not revoked_tokens:
return False
revoked_ids = (x['id'] for x in revoked_tokens)
return token_id in revoked_ids
def _any_revoked(self, token_ids):
for token_id in token_ids:
if self._is_revoked(token_id):
return True
return False
def check(self, token_ids):
if self._any_revoked(token_ids):
self._log.debug('Token is marked as having been revoked')
raise exc.InvalidToken(_('Token has been revoked'))
class _TokenCache(object):
"""Encapsulates the auth_token token cache functionality.

View File

@ -0,0 +1,106 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import logging
import os
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.i18n import _
_LOG = logging.getLogger(__name__)
class Revocations(object):
_FILE_NAME = 'revoked.pem'
def __init__(self, timeout, signing_directory, identity_server,
cms_verify, log=_LOG):
self._cache_timeout = timeout
self._signing_directory = signing_directory
self._identity_server = identity_server
self._cms_verify = cms_verify
self._log = log
self._fetched_time_prop = None
self._list_prop = None
@property
def _fetched_time(self):
if not self._fetched_time_prop:
# If the fetched list has been written to disk, use its
# modification time.
file_path = self._signing_directory.calc_path(self._FILE_NAME)
if os.path.exists(file_path):
mtime = os.path.getmtime(file_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
# Otherwise the list will need to be fetched.
else:
fetched_time = datetime.datetime.min
self._fetched_time_prop = fetched_time
return self._fetched_time_prop
@_fetched_time.setter
def _fetched_time(self, value):
self._fetched_time_prop = value
def _fetch(self):
revocation_list_data = self._identity_server.fetch_revocation_list()
return self._cms_verify(revocation_list_data)
@property
def _list(self):
timeout = self._fetched_time + self._cache_timeout
list_is_current = timeutils.utcnow() < timeout
if list_is_current:
# Load the list from disk if required
if not self._list_prop:
self._list_prop = jsonutils.loads(
self._signing_directory.read_file(self._FILE_NAME))
else:
self._list = self._fetch()
return self._list_prop
@_list.setter
def _list(self, value):
"""Save a revocation list to memory and to disk.
:param value: A json-encoded revocation list
"""
self._list_prop = jsonutils.loads(value)
self._fetched_time = timeutils.utcnow()
self._signing_directory.write_file(self._FILE_NAME, value)
def _is_revoked(self, token_id):
"""Indicate whether the token_id appears in the revocation list."""
revoked_tokens = self._list.get('revoked', None)
if not revoked_tokens:
return False
revoked_ids = (x['id'] for x in revoked_tokens)
return token_id in revoked_ids
def _any_revoked(self, token_ids):
for token_id in token_ids:
if self._is_revoked(token_id):
return True
return False
def check(self, token_ids):
if self._any_revoked(token_ids):
self._log.debug('Token is marked as having been revoked')
raise exc.InvalidToken(_('Token has been revoked'))

View File

@ -44,6 +44,7 @@ import webob.dec
from keystonemiddleware import auth_token
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.openstack.common import memorycache
from keystonemiddleware.tests import client_fixtures
from keystonemiddleware.tests import utils
@ -904,7 +905,7 @@ class CommonAuthTokenMiddlewareTest(object):
# Get rid of the revoked file
revoked_path = self.middleware._signing_directory.calc_path(
auth_token._Revocations._FILE_NAME)
_revocations.Revocations._FILE_NAME)
os.remove(revoked_path)
self.assertEqual(self.middleware._revocations._fetched_time,
@ -914,7 +915,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
self.middleware._revocations._fetched_time = None
revoked_path = self.middleware._signing_directory.calc_path(
auth_token._Revocations._FILE_NAME)
_revocations.Revocations._FILE_NAME)
mtime = os.path.getmtime(revoked_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
@ -942,7 +943,7 @@ class CommonAuthTokenMiddlewareTest(object):
# Get rid of the revoked file
revoked_path = self.middleware._signing_directory.calc_path(
auth_token._Revocations._FILE_NAME)
_revocations.Revocations._FILE_NAME)
os.remove(revoked_path)
self.assertEqual(self.middleware._revocations._list,

View File

@ -20,8 +20,8 @@ import uuid
import mock
import testtools
from keystonemiddleware import auth_token
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.auth_token import _signing_dir
@ -39,7 +39,7 @@ class RevocationsTests(testtools.TestCase):
}
cms_verify = mock.Mock(return_value=json.dumps(verify_result_obj))
revocations = auth_token._Revocations(
revocations = _revocations.Revocations(
timeout=datetime.timedelta(1), signing_directory=signing_directory,
identity_server=identity_server, cms_verify=cms_verify)