Check file existence in local cert manager

Intermediates and private key passphrase are optional in CertManager,
But local CertManager tries to get or delete these anyway.
This patch checks file existence when getting or deleting files.

Change-Id: I5f883e03fff9cf4cb3466698d63ae8ba337f9f2d
Closes-bug: #1494645
This commit is contained in:
OTSUKA, Yuanying 2015-09-11 17:38:34 +09:00
parent 22b83b0949
commit bf04ac76f0
2 changed files with 96 additions and 30 deletions

View File

@ -13,6 +13,7 @@
# under the License.
import os
from os import path
import uuid
from oslo_config import cfg
@ -162,15 +163,17 @@ class CertManager(cert_manager.CertManager):
)
try:
with open(filename_intermediates, 'r') as int_file:
cert_data['intermediates'] = int_file.read()
if path.isfile(filename_intermediates):
with open(filename_intermediates, 'r') as int_file:
cert_data['intermediates'] = int_file.read()
except IOError as ioe:
LOG.error(_LE("Failed to read certificate."))
raise exception.CertificateStorageException(msg=ioe.message)
try:
with open(filename_pkp, 'r') as pass_file:
cert_data['private_key_passphrase'] = pass_file.read()
if path.isfile(filename_pkp):
with open(filename_pkp, 'r') as pass_file:
cert_data['private_key_passphrase'] = pass_file.read()
except IOError as ioe:
LOG.error(_LE("Failed to read certificate."))
raise exception.CertificateStorageException(msg=ioe.message)
@ -200,8 +203,10 @@ class CertManager(cert_manager.CertManager):
try:
os.remove(filename_certificate)
os.remove(filename_private_key)
os.remove(filename_intermediates)
os.remove(filename_pkp)
if path.isfile(filename_intermediates):
os.remove(filename_intermediates)
if path.isfile(filename_pkp):
os.remove(filename_pkp)
except IOError as ioe:
LOG.error(_LE(
"Failed to delete certificate {0}."

View File

@ -58,11 +58,51 @@ class TestLocalManager(base.BaseTestCase):
self.private_key = "My Private Key"
self.private_key_passphrase = "My Private Key Passphrase"
def _mock_isfile(path):
_, ext = os.path.splitext(path)
if self.intermediates == None and ext == '.int':
return False
if self.private_key_passphrase == None and ext == '.pass':
return False
return True
isfile_patcher = mock.patch('os.path.isfile')
self.mock_isfile = isfile_patcher.start()
self.addCleanup(isfile_patcher.stop)
self.mock_isfile.side_effect = _mock_isfile
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="certificates", storage_path="/tmp/")
super(TestLocalManager, self).setUp()
def _open_calls(self, cert_id, mode='w'):
open_calls = []
unexpected_calls = []
for ext in ['crt', 'key', 'int', 'pass']:
args = [os.path.join('/tmp/{0}.{1}'.format(cert_id, ext))]
if mode:
args.append(mode)
call = mock.call(*args)
if ext == 'int' and not self.intermediates:
unexpected_calls.append(call)
elif ext == 'pass' and not self.private_key_passphrase:
unexpected_calls.append(call)
else:
open_calls.append(call)
return open_calls, unexpected_calls
def _write_calls(self):
write_calls = [
mock.call(self.certificate),
mock.call(self.private_key),
]
if self.intermediates:
write_calls.append(mock.call(self.intermediates))
if self.private_key_passphrase:
write_calls.append(mock.call(self.private_key_passphrase))
return write_calls
def _store_cert(self):
file_mock = mock.mock_open()
# Attempt to store the cert
@ -78,20 +118,13 @@ class TestLocalManager(base.BaseTestCase):
self.assertIsNotNone(cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'w')
], any_order=True)
open_calls, unexpected_calls = self._open_calls(cert_id)
file_mock.assert_has_calls(open_calls, any_order=True)
for unexpected_call in unexpected_calls:
self.assertNotIn(unexpected_call, file_mock.mock_calls)
# Verify the writes were made
file_mock().write.assert_has_calls([
mock.call(self.certificate),
mock.call(self.intermediates),
mock.call(self.private_key),
mock.call(self.private_key_passphrase)
], any_order=True)
file_mock().write.assert_has_calls(self._write_calls(), any_order=True)
return cert_id
@ -102,12 +135,10 @@ class TestLocalManager(base.BaseTestCase):
data = local_cert_manager.CertManager.get_cert(cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'r')
], any_order=True)
open_calls, unexpected_calls = self._open_calls(cert_id, 'r')
file_mock.assert_has_calls(open_calls, any_order=True)
for unexpected_call in unexpected_calls:
self.assertNotIn(unexpected_call, file_mock.mock_calls)
# The returned data should be a Cert object
self.assertIsInstance(data, cert_manager.Cert)
@ -136,13 +167,11 @@ class TestLocalManager(base.BaseTestCase):
with mock.patch('os.remove', remove_mock):
local_cert_manager.CertManager.delete_cert(cert_id)
open_calls, unexpected_calls = self._open_calls(cert_id, mode=None)
# Verify the correct files were removed
remove_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)))
], any_order=True)
remove_mock.assert_has_calls(open_calls, any_order=True)
for unexpected_call in unexpected_calls:
self.assertNotIn(unexpected_call, remove_mock.mock_calls)
def _delete_cert_with_fail(self, cert_id):
remove_mock = mock.Mock()
@ -202,6 +231,22 @@ class TestLocalManager(base.BaseTestCase):
self._get_cert_with_fail(cert_id, failed='pass')
def test_get_cert_without_intermediate(self):
self.intermediates = None
# Store a cert
cert_id = self._store_cert()
# Get the cert
self._get_cert(cert_id)
def test_get_cert_without_pkp(self):
self.private_key_passphrase = None
# Store a cert
cert_id = self._store_cert()
# Get the cert
self._get_cert(cert_id)
def test_delete_cert(self):
# Store a cert
cert_id = self._store_cert()
@ -221,3 +266,19 @@ class TestLocalManager(base.BaseTestCase):
# Delete the cert with fail
self._delete_cert_with_fail(cert_id)
def test_delete_cert_without_intermediate(self):
self.intermediates = None
# Store a cert
cert_id = self._store_cert()
# Delete the cert with fail
self._delete_cert_with_fail(cert_id)
def test_delete_cert_without_pkp(self):
self.private_key_passphrase = None
# Store a cert
cert_id = self._store_cert()
# Delete the cert with fail
self._delete_cert_with_fail(cert_id)