Add sign-the-data signature verification

Per discussion on the mailing list [1] and the related nova
specification [2] it has been decided that the signature should be of
the image data directly, rather than of the glance MD5 "checksum" hash
of the image data.

This patch adds the ability to verify a signature of the image data
directly, using a verifier that is passed to the glance_store backend.
It is dependent on a glance_store patch which updates this verifier
object with the image data as it is creating the checksum (see
Depends-On below).

[1] http://bit.ly/1Q0M0C7
[2] https://review.openstack.org/#/c/188874/19

Depends-On: I43799e6a4a6643a23769af8d839a2beb4e0ff9bf
Partial-Bug: #1516031
Change-Id: If0c06b3094cecef6c8ca8a65753038b6b5a9d8fe
This commit is contained in:
Brianna Poulos 2015-12-08 17:24:11 -05:00 committed by Dane Fichter
parent 89a66916f8
commit 95ca43f30f
5 changed files with 316 additions and 10 deletions

View File

@ -32,7 +32,7 @@ from oslo_utils import encodeutils
import six
from glance.common import exception
from glance.i18n import _LE
from glance.i18n import _, _LE
LOG = logging.getLogger(__name__)
@ -71,6 +71,13 @@ MASK_GEN_ALGORITHMS = {
}
# Required image property names
(SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = (
'img_signature',
'img_signature_hash_method',
'img_signature_key_type',
'img_signature_certificate_uuid'
)
# TODO(bpoulos): remove when 'sign-the-hash' approach is no longer supported
(OLD_SIGNATURE, OLD_HASH_METHOD, OLD_KEY_TYPE, OLD_CERT_UUID) = (
'signature',
@ -136,6 +143,70 @@ KEY_TYPE_METHODS = {
}
def should_create_verifier(image_properties):
"""Determine whether a verifier should be created.
Using the image properties, determine whether existing properties indicate
that signature verification should be done.
:param image_properties: the key-value properties about the image
:return: True, if signature metadata properties exist, False otherwise
"""
return (image_properties is not None and
CERT_UUID in image_properties and
HASH_METHOD in image_properties and
SIGNATURE in image_properties and
KEY_TYPE in image_properties)
def get_verifier(context, image_properties):
"""Retrieve the image properties and use them to create a verifier.
:param context: the user context for authentication
:param image_properties: the key-value properties about the image
:return: instance of cryptography AsymmetricVerificationContext
:raises glance.common.exception.SignatureVerificationError: if building
the verifier fails
"""
if not should_create_verifier(image_properties):
raise exception.SignatureVerificationError(
_('Required image properties for signature verification do not'
' exist. Cannot verify signature.')
)
signature = get_signature(image_properties[SIGNATURE])
hash_method = get_hash_method(image_properties[HASH_METHOD])
signature_key_type = get_signature_key_type(
image_properties[KEY_TYPE])
public_key = get_public_key(context,
image_properties[CERT_UUID],
signature_key_type)
# create the verifier based on the signature key type
try:
verifier = KEY_TYPE_METHODS[signature_key_type](signature,
hash_method,
public_key,
image_properties)
except crypto_exception.UnsupportedAlgorithm as e:
msg = (_LE("Unable to create verifier since algorithm is "
"unsupported: %(e)s")
% {'e': encodeutils.exception_to_unicode(e)})
LOG.error(msg)
raise exception.SignatureVerificationError(
_('Unable to verify signature since the algorithm is unsupported '
'on this system')
)
if verifier:
return verifier
else:
# Error creating the verifier
raise exception.SignatureVerificationError(
_('Error occurred while creating the verifier')
)
@debtcollector.removals.remove(message="This will be removed in the N cycle.")
def should_verify_signature(image_properties):
"""Determine whether a signature should be verified.

View File

@ -16,6 +16,7 @@
import collections
import copy
from cryptography import exceptions as crypto_exception
import debtcollector
import glance_store as store
from oslo_config import cfg
@ -391,16 +392,40 @@ class ImageProxy(glance.domain.proxy.Image):
def set_data(self, data, size=None):
if size is None:
size = 0 # NOTE(markwash): zero -> unknown size
# Create the verifier for signature verification (if correct properties
# are present)
if (signature_utils.should_create_verifier(
self.image.extra_properties)):
# NOTE(bpoulos): if creating verifier fails, exception will be
# raised
verifier = signature_utils.get_verifier(
self.context, self.image.extra_properties)
else:
verifier = None
location, size, checksum, loc_meta = self.store_api.add_to_backend(
CONF,
self.image.image_id,
utils.LimitingReader(utils.CooperativeReader(data),
CONF.image_size_cap),
size,
context=self.context)
context=self.context,
verifier=verifier)
self._verify_signature_if_needed(checksum)
# NOTE(bpoulos): if verification fails, exception will be raised
if verifier:
try:
verifier.verify()
LOG.info(_LI("Successfully verified signature for image %s"),
self.image.image_id)
except crypto_exception.InvalidSignature:
raise exception.SignatureVerificationError(
_('Signature verification failed')
)
self.image.locations = [{'url': location, 'metadata': loc_meta,
'status': 'active'}]
self.image.size = size

View File

@ -33,6 +33,14 @@ TEST_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3,
key_size=1024,
backend=default_backend())
# Required image property names
(SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = (
signature_utils.SIGNATURE,
signature_utils.HASH_METHOD,
signature_utils.KEY_TYPE,
signature_utils.CERT_UUID
)
# Required image property names
# TODO(bpoulos): remove when 'sign-the-hash' approach is no longer supported
(OLD_SIGNATURE, OLD_HASH_METHOD, OLD_KEY_TYPE, OLD_CERT_UUID) = (
@ -330,6 +338,130 @@ class TestSignatureUtils(test_utils.BaseTestCase):
signature_utils.verify_signature,
None, checksum_hash, image_properties)
def test_should_create_verifier(self):
image_props = {CERT_UUID: 'CERT_UUID',
HASH_METHOD: 'HASH_METHOD',
SIGNATURE: 'SIGNATURE',
KEY_TYPE: 'SIG_KEY_TYPE'}
self.assertTrue(signature_utils.should_create_verifier(image_props))
def test_should_create_verifier_fail(self):
bad_image_properties = [{CERT_UUID: 'CERT_UUID',
HASH_METHOD: 'HASH_METHOD',
SIGNATURE: 'SIGNATURE'},
{CERT_UUID: 'CERT_UUID',
HASH_METHOD: 'HASH_METHOD',
KEY_TYPE: 'SIG_KEY_TYPE'},
{CERT_UUID: 'CERT_UUID',
SIGNATURE: 'SIGNATURE',
KEY_TYPE: 'SIG_KEY_TYPE'},
{HASH_METHOD: 'HASH_METHOD',
SIGNATURE: 'SIGNATURE',
KEY_TYPE: 'SIG_KEY_TYPE'}]
for bad_props in bad_image_properties:
result = signature_utils.should_create_verifier(bad_props)
self.assertFalse(result)
@unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()),
"SHA-2 hash algorithms not supported by backend")
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_PSS(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_PRIVATE_KEY.signer(
padding.PSS(
mgf=padding.MGF1(hash_alg),
salt_length=padding.PSS.MAX_LENGTH
),
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
image_props = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: hash_name,
KEY_TYPE: 'RSA-PSS',
SIGNATURE: signature}
verifier = signature_utils.get_verifier(None, image_props)
verifier.update(data)
verifier.verify()
@unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()),
"SHA-2 hash algorithms not supported by backend")
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_bad_signature(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key()
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'RSA-PSS',
SIGNATURE: 'BLAH'}
verifier = signature_utils.get_verifier(None, image_properties)
verifier.update(data)
self.assertRaises(crypto_exception.InvalidSignature,
verifier.verify)
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_unsupported_algorithm(self,
mock_get_pub_key):
public_key = TEST_PRIVATE_KEY.public_key()
public_key.verifier = mock.MagicMock(
side_effect=crypto_exception.UnsupportedAlgorithm(
"When OpenSSL is older than 1.0.1 then only SHA1 is "
"supported with MGF1.",
crypto_exception._Reasons.UNSUPPORTED_HASH))
mock_get_pub_key.return_value = public_key
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'RSA-PSS',
SIGNATURE: 'BLAH'}
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Unable to verify signature since the '
'algorithm is unsupported on this system',
signature_utils.get_verifier,
None, image_properties)
@mock.patch('glance.common.signature_utils.should_create_verifier')
def test_verify_signature_invalid_image_props(self, mock_should):
mock_should.return_value = False
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Required image properties for signature'
' verification do not exist. Cannot verify'
' signature.',
signature_utils.get_verifier,
None, None)
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key):
mock_get_pub_key.return_value = TEST_PRIVATE_KEY.public_key()
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'BLAH',
SIGNATURE: 'BLAH'}
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Invalid signature key type: .*',
signature_utils.get_verifier,
None, image_properties)
@mock.patch('glance.common.signature_utils.get_public_key')
def test_get_verifier_none(self, mock_get_pub_key):
mock_get_pub_key.return_value = BadPublicKey()
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'RSA-PSS',
SIGNATURE: 'BLAH'}
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Error occurred while creating'
' the verifier',
signature_utils.get_verifier,
None, image_properties)
def test_get_signature(self):
signature = b'A' * 256
data = base64.b64encode(signature)

View File

@ -15,6 +15,8 @@
import glance_store
import mock
from debtcollector import removals
from glance.common import exception
from glance.common import signature_utils
import glance.location
@ -188,7 +190,8 @@ class TestStoreImage(utils.BaseTestCase):
self.store_api.get_from_backend,
image.locations[0]['url'], context={})
def test_image_set_data_valid_signature(self):
@removals.remove(message="This will be removed in the N cycle.")
def test_old_image_set_data_valid_signature(self):
context = glance.context.RequestContext(user=USER1)
extra_properties = {
'signature_certificate_uuid': 'UUID',
@ -199,7 +202,7 @@ class TestStoreImage(utils.BaseTestCase):
image_stub = ImageStub(UUID2, status='queued',
extra_properties=extra_properties)
self.stubs.Set(signature_utils, 'verify_signature',
unit_test_utils.fake_verify_signature)
unit_test_utils.fake_old_verify_signature)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
image.set_data('YYYY', 4)
@ -207,7 +210,8 @@ class TestStoreImage(utils.BaseTestCase):
self.assertEqual('Z', image.checksum)
self.assertEqual('active', image.status)
def test_image_set_data_invalid_signature(self):
@removals.remove(message="This will be removed in the N cycle.")
def test_old_image_set_data_invalid_signature(self):
context = glance.context.RequestContext(user=USER1)
extra_properties = {
'signature_certificate_uuid': 'UUID',
@ -218,14 +222,15 @@ class TestStoreImage(utils.BaseTestCase):
image_stub = ImageStub(UUID2, status='queued',
extra_properties=extra_properties)
self.stubs.Set(signature_utils, 'verify_signature',
unit_test_utils.fake_verify_signature)
unit_test_utils.fake_old_verify_signature)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
self.assertRaises(exception.SignatureVerificationError,
image.set_data,
'YYYY', 4)
def test_image_set_data_invalid_signature_missing_metadata(self):
@removals.remove(message="This will be removed in the N cycle.")
def test_old_image_set_data_invalid_signature_missing_metadata(self):
context = glance.context.RequestContext(user=USER1)
extra_properties = {
'signature_hash_method': 'METHOD',
@ -235,7 +240,65 @@ class TestStoreImage(utils.BaseTestCase):
image_stub = ImageStub(UUID2, status='queued',
extra_properties=extra_properties)
self.stubs.Set(signature_utils, 'verify_signature',
unit_test_utils.fake_verify_signature)
unit_test_utils.fake_old_verify_signature)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
image.set_data('YYYY', 4)
self.assertEqual(UUID2, image.locations[0]['url'])
self.assertEqual('Z', image.checksum)
# Image is still active, since invalid signature was ignored
self.assertEqual('active', image.status)
@mock.patch('glance.location.LOG')
def test_image_set_data_valid_signature(self, mock_log):
context = glance.context.RequestContext(user=USER1)
extra_properties = {
'img_signature_certificate_uuid': 'UUID',
'img_signature_hash_method': 'METHOD',
'img_signature_key_type': 'TYPE',
'img_signature': 'VALID'
}
image_stub = ImageStub(UUID2, status='queued',
extra_properties=extra_properties)
self.stubs.Set(signature_utils, 'get_verifier',
unit_test_utils.fake_get_verifier)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
image.set_data('YYYY', 4)
self.assertEqual('active', image.status)
mock_log.info.assert_called_once_with(
u'Successfully verified signature for image %s',
UUID2)
def test_image_set_data_invalid_signature(self):
context = glance.context.RequestContext(user=USER1)
extra_properties = {
'img_signature_certificate_uuid': 'UUID',
'img_signature_hash_method': 'METHOD',
'img_signature_key_type': 'TYPE',
'img_signature': 'INVALID'
}
image_stub = ImageStub(UUID2, status='queued',
extra_properties=extra_properties)
self.stubs.Set(signature_utils, 'get_verifier',
unit_test_utils.fake_get_verifier)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
self.assertRaises(exception.SignatureVerificationError,
image.set_data,
'YYYY', 4)
def test_image_set_data_invalid_signature_missing_metadata(self):
context = glance.context.RequestContext(user=USER1)
extra_properties = {
'img_signature_hash_method': 'METHOD',
'img_signature_key_type': 'TYPE',
'img_signature': 'INVALID'
}
image_stub = ImageStub(UUID2, status='queued',
extra_properties=extra_properties)
self.stubs.Set(signature_utils, 'get_verifier',
unit_test_utils.fake_get_verifier)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
image.set_data('YYYY', 4)

View File

@ -14,7 +14,10 @@
# under the License.
from cryptography import exceptions as crypto_exception
from debtcollector import removals
import glance_store as store
import mock
from oslo_config import cfg
from oslo_log import log as logging
from six.moves import urllib
@ -85,7 +88,8 @@ def fake_get_size_from_backend(uri, context=None):
return 1
def fake_verify_signature(context, checksum_hash, image_properties):
@removals.remove(message="This will be removed in the N cycle.")
def fake_old_verify_signature(context, checksum_hash, image_properties):
if (image_properties is not None and 'signature' in image_properties and
image_properties['signature'] == 'VALID'):
return True
@ -94,6 +98,17 @@ def fake_verify_signature(context, checksum_hash, image_properties):
'Signature verification failed.')
def fake_get_verifier(context, image_properties):
verifier = mock.Mock()
if (image_properties is not None and 'img_signature' in image_properties
and image_properties['img_signature'] == 'VALID'):
verifier.verify.return_value = None
else:
ex = crypto_exception.InvalidSignature()
verifier.verify.side_effect = ex
return verifier
class FakeDB(object):
def __init__(self, initialize=True):
@ -197,7 +212,7 @@ class FakeStoreAPI(object):
return self.get_from_backend(location, context=context)[1]
def add_to_backend(self, conf, image_id, data, size,
scheme=None, context=None):
scheme=None, context=None, verifier=None):
store_max_size = 7
current_store_size = 2
for location in self.data.keys():