Merge "Extract SigningDirectory into file"

This commit is contained in:
Jenkins 2015-02-25 00:55:42 +00:00 committed by Gerrit Code Review
commit 249d9ddb8e
4 changed files with 225 additions and 183 deletions

View File

@ -175,8 +175,6 @@ import contextlib
import datetime
import logging
import os
import stat
import tempfile
from keystoneclient import access
from keystoneclient import adapter
@ -196,6 +194,7 @@ from six.moves import urllib
from keystonemiddleware import _memcache_crypt as memcache_crypt
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.i18n import _, _LC, _LE, _LI, _LW
from keystonemiddleware.openstack.common import memorycache
@ -845,7 +844,7 @@ class AuthProtocol(object):
self._auth_uri = self._identity_server.auth_uri
self._signing_directory = _SigningDirectory(
self._signing_directory = _signing_dir.SigningDirectory(
directory_name=self._conf_get('signing_dir'), log=self._LOG)
self._token_cache = self._token_cache_factory()
@ -1780,65 +1779,6 @@ class _Revocations(object):
raise exc.InvalidToken(_('Token has been revoked'))
class _SigningDirectory(object):
def __init__(self, directory_name=None, log=None):
self._log = log or _LOG
if directory_name is None:
directory_name = tempfile.mkdtemp(prefix='keystone-signing-')
self._log.info(
_LI('Using %s as cache directory for signing certificate'),
directory_name)
self._directory_name = directory_name
self._verify_signing_dir()
def write_file(self, file_name, new_contents):
# In Python2, encoding is slow so the following check avoids it if it
# is not absolutely necessary.
if isinstance(new_contents, six.text_type):
new_contents = new_contents.encode('utf-8')
def _atomic_write():
with tempfile.NamedTemporaryFile(dir=self._directory_name,
delete=False) as f:
f.write(new_contents)
os.rename(f.name, self.calc_path(file_name))
try:
_atomic_write()
except (OSError, IOError):
self._verify_signing_dir()
_atomic_write()
def read_file(self, file_name):
path = self.calc_path(file_name)
open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
with open(path, 'r', **open_kwargs) as f:
return f.read()
def calc_path(self, file_name):
return os.path.join(self._directory_name, file_name)
def _verify_signing_dir(self):
if os.path.isdir(self._directory_name):
if not os.access(self._directory_name, os.W_OK):
raise exc.ConfigurationError(
_('unable to access signing_dir %s') %
self._directory_name)
uid = os.getuid()
if os.stat(self._directory_name).st_uid != uid:
self._log.warning(_LW('signing_dir is not owned by %s'), uid)
current_mode = stat.S_IMODE(os.stat(self._directory_name).st_mode)
if current_mode != stat.S_IRWXU:
self._log.warning(
_LW('signing_dir mode is %(mode)s instead of %(need)s'),
{'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
else:
os.makedirs(self._directory_name, stat.S_IRWXU)
class _TokenCache(object):
"""Encapsulates the auth_token token cache functionality.

View File

@ -0,0 +1,83 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import stat
import tempfile
import six
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.i18n import _, _LI, _LW
_LOG = logging.getLogger(__name__)
class SigningDirectory(object):
def __init__(self, directory_name=None, log=None):
self._log = log or _LOG
if directory_name is None:
directory_name = tempfile.mkdtemp(prefix='keystone-signing-')
self._log.info(
_LI('Using %s as cache directory for signing certificate'),
directory_name)
self._directory_name = directory_name
self._verify_signing_dir()
def write_file(self, file_name, new_contents):
# In Python2, encoding is slow so the following check avoids it if it
# is not absolutely necessary.
if isinstance(new_contents, six.text_type):
new_contents = new_contents.encode('utf-8')
def _atomic_write():
with tempfile.NamedTemporaryFile(dir=self._directory_name,
delete=False) as f:
f.write(new_contents)
os.rename(f.name, self.calc_path(file_name))
try:
_atomic_write()
except (OSError, IOError):
self._verify_signing_dir()
_atomic_write()
def read_file(self, file_name):
path = self.calc_path(file_name)
open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
with open(path, 'r', **open_kwargs) as f:
return f.read()
def calc_path(self, file_name):
return os.path.join(self._directory_name, file_name)
def _verify_signing_dir(self):
if os.path.isdir(self._directory_name):
if not os.access(self._directory_name, os.W_OK):
raise exc.ConfigurationError(
_('unable to access signing_dir %s') %
self._directory_name)
uid = os.getuid()
if os.stat(self._directory_name).st_uid != uid:
self._log.warning(_LW('signing_dir is not owned by %s'), uid)
current_mode = stat.S_IMODE(os.stat(self._directory_name).st_mode)
if current_mode != stat.S_IRWXU:
self._log.warning(
_LW('signing_dir mode is %(mode)s instead of %(need)s'),
{'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
else:
os.makedirs(self._directory_name, stat.S_IRWXU)

View File

@ -0,0 +1,138 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import stat
import uuid
import testtools
from keystonemiddleware.auth_token import _signing_dir
class SigningDirectoryTests(testtools.TestCase):
def test_directory_created_when_doesnt_exist(self):
# When _SigningDirectory is created, if the directory doesn't exist
# it's created with the expected permissions.
tmp_name = uuid.uuid4().hex
parent_directory = '/tmp/%s' % tmp_name
directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
# Directories are created by __init__.
_signing_dir.SigningDirectory(directory_name)
self.addCleanup(shutil.rmtree, parent_directory)
self.assertTrue(os.path.isdir(directory_name))
self.assertTrue(os.access(directory_name, os.W_OK))
self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
stat.S_IRWXU)
def test_use_directory_already_exists(self):
# The directory can already exist.
tmp_name = uuid.uuid4().hex
parent_directory = '/tmp/%s' % tmp_name
directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
os.makedirs(directory_name, stat.S_IRWXU)
self.addCleanup(shutil.rmtree, parent_directory)
_signing_dir.SigningDirectory(directory_name)
def test_write_file(self):
# write_file when the file doesn't exist creates the file.
signing_directory = _signing_dir.SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
contents = self.getUniqueString()
signing_directory.write_file(file_name, contents)
file_path = signing_directory.calc_path(file_name)
with open(file_path) as f:
actual_contents = f.read()
self.assertEqual(contents, actual_contents)
def test_replace_file(self):
# write_file when the file already exists overwrites it.
signing_directory = _signing_dir.SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
orig_contents = self.getUniqueString()
signing_directory.write_file(file_name, orig_contents)
new_contents = self.getUniqueString()
signing_directory.write_file(file_name, new_contents)
file_path = signing_directory.calc_path(file_name)
with open(file_path) as f:
actual_contents = f.read()
self.assertEqual(new_contents, actual_contents)
def test_recreate_directory(self):
# If the original directory is lost, it gets recreated when a file
# is written.
signing_directory = _signing_dir.SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
# Delete the directory.
shutil.rmtree(signing_directory._directory_name)
file_name = self.getUniqueString()
contents = self.getUniqueString()
signing_directory.write_file(file_name, contents)
actual_contents = signing_directory.read_file(file_name)
self.assertEqual(contents, actual_contents)
def test_read_file(self):
# Can read a file that was written.
signing_directory = _signing_dir.SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
contents = self.getUniqueString()
signing_directory.write_file(file_name, contents)
actual_contents = signing_directory.read_file(file_name)
self.assertEqual(contents, actual_contents)
def test_read_file_doesnt_exist(self):
# Show what happens when try to read a file that wasn't written.
signing_directory = _signing_dir.SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
self.assertRaises(IOError, signing_directory.read_file, file_name)
def test_calc_path(self):
# calc_path returns the actual filename built from the directory name.
signing_directory = _signing_dir.SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
actual_path = signing_directory.calc_path(file_name)
expected_path = os.path.join(signing_directory._directory_name,
file_name)
self.assertEqual(expected_path, actual_path)

View File

@ -14,9 +14,7 @@
import datetime
import json
import os
import shutil
import stat
import uuid
import mock
@ -24,13 +22,14 @@ import testtools
from keystonemiddleware import auth_token
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _signing_dir
class RevocationsTests(testtools.TestCase):
def _check_with_list(self, revoked_list, token_ids):
directory_name = '/tmp/%s' % uuid.uuid4().hex
signing_directory = auth_token._SigningDirectory(directory_name)
signing_directory = _signing_dir.SigningDirectory(directory_name)
self.addCleanup(shutil.rmtree, directory_name)
identity_server = mock.Mock()
@ -64,121 +63,3 @@ class RevocationsTests(testtools.TestCase):
token_ids = [token_id]
self.assertRaises(exc.InvalidToken,
self._check_with_list, revoked_tokens, token_ids)
class SigningDirectoryTests(testtools.TestCase):
def test_directory_created_when_doesnt_exist(self):
# When _SigningDirectory is created, if the directory doesn't exist
# it's created with the expected permissions.
tmp_name = uuid.uuid4().hex
parent_directory = '/tmp/%s' % tmp_name
directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
# Directories are created by __init__.
auth_token._SigningDirectory(directory_name)
self.addCleanup(shutil.rmtree, parent_directory)
self.assertTrue(os.path.isdir(directory_name))
self.assertTrue(os.access(directory_name, os.W_OK))
self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
stat.S_IRWXU)
def test_use_directory_already_exists(self):
# The directory can already exist.
tmp_name = uuid.uuid4().hex
parent_directory = '/tmp/%s' % tmp_name
directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
os.makedirs(directory_name, stat.S_IRWXU)
self.addCleanup(shutil.rmtree, parent_directory)
auth_token._SigningDirectory(directory_name)
def test_write_file(self):
# write_file when the file doesn't exist creates the file.
signing_directory = auth_token._SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
contents = self.getUniqueString()
signing_directory.write_file(file_name, contents)
file_path = signing_directory.calc_path(file_name)
with open(file_path) as f:
actual_contents = f.read()
self.assertEqual(contents, actual_contents)
def test_replace_file(self):
# write_file when the file already exists overwrites it.
signing_directory = auth_token._SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
orig_contents = self.getUniqueString()
signing_directory.write_file(file_name, orig_contents)
new_contents = self.getUniqueString()
signing_directory.write_file(file_name, new_contents)
file_path = signing_directory.calc_path(file_name)
with open(file_path) as f:
actual_contents = f.read()
self.assertEqual(new_contents, actual_contents)
def test_recreate_directory(self):
# If the original directory is lost, it gets recreated when a file
# is written.
signing_directory = auth_token._SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
# Delete the directory.
shutil.rmtree(signing_directory._directory_name)
file_name = self.getUniqueString()
contents = self.getUniqueString()
signing_directory.write_file(file_name, contents)
actual_contents = signing_directory.read_file(file_name)
self.assertEqual(contents, actual_contents)
def test_read_file(self):
# Can read a file that was written.
signing_directory = auth_token._SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
contents = self.getUniqueString()
signing_directory.write_file(file_name, contents)
actual_contents = signing_directory.read_file(file_name)
self.assertEqual(contents, actual_contents)
def test_read_file_doesnt_exist(self):
# Show what happens when try to read a file that wasn't written.
signing_directory = auth_token._SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
self.assertRaises(IOError, signing_directory.read_file, file_name)
def test_calc_path(self):
# calc_path returns the actual filename built from the directory name.
signing_directory = auth_token._SigningDirectory()
self.addCleanup(shutil.rmtree, signing_directory._directory_name)
file_name = self.getUniqueString()
actual_path = signing_directory.calc_path(file_name)
expected_path = os.path.join(signing_directory._directory_name,
file_name)
self.assertEqual(expected_path, actual_path)