Implement certificate_utils

Implement the certificate_utils module. The module's verify_certificate
method can be applied to the creation or rebuild of an instance. It is
triggered by one of two ways:
    1) The enable_certificate_validation configuration option is set to
       True in Nova's glance configuration (alongside the
       verify_glance_signatures option also set to True)
    2) A list of trusted certificate IDs is provided

Change-Id: I0ae2dbf66241207a425bf7d0fc02a4d2e2dea409
Implements: blueprint nova-validate-certificates
This commit is contained in:
Brianna Poulos
2018-06-06 16:43:20 -04:00
committed by Matt Riedemann
parent 2d3b204d38
commit 595ea73c62
5 changed files with 283 additions and 37 deletions

View File

@@ -2292,3 +2292,8 @@ class OptRequiredIfOtherOptValue(NovaException):
class AllocationCreateFailed(NovaException):
msg_fmt = _('Failed to create allocations for instance %(instance)s '
'against resource provider %(provider)s.')
class CertificateValidationFailed(NovaException):
msg_fmt = _("Image signature certificate validation failed for "
"certificate: %(cert_uuid)s. %(reason)s")

View File

@@ -152,7 +152,8 @@ class API(object):
session, image_id = self._get_session_and_image_id(context, id_or_uri)
return session.delete(context, image_id)
def download(self, context, id_or_uri, data=None, dest_path=None):
def download(self, context, id_or_uri, data=None, dest_path=None,
trusted_certs=None):
"""Transfer image bits from Glance or a known source location to the
supplied destination filepath.
@@ -162,6 +163,9 @@ class API(object):
information for.
:param data: A file object to use in downloading image data.
:param dest_path: Filepath to transfer image bits to.
:param trusted_certs: A 'nova.objects.trusted_certs.TrustedCerts'
object with a list of trusted image certificate
IDs.
Note that because of the poor design of the
`glance.ImageService.download` method, the function returns different
@@ -191,4 +195,5 @@ class API(object):
# handle streaming/copying/zero-copy as they see fit.
session, image_id = self._get_session_and_image_id(context, id_or_uri)
return session.download(context, image_id, data=data,
dst_path=dest_path)
dst_path=dest_path,
trusted_certs=trusted_certs)

View File

@@ -28,6 +28,7 @@ import sys
import time
import cryptography
from cursive import certificate_utils
from cursive import exception as cursive_exception
from cursive import signature_utils
import glanceclient
@@ -300,7 +301,8 @@ class GlanceImageServiceV2(object):
if not any(check(mode) for check in (stat.S_ISFIFO, stat.S_ISSOCK)):
os.fsync(fileno)
def download(self, context, image_id, data=None, dst_path=None):
def download(self, context, image_id, data=None, dst_path=None,
trusted_certs=None):
"""Calls out to Glance for data and writes data."""
if CONF.glance.allowed_direct_url_schemes and dst_path is not None:
image = self.show(context, image_id, include_locations=True)
@@ -329,33 +331,7 @@ class GlanceImageServiceV2(object):
reason='Image has no associated data')
# Retrieve properties for verification of Glance image signature
verifier = None
if CONF.glance.verify_glance_signatures:
image_meta_dict = self.show(context, image_id,
include_locations=False)
image_meta = objects.ImageMeta.from_dict(image_meta_dict)
img_signature = image_meta.properties.get('img_signature')
img_sig_hash_method = image_meta.properties.get(
'img_signature_hash_method'
)
img_sig_cert_uuid = image_meta.properties.get(
'img_signature_certificate_uuid'
)
img_sig_key_type = image_meta.properties.get(
'img_signature_key_type'
)
try:
verifier = signature_utils.get_verifier(
context=context,
img_signature_certificate_uuid=img_sig_cert_uuid,
img_signature_hash_method=img_sig_hash_method,
img_signature=img_signature,
img_signature_key_type=img_sig_key_type,
)
except cursive_exception.SignatureVerificationError:
with excutils.save_and_reraise_exception():
LOG.error('Image signature verification failed '
'for image: %s', image_id)
verifier = self._get_verifier(context, image_id, trusted_certs)
close_file = False
if data is None and dst_path:
@@ -408,6 +384,66 @@ class GlanceImageServiceV2(object):
self._safe_fsync(data)
data.close()
def _get_verifier(self, context, image_id, trusted_certs):
verifier = None
# Use the default certs if the user didn't provide any (and there are
# default certs configured).
if (not trusted_certs and CONF.glance.enable_certificate_validation and
CONF.glance.default_trusted_certificate_ids):
trusted_certs = objects.TrustedCerts(
ids=CONF.glance.default_trusted_certificate_ids)
# Verify image signature if feature is enabled or trusted
# certificates were provided
if trusted_certs or CONF.glance.verify_glance_signatures:
image_meta_dict = self.show(context, image_id,
include_locations=False)
image_meta = objects.ImageMeta.from_dict(image_meta_dict)
img_signature = image_meta.properties.get('img_signature')
img_sig_hash_method = image_meta.properties.get(
'img_signature_hash_method'
)
img_sig_cert_uuid = image_meta.properties.get(
'img_signature_certificate_uuid'
)
img_sig_key_type = image_meta.properties.get(
'img_signature_key_type'
)
try:
verifier = signature_utils.get_verifier(
context=context,
img_signature_certificate_uuid=img_sig_cert_uuid,
img_signature_hash_method=img_sig_hash_method,
img_signature=img_signature,
img_signature_key_type=img_sig_key_type,
)
except cursive_exception.SignatureVerificationError:
with excutils.save_and_reraise_exception():
LOG.error('Image signature verification failed '
'for image: %s', image_id)
# Validate image signature certificate if trusted certificates
# were provided
# NOTE(jackie-truong): Certificate validation will occur if
# trusted_certs are provided, even if the certificate validation
# feature is disabled. This is to provide safety for the user.
# We may want to consider making this a "soft" check in the future.
if trusted_certs:
_verify_certs(context, img_sig_cert_uuid, trusted_certs)
elif CONF.glance.enable_certificate_validation:
msg = ('Image signature certificate validation enabled, '
'but no trusted certificate IDs were provided. '
'Unable to validate the certificate used to '
'verify the image signature.')
LOG.warning(msg)
raise exception.CertificateValidationFailed(msg)
else:
LOG.debug('Certificate validation was not performed. A list '
'of trusted image certificate IDs must be provided '
'in order to validate an image certificate.')
return verifier
def create(self, context, image_meta, data=None):
"""Store the image data and return the new image object."""
# Here we workaround the situation when user wants to activate an
@@ -887,6 +923,23 @@ def _translate_plain_exception(exc_value):
return exc_value
def _verify_certs(context, img_sig_cert_uuid, trusted_certs):
try:
certificate_utils.verify_certificate(
context=context,
certificate_uuid=img_sig_cert_uuid,
trusted_certificate_uuids=trusted_certs.ids)
LOG.debug('Image signature certificate validation '
'succeeded for certificate: %s',
img_sig_cert_uuid)
except cursive_exception.SignatureVerificationError as e:
LOG.warning('Image signature certificate validation '
'failed for certificate: %s',
img_sig_cert_uuid)
raise exception.CertificateValidationFailed(
cert_uuid=img_sig_cert_uuid, reason=six.text_type(e))
def get_remote_image_service(context, image_href):
"""Create an image_service and parse the id from the given image_href.

View File

@@ -33,6 +33,7 @@ import nova.conf
from nova import context
from nova import exception
from nova.image import glance
from nova import objects
from nova import service_auth
from nova import test
from nova.tests import uuidsentinel as uuids
@@ -831,7 +832,8 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
service = glance.GlanceImageServiceV2(self.client)
mock_get_verifier.return_value = self.MockVerifier()
mock_show.return_value = self.fake_img_props
res = service.download(context=None, image_id=None,
image_id = None
res = service.download(context=None, image_id=image_id,
data=None, dst_path=None)
self.assertEqual(self.fake_img_data, res)
mock_get_verifier.assert_called_once_with(
@@ -841,7 +843,14 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
img_signature='signature',
img_signature_key_type='RSA-PSS'
)
mock_log.info.assert_called_once_with(mock.ANY, mock.ANY)
# trusted_certs is None and enable_certificate_validation is
# false, which causes the below debug message to occur
msg = ('Certificate validation was not performed. A list of '
'trusted image certificate IDs must be provided in '
'order to validate an image certificate.')
mock_log.debug.assert_called_once_with(msg)
msg = ('Image signature verification succeeded for image: %s')
mock_log.info.assert_called_once_with(msg, image_id)
@mock.patch.object(six.moves.builtins, 'open')
@mock.patch('nova.image.glance.LOG')
@@ -869,7 +878,12 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
img_signature='signature',
img_signature_key_type='RSA-PSS'
)
mock_log.info.assert_called_once_with(mock.ANY, mock.ANY)
msg = ('Certificate validation was not performed. A list of '
'trusted image certificate IDs must be provided in '
'order to validate an image certificate.')
mock_log.debug.assert_called_once_with(msg)
msg = ('Image signature verification succeeded for image %s')
mock_log.info.assert_called_once_with(msg, None)
self.assertEqual(len(self.fake_img_data), mock_dest.write.call_count)
self.assertTrue(mock_dest.close.called)
mock_fsync.assert_called_once_with(mock_dest)
@@ -949,6 +963,174 @@ class TestDownloadSignatureVerification(test.NoDBTestCase):
self.assertTrue(mock_dest.close.called)
class TestDownloadCertificateValidation(test.NoDBTestCase):
"""Tests the download method of the GlanceImageServiceV2 when
certificate validation is enabled.
"""
def setUp(self):
super(TestDownloadCertificateValidation, self).setUp()
self.flags(enable_certificate_validation=True, group='glance')
self.fake_img_props = {
'properties': {
'img_signature': 'signature',
'img_signature_hash_method': 'SHA-224',
'img_signature_certificate_uuid': uuids.img_sig_cert_uuid,
'img_signature_key_type': 'RSA-PSS',
}
}
self.fake_img_data = ['A' * 256, 'B' * 256]
self.client = mock.MagicMock()
self.client.call.return_value = fake_glance_response(
self.fake_img_data)
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('cursive.certificate_utils.verify_certificate')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_with_certificate_validation_v2(self,
mock_get_verifier,
mock_verify_certificate,
mock_show,
mock_log):
service = glance.GlanceImageServiceV2(self.client)
mock_show.return_value = self.fake_img_props
fake_cert = uuids.img_sig_cert_uuid
fake_trusted_certs = objects.TrustedCerts(ids=[fake_cert])
res = service.download(context=None, image_id=None,
data=None, dst_path=None,
trusted_certs=fake_trusted_certs)
self.assertEqual(self.fake_img_data, res)
mock_get_verifier.assert_called_once_with(
context=None,
img_signature_certificate_uuid=uuids.img_sig_cert_uuid,
img_signature_hash_method='SHA-224',
img_signature='signature',
img_signature_key_type='RSA-PSS'
)
mock_verify_certificate.assert_called_once_with(
context=None,
certificate_uuid=uuids.img_sig_cert_uuid,
trusted_certificate_uuids=[fake_cert]
)
msg = ('Image signature certificate validation succeeded '
'for certificate: %s')
mock_log.debug.assert_called_once_with(msg, uuids.img_sig_cert_uuid)
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('cursive.certificate_utils.verify_certificate')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_with_trusted_certs_and_disabled_cert_validation_v2(
self,
mock_get_verifier,
mock_verify_certificate,
mock_show,
mock_log):
self.flags(enable_certificate_validation=False, group='glance')
service = glance.GlanceImageServiceV2(self.client)
mock_show.return_value = self.fake_img_props
fake_cert = uuids.img_sig_cert_uuid
fake_trusted_certs = objects.TrustedCerts(ids=[fake_cert])
res = service.download(context=None, image_id=None,
data=None, dst_path=None,
trusted_certs=fake_trusted_certs)
self.assertEqual(self.fake_img_data, res)
mock_get_verifier.assert_called_once_with(
context=None,
img_signature_certificate_uuid=uuids.img_sig_cert_uuid,
img_signature_hash_method='SHA-224',
img_signature='signature',
img_signature_key_type='RSA-PSS'
)
mock_verify_certificate.assert_called_once_with(
context=None,
certificate_uuid=uuids.img_sig_cert_uuid,
trusted_certificate_uuids=[fake_cert]
)
msg = ('Image signature certificate validation succeeded '
'for certificate: %s')
mock_log.debug.assert_called_once_with(msg, uuids.img_sig_cert_uuid)
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('cursive.certificate_utils.verify_certificate')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_with_certificate_validation_failure_v2(
self,
mock_get_verifier,
mock_verify_certificate,
mock_show,
mock_log):
service = glance.GlanceImageServiceV2(self.client)
mock_verify_certificate.side_effect = \
cursive_exception.SignatureVerificationError(
reason='Invalid certificate.'
)
mock_show.return_value = self.fake_img_props
bad_trusted_certs = objects.TrustedCerts(ids=['bad_cert_id',
'other_bad_cert_id'])
self.assertRaises(exception.CertificateValidationFailed,
service.download,
context=None, image_id=None,
data=None, dst_path=None,
trusted_certs=bad_trusted_certs)
msg = ('Image signature certificate validation failed for '
'certificate: %s')
mock_log.warning.assert_called_once_with(msg,
uuids.img_sig_cert_uuid)
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('cursive.signature_utils.get_verifier')
def test_download_without_trusted_certs_failure_v2(self,
mock_get_verifier,
mock_show,
mock_log):
# Signature verification needs to be enabled in order to reach the
# checkpoint for trusted_certs. Otherwise, all image signature
# validation will be skipped.
self.flags(verify_glance_signatures=True, group='glance')
service = glance.GlanceImageServiceV2(self.client)
mock_show.return_value = self.fake_img_props
self.assertRaises(exception.CertificateValidationFailed,
service.download,
context=None, image_id=None,
data=None, dst_path=None)
msg = ('Image signature certificate validation enabled, but no '
'trusted certificate IDs were provided. Unable to '
'validate the certificate used to verify the image '
'signature.')
mock_log.warning.assert_called_once_with(msg)
@mock.patch('nova.image.glance.LOG')
@mock.patch('nova.image.glance.GlanceImageServiceV2.show')
@mock.patch('cursive.signature_utils.get_verifier')
@mock.patch('cursive.certificate_utils.verify_certificate')
def test_get_verifier_without_trusted_certs_use_default_certs(
self, mock_verify_certificate, mock_get_verifier, mock_show,
mock_log):
"""Tests the scenario that trusted_certs is not provided, but
signature and cert verification are enabled, and there are default
certs to use.
"""
self.flags(verify_glance_signatures=True, group='glance')
self.flags(default_trusted_certificate_ids=[uuids.img_sig_cert_uuid],
group='glance')
service = glance.GlanceImageServiceV2(self.client)
mock_show.return_value = self.fake_img_props
service._get_verifier(
mock.sentinel.context, mock.sentinel.image_id, trusted_certs=None)
mock_verify_certificate.assert_called_once_with(
context=mock.sentinel.context,
certificate_uuid=uuids.img_sig_cert_uuid,
trusted_certificate_uuids=[uuids.img_sig_cert_uuid]
)
msg = ('Image signature certificate validation succeeded '
'for certificate: %s')
mock_log.debug.assert_called_once_with(msg, uuids.img_sig_cert_uuid)
class TestIsImageAvailable(test.NoDBTestCase):
"""Tests the internal _is_image_available function."""

View File

@@ -131,18 +131,19 @@ def _convert_image(source, dest, in_format, out_format, run_as_root):
raise exception.ImageUnacceptable(image_id=source, reason=msg)
def fetch(context, image_href, path):
def fetch(context, image_href, path, trusted_certs=None):
with fileutils.remove_path_on_error(path):
IMAGE_API.download(context, image_href, dest_path=path)
IMAGE_API.download(context, image_href, dest_path=path,
trusted_certs=trusted_certs)
def get_info(context, image_href):
return IMAGE_API.get(context, image_href)
def fetch_to_raw(context, image_href, path):
def fetch_to_raw(context, image_href, path, trusted_certs=None):
path_tmp = "%s.part" % path
fetch(context, image_href, path_tmp)
fetch(context, image_href, path_tmp, trusted_certs)
with fileutils.remove_path_on_error(path_tmp):
data = qemu_img_info(path_tmp)