Adds store_secret_supports to secret_store

This adds a new method, store_secret_supports, to the SecretStore interface.
Given a KeySpec, this method will allow the plugin manager to check if a plugin
supports storing the secret data.

For example, a plugin that only supports storing secrets if the secret
attributes are defined and valid will check the attributes are not None, but
some plugins may be able to store any secret as a blob whether or not the
attributes are defined and thus will define this method to always return True.

Implements: blueprint create-secret-store
(https://blueprints.launchpad.net/barbican/+spec/create-secret-store)
Change-Id: I5f1b196f36d02587cd35aac7cdfa1c152743aaa6
This commit is contained in:
Kaitlin Farr
2014-08-07 10:42:45 -07:00
parent 248c7d6b10
commit ef79bc5927
5 changed files with 133 additions and 34 deletions

View File

@@ -259,6 +259,14 @@ class DogtagPlugin(sstore.SecretStoreBase):
"""
return self._map_algorithm(key_spec.alg) is not None
def store_secret_supports(self, key_spec):
"""Key storage supported?
Specifies whether the plugin supports storage of the secret given
the attributes included in the KeySpec
"""
return True
@staticmethod
def _map_algorithm(algorithm):
"""Map Barbican algorithms to Dogtag plugin algorithms.

View File

@@ -163,6 +163,8 @@ class KeyAlgorithm(object):
RSA = "rsa"
"""Constant for the Elliptic Curve algorithm."""
EC = "ec"
"""List of asymmetric algorithms"""
ASYMMETRIC_ALGORITHMS = [DIFFIE_HELLMAN, DSA, RSA, EC]
"""Constant for the AES algorithm."""
AES = "aes"
@@ -170,6 +172,14 @@ class KeyAlgorithm(object):
DES = "des"
"""Constant for the DESede (triple-DES) algorithm."""
DESEDE = "desede"
"""List of symmetric algorithms"""
SYMMETRIC_ALGORITHMS = [AES, DES, DESEDE]
def get_secret_type(self, alg):
if alg in self.SYMMETRIC_ALGORITHMS:
return SecretType.SYMMETRIC
else: # TODO(kaitlin-farr) add asymmetric once it's supported
return None
class KeySpec(object):
@@ -345,6 +355,20 @@ class SecretStoreBase(object):
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def store_secret_supports(self, key_spec):
"""Returns a boolean indicating if the secret can be stored.
Checks if the secret store can store the secret, give the attributes
of the secret in the KeySpec. For example, some plugins may need to
know the attributes in order to store the secret, but other plugins
may be able to store the secret as a blob if no attributes are given.
:param key_spec: KeySpec for the secret
:returns: a boolean indicating if the secret can be stored
"""
raise NotImplementedError # pragma: no cover
def get_transport_key(self):
"""Gets a transport key.
@@ -382,9 +406,11 @@ class SecretStorePluginManager(named.NamedExtensionManager):
invoke_kwds=invoke_kwargs
)
def get_plugin_store(self, plugin_name=None, transport_key_needed=False):
def get_plugin_store(self, key_spec, plugin_name=None,
transport_key_needed=False):
"""Gets a secret store plugin.
:param: plugin_name: set to plugin_name to get specific plugin
:param: key_spec: KeySpec of key that will be stored
:param: transport_key_needed: set to True if a transport
key is required.
:returns: SecretStoreBase plugin implementation
@@ -400,10 +426,14 @@ class SecretStorePluginManager(named.NamedExtensionManager):
raise SecretStoreSupportedPluginNotFound()
if not transport_key_needed:
return self.extensions[0].obj
for ext in self.extensions:
if ext.obj.store_secret_supports(key_spec):
return ext.obj
for ext in self.extensions:
if ext.obj.get_transport_key() is not None:
else:
for ext in self.extensions:
if ext.obj.get_transport_key() is not None and\
ext.obj.store_secret_supports(key_spec):
return ext.obj
raise SecretStoreSupportedPluginNotFound()

View File

@@ -17,13 +17,13 @@ from barbican.plugin.interface import secret_store
from barbican.plugin.util import translations as tr
def get_transport_key_model(repos, transport_key_needed):
def get_transport_key_model(key_spec, repos, transport_key_needed):
key_model = None
if transport_key_needed:
# get_plugin_store() will throw an exception if no suitable
# plugin with transport key is found
store_plugin = secret_store.SecretStorePluginManager(). \
get_plugin_store(transport_key_needed=True)
get_plugin_store(key_spec=key_spec, transport_key_needed=True)
plugin_name = utils.generate_fullname_for(store_plugin)
key_repo = repos.transport_key_repo
@@ -72,11 +72,18 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
elif _secret_already_has_stored_data(secret_model):
raise ValueError('Secret already has encrypted data stored for it.')
# Create a KeySpec to find a plugin that will support storing the secret
key_spec = secret_store.KeySpec(alg=spec.get('algorithm'),
bit_length=spec.get('bit_length'),
mode=spec.get('mode'))
# If there is no secret data to store, then just create Secret entity and
# leave. A subsequent call to this method should provide both the Secret
# entity created here *and* the secret data to store into it.
if not unencrypted_raw:
key_model = get_transport_key_model(repos, transport_key_needed)
key_model = get_transport_key_model(key_spec,
repos,
transport_key_needed)
_save_secret(secret_model, tenant_model, repos)
return secret_model, key_model
@@ -86,7 +93,7 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
# Locate a suitable plugin to store the secret.
store_plugin = secret_store.SecretStorePluginManager().\
get_plugin_store(plugin_name=plugin_name)
get_plugin_store(key_spec=key_spec, plugin_name=plugin_name)
# Normalize inputs prior to storage.
#TODO(john-wood-w) Normalize all secrets to base64, so we don't have to
@@ -101,10 +108,8 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
context = secret_store.SecretStoreContext(secret_model=secret_model,
tenant_model=tenant_model,
repos=repos)
key_spec = secret_store.KeySpec(alg=spec.get('algorithm'),
bit_length=spec.get('bit_length'),
mode=spec.get('mode'))
secret_dto = secret_store.SecretDTO(type=None,
secret_type = secret_store.KeyAlgorithm().get_secret_type(key_spec.alg)
secret_dto = secret_store.SecretDTO(type=secret_type,
secret=unencrypted,
key_spec=key_spec,
content_type=content_type,

View File

@@ -172,6 +172,14 @@ class StoreCryptoAdapterPlugin(sstore.SecretStoreBase):
"""
return key_spec and sstore.KeyAlgorithm.AES == key_spec.alg.lower()
def store_secret_supports(self, key_spec):
"""Key storage supported?
Specifies whether the plugin supports storage of the secret given
the attributes included in the KeySpec
"""
return True
def _find_or_create_kek_objects(self, plugin_inst, tenant_model, kek_repo):
# Find or create a key encryption key.
full_plugin_name = utils.generate_fullname_for(plugin_inst)

View File

@@ -22,9 +22,9 @@ from barbican.plugin.interface import secret_store as str
class TestSecretStore(str.SecretStoreBase):
"""Secret store plugin for testing support."""
def __init__(self, generate_supports_response):
def __init__(self, supported_alg_list):
super(TestSecretStore, self).__init__()
self.generate_supports_response = generate_supports_response
self.alg_list = supported_alg_list
def generate_symmetric_key(self, key_spec):
raise NotImplementedError # pragma: no cover
@@ -39,11 +39,14 @@ class TestSecretStore(str.SecretStoreBase):
raise NotImplementedError # pragma: no cover
def generate_supports(self, key_spec):
return self.generate_supports_response
return key_spec.alg in self.alg_list
def delete_secret(self, secret_metadata):
raise NotImplementedError # pragma: no cover
def store_secret_supports(self, key_spec):
return key_spec.alg in self.alg_list
class TestSecretStoreWithTransportKey(str.SecretStoreBase):
"""Secret store plugin for testing support.
@@ -51,9 +54,9 @@ class TestSecretStoreWithTransportKey(str.SecretStoreBase):
This plugin will override the relevant methods for key wrapping.
"""
def __init__(self, generate_supports_response):
def __init__(self, supported_alg_list):
super(TestSecretStoreWithTransportKey, self).__init__()
self.generate_supports_response = generate_supports_response
self.alg_list = supported_alg_list
def generate_symmetric_key(self, key_spec):
raise NotImplementedError # pragma: no cover
@@ -68,11 +71,14 @@ class TestSecretStoreWithTransportKey(str.SecretStoreBase):
raise NotImplementedError # pragma: no cover
def generate_supports(self, key_spec):
return self.generate_supports_response
return key_spec.alg in self.alg_list
def delete_secret(self, secret_metadata):
raise NotImplementedError # pragma: no cover
def store_secret_supports(self, key_spec):
return key_spec.alg in self.alg_list
def get_transport_key(self):
return "transport key"
@@ -87,65 +93,107 @@ class WhenTestingSecretStorePluginManager(testtools.TestCase):
self.manager = str.SecretStorePluginManager()
def test_get_store_supported_plugin(self):
plugin = TestSecretStore(True)
plugin = TestSecretStore([str.KeyAlgorithm.AES])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertEqual(plugin,
self.manager.get_plugin_store())
self.manager.get_plugin_store(keySpec))
def test_get_generate_supported_plugin(self):
plugin = TestSecretStore(True)
plugin = TestSecretStore([str.KeyAlgorithm.AES])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec('AES', 128)
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertEqual(plugin,
self.manager.get_plugin_generate(keySpec))
def test_get_store_no_plugin_found(self):
self.manager.extensions = []
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStorePluginNotFound,
self.manager.get_plugin_store,
keySpec,
)
def test_get_generate_no_plugin_found(self):
self.manager.extensions = []
keySpec = str.KeySpec('AES', 128)
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStorePluginNotFound,
self.manager.get_plugin_generate,
keySpec,
)
def test_get_generate_no_supported_plugin(self):
plugin = TestSecretStore(False)
def test_get_store_no_supported_plugin(self):
plugin = TestSecretStore([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec('AES', 128)
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
keySpec,
)
def test_get_generate_no_supported_plugin(self):
plugin = TestSecretStore([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_generate,
keySpec,
)
def test_get_store_no_plugin_with_tkey(self):
plugin = TestSecretStore(False)
def test_get_store_no_plugin_with_tkey_and_no_supports_storage(self):
plugin = TestSecretStore([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
key_spec=keySpec,
transport_key_needed=True,
)
def test_get_store_with_tkey(self):
plugin1 = TestSecretStore(False)
def test_get_store_plugin_with_tkey_and_no_supports_storage(self):
plugin = TestSecretStoreWithTransportKey([])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
key_spec=keySpec,
transport_key_needed=True,
)
def test_get_store_plugin_with_no_tkey_and_supports_storage(self):
plugin = TestSecretStore([str.KeyAlgorithm.AES])
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertRaises(
str.SecretStoreSupportedPluginNotFound,
self.manager.get_plugin_store,
key_spec=keySpec,
transport_key_needed=True,
)
def test_get_store_plugin_with_tkey_and_supports_storage(self):
plugin1 = TestSecretStore([str.KeyAlgorithm.AES])
plugin1_mock = mock.MagicMock(obj=plugin1)
plugin2 = TestSecretStoreWithTransportKey(False)
plugin2 = TestSecretStoreWithTransportKey([str.KeyAlgorithm.AES])
plugin2_mock = mock.MagicMock(obj=plugin2)
self.manager.extensions = [plugin1_mock, plugin2_mock]
self.assertEqual(
plugin2,
self.manager.get_plugin_store(transport_key_needed=True))
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
self.assertEqual(plugin2,
self.manager.get_plugin_store(
key_spec=keySpec,
transport_key_needed=True))