barbican/barbican/tests/plugin/test_castellan_secret_store.py
Douglas Mendizábal b9daa100d0 Fix Castellan Secret Store inconsistent encoding
This patch fixes the Castellan secret store use of SecretDTO objects,
which require that the "secret" member be base64 encoded. [1]

Prior to this fix all secrets that were generated were stored in
plaintext, but secrets coming in through the API were base64 encoded
before being stored in the backend.

On secret retrieval the Castellan plugin wrongly assumed everything in
the backend was encoded, so attempts to retrieve generated keys failed.

This patch fixes this inconsistency by always storing data un-encoded in
the backend.

A helper method was added to sort out the inconsistent data stored prior
to this fix.

A "version" property was added to the Castellan plugin metadata that is
stored in barbican to help differentiate secrets stored prior to this
fix vs secrets stored after this fix.

Story: 2008335
Task: 41236

[1]
https://opendev.org/openstack/barbican/src/tag/12.0.0/barbican/plugin/interface/secret_store.py#L356

Change-Id: I46fe77a471bf7927a24ca4d64dfccb385cd6402e
2021-09-15 08:42:25 -05:00

246 lines
8.5 KiB
Python

# Copyright (c) 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
from unittest import mock
from castellan.common import exception
from castellan.common.objects import opaque_data
import barbican.plugin.castellan_secret_store as css
import barbican.plugin.interface.secret_store as ss
import barbican.plugin.vault_secret_store as vss
from barbican.tests import utils
# Key identifiers handed back by the mocked key manager: key_ref1 is returned
# by create_key() and store(), and (key_ref1, key_ref2) by create_key_pair().
key_ref1 = 'aff825be-6ede-4b1d-aeb0-aaec8e62aec6'
key_ref2 = '9c94c9c7-16ea-43e8-8ebe-0de282c0e6d5'

# 32 bytes of raw (un-encoded) key material served by the mocked get().
mock_key = (
    b'\xae9Eso\xd4\x98\x04>\xc3\x05n\x0f\x03\x96\xa3'
    b'\xc3Z;\x9c\x11&oYY\x00\x13\xae\xf4>\x83\x82'
)
class WhenTestingVaultSecretStore(utils.BaseTestCase):
    """Unit tests for VaultSecretStore with a mocked Castellan key manager.

    The plugin's ``key_manager`` attribute is replaced by a ``MagicMock`` in
    ``setUp``, so every test only checks how the plugin translates between
    the secret-store interface and key manager calls.
    """

    def setUp(self):
        """Build a VaultSecretStore wired to a mocked key manager."""
        super().setUp()

        self.key_manager_mock = mock.MagicMock(name="key manager mock")
        self.key_manager_mock.create_key_pair.return_value = (
            key_ref1, key_ref2
        )
        self.key_manager_mock.create_key.return_value = key_ref1
        self.key_manager_mock.store.return_value = key_ref1

        # The backend hands back raw bytes wrapped in an OpaqueData object.
        secret_object = opaque_data.OpaqueData(mock_key)
        self.key_manager_mock.get.return_value = secret_object

        self.cfg_mock = mock.MagicMock(name='config mock')
        self.cfg_mock.vault_plugin = mock.MagicMock(
            use_ssl=False,
            root_token_id='12345'
        )

        self.plugin = vss.VaultSecretStore(self.cfg_mock)
        # Swap in the mock after construction so no test reaches a real
        # key manager.
        self.plugin.key_manager = self.key_manager_mock
        self.plugin_name = "VaultSecretStore"

    def test_meta_dict(self):
        """_meta_dict only includes optional fields that were supplied."""
        key_id = 'SOME_KEY_UUID'

        meta = self.plugin._meta_dict(key_id)
        self.assertNotIn(css.CastellanSecretStore.BIT_LENGTH, meta)
        self.assertNotIn(css.CastellanSecretStore.ALG, meta)
        self.assertEqual(key_id, meta[css.CastellanSecretStore.KEY_ID])

        meta = self.plugin._meta_dict(key_id, bit_length=128)
        self.assertEqual(128, meta[css.CastellanSecretStore.BIT_LENGTH])

        meta = self.plugin._meta_dict(key_id, algorithm='AES')
        self.assertEqual('AES', meta[css.CastellanSecretStore.ALG])
        self.assertEqual(1, meta[css.CastellanSecretStore.METADATA_VERSION])

    def test_generate_symmetric_key(self):
        """Symmetric generation delegates to create_key and returns meta."""
        key_spec = ss.KeySpec(ss.KeyAlgorithm.AES, 128)

        response = self.plugin.generate_symmetric_key(key_spec)

        self.plugin.key_manager.create_key.assert_called_once_with(
            mock.ANY,
            ss.KeyAlgorithm.AES,
            128
        )
        expected_response = {
            css.CastellanSecretStore.KEY_ID: key_ref1,
            css.CastellanSecretStore.METADATA_VERSION:
                css.CastellanSecretStore.CURRENT_VERSION,
        }
        self.assertEqual(expected_response, response)

    def test_generate_symmetric_key_raises_exception(self):
        """A key manager failure surfaces as SecretGeneralException."""
        key_spec = ss.KeySpec(ss.KeyAlgorithm.AES, 128)
        self.plugin.key_manager.create_key.side_effect = exception.Forbidden()

        self.assertRaises(
            ss.SecretGeneralException,
            self.plugin.generate_symmetric_key,
            key_spec
        )

    def test_generate_asymmetric_key(self):
        """Asymmetric generation delegates to create_key_pair."""
        key_spec = ss.KeySpec(ss.KeyAlgorithm.RSA, 2048)

        response = self.plugin.generate_asymmetric_key(key_spec)

        self.plugin.key_manager.create_key_pair.assert_called_once_with(
            mock.ANY,
            ss.KeyAlgorithm.RSA,
            2048)
        self.assertIsInstance(response, ss.AsymmetricKeyMetadataDTO)
        # The mocked create_key_pair returns (key_ref1, key_ref2); the
        # plugin is expected to map them to private and public respectively.
        self.assertEqual(
            key_ref2,
            response.public_key_meta[css.CastellanSecretStore.KEY_ID]
        )
        self.assertEqual(
            key_ref1,
            response.private_key_meta[css.CastellanSecretStore.KEY_ID]
        )

    def test_generate_asymmetric_throws_exception(self):
        """A key manager failure surfaces as SecretGeneralException."""
        key_spec = ss.KeySpec(ss.KeyAlgorithm.RSA, 2048)
        self.plugin.key_manager.create_key_pair.side_effect = (
            exception.Forbidden()
        )

        self.assertRaises(
            ss.SecretGeneralException,
            self.plugin.generate_asymmetric_key,
            key_spec
        )

    def test_generate_asymmetric_throws_passphrase_exception(self):
        """Requesting a passphrase-protected key pair is rejected."""
        key_spec = ss.KeySpec(
            alg=ss.KeyAlgorithm.RSA,
            bit_length=2048,
            passphrase="some passphrase"
        )

        self.assertRaises(
            ss.GeneratePassphraseNotSupportedException,
            self.plugin.generate_asymmetric_key,
            key_spec
        )

    def test_store_secret(self):
        """A base64-encoded DTO payload is stored un-encoded in the backend."""
        payload = b'encrypt me!!'
        key_spec = mock.MagicMock()
        content_type = mock.MagicMock()
        transport_key = None
        secret_dto = ss.SecretDTO(ss.SecretType.SYMMETRIC,
                                  base64.b64encode(payload),
                                  key_spec,
                                  content_type,
                                  transport_key)

        response = self.plugin.store_secret(secret_dto)

        # The key manager must receive the raw payload, not the base64 text.
        data = opaque_data.OpaqueData(payload)
        self.plugin.key_manager.store.assert_called_once_with(
            mock.ANY,
            data
        )
        expected_response = self.plugin._meta_dict(key_ref1)
        self.assertEqual(expected_response, response)

    def test_store_secret_raises_exception(self):
        """A key manager failure surfaces as SecretGeneralException."""
        payload = b'encrypt me!!'
        key_spec = mock.MagicMock()
        content_type = mock.MagicMock()
        transport_key = None
        secret_dto = ss.SecretDTO(ss.SecretType.SYMMETRIC,
                                  base64.b64encode(payload),
                                  key_spec,
                                  content_type,
                                  transport_key)
        self.plugin.key_manager.store.side_effect = exception.Forbidden()

        self.assertRaises(
            ss.SecretGeneralException,
            self.plugin.store_secret,
            secret_dto
        )

    def test_get_secret(self):
        """Raw bytes from the backend come back base64-encoded in the DTO."""
        secret_metadata = self.plugin._meta_dict(key_ref1, 256, 'AES')

        response = self.plugin.get_secret(
            ss.SecretType.SYMMETRIC,
            secret_metadata
        )

        self.assertIsInstance(response, ss.SecretDTO)
        self.assertEqual(ss.SecretType.SYMMETRIC, response.type)
        plaintext = base64.b64decode(response.secret)
        self.assertEqual(mock_key, plaintext)
        self.plugin.key_manager.get.assert_called_once_with(
            mock.ANY,
            key_ref1
        )

    def test_get_secret_throws_exception(self):
        """A key manager failure surfaces as SecretGeneralException."""
        secret_metadata = self.plugin._meta_dict(key_ref1, 256, 'AES')
        self.plugin.key_manager.get.side_effect = exception.Forbidden()

        self.assertRaises(
            ss.SecretGeneralException,
            self.plugin.get_secret,
            ss.SecretType.SYMMETRIC,
            secret_metadata
        )

    def test_delete_secret(self):
        """Deletion is forwarded to the key manager with the stored key id."""
        secret_metadata = {css.CastellanSecretStore.KEY_ID: key_ref1}

        self.plugin.delete_secret(secret_metadata)

        self.plugin.key_manager.delete.assert_called_once_with(
            mock.ANY,
            key_ref1
        )

    def test_delete_secret_throws_exception(self):
        """A key manager failure surfaces as SecretGeneralException."""
        secret_metadata = {css.CastellanSecretStore.KEY_ID: key_ref1}
        self.plugin.key_manager.delete.side_effect = exception.Forbidden()

        self.assertRaises(
            ss.SecretGeneralException,
            self.plugin.delete_secret,
            secret_metadata
        )

    def test_delete_secret_throws_key_error(self):
        """A KeyError from the key manager must not escape delete_secret."""
        secret_metadata = {css.CastellanSecretStore.KEY_ID: key_ref1}
        self.plugin.key_manager.delete.side_effect = KeyError()

        # No assertRaises here: the call itself completing is the assertion.
        self.plugin.delete_secret(secret_metadata)

        self.plugin.key_manager.delete.assert_called_once_with(
            mock.ANY,
            key_ref1
        )

    def test_store_secret_supports(self):
        """The plugin reports that it can store any key spec."""
        # Previously this called generate_supports, duplicating the test
        # below and leaving store_secret_supports unexercised.
        self.assertTrue(
            self.plugin.store_secret_supports(mock.ANY)
        )

    def test_generate_supports(self):
        """The plugin reports that it can generate any key spec."""
        self.assertTrue(
            self.plugin.generate_supports(mock.ANY)
        )

    def test_get_plugin_name(self):
        """get_plugin_name returns the expected plugin name."""
        self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())