Central logic to sync secret store data with conf data (Part 3)

Modified logic to retrieve applicable plugins when preferred project
is set.

This is part 2 of central logic changes needed for multiple backend
support work.

Change-Id: I15eb7ca88611a12677227647e3026822e67413b0
Partially-Implements: blueprint multiple-secret-backend
This commit is contained in:
Arun Kant 2016-08-18 10:27:55 -07:00
parent f4141861ef
commit b05c4b6f86
9 changed files with 723 additions and 26 deletions

View File

@ -532,6 +532,30 @@ class P11CryptoTokenException(PKCS11Exception):
message = u._("No token was found in slot %(slot_id)s")
class MultipleStorePreferredPluginMissing(BarbicanException):
"""Raised when a preferred plugin is missing in service configuration."""
def __init__(self, store_name):
super(MultipleStorePreferredPluginMissing, self).__init__(
u._("Preferred Secret Store plugin '{store_name}' is not "
"currently set in service configuration. This is probably a "
"server misconfiguration.").format(
store_name=store_name)
)
self.store_name = store_name
class MultipleStorePluginStillInUse(BarbicanException):
"""Raised when a used plugin is missing in service configuration."""
def __init__(self, store_name):
super(MultipleStorePluginStillInUse, self).__init__(
u._("Secret Store plugin '{store_name}' is still in use and can "
"not be removed. Its missing in service configuration. This is"
" probably a server misconfiguration.").format(
store_name=store_name)
)
self.store_name = store_name
class MultipleSecretStoreLookupFailed(BarbicanException):
"""Raised when a plugin lookup suffix is missing during config read."""
def __init__(self):
@ -541,7 +565,7 @@ class MultipleSecretStoreLookupFailed(BarbicanException):
class MultipleStoreIncorrectGlobalDefault(BarbicanException):
"""Raised when a plugin lookup is missing or failed during config read."""
"""Raised when a global default for only one plugin is not set to True."""
def __init__(self, occurence):
msg = None
if occurence > 1:

View File

@ -76,14 +76,15 @@ class _CryptoPluginManager(named.NamedExtensionManager):
self, invoke_args, invoke_kwargs)
def get_plugin_store_generate(self, type_needed, algorithm=None,
bit_length=None, mode=None):
bit_length=None, mode=None, project_id=None):
"""Gets a secret store or generate plugin that supports provided type.
:param type_needed: PluginSupportTypes that contains details on the
type of plugin required
:returns: CryptoPluginBase plugin implementation
"""
active_plugins = plugin_utils.get_active_plugins(self)
active_plugins = multiple_backends.get_applicable_crypto_plugins(
self, project_id=project_id, existing_plugin_name=None)
if not active_plugins:
raise crypto.CryptoPluginNotFound()
@ -125,7 +126,8 @@ class _CryptoPluginManager(named.NamedExtensionManager):
are read via updated configuration structure. If not enabled, then it
reads MultiStr property in 'crypto' config section.
"""
# to cache default global secret store value on first use
self.global_default_store_dict = None
if utils.is_multiple_backends_enabled():
parsed_stores = multiple_backends.read_multiple_backends_config()
plugin_names = [store.crypto_plugin for store in parsed_stores

View File

@ -533,12 +533,13 @@ class SecretStorePluginManager(named.NamedExtensionManager):
name_order=True # extensions sorted as per order of plugin names
)
plugin_utils.instantiate_plugins(
self, invoke_args, invoke_kwargs)
plugin_utils.instantiate_plugins(self, invoke_args, invoke_kwargs)
multiple_backends.sync_secret_stores(self)
@_enforce_extensions_configured
def get_plugin_store(self, key_spec, plugin_name=None,
transport_key_needed=False):
transport_key_needed=False, project_id=None):
"""Gets a secret store plugin.
:param: plugin_name: set to plugin_name to get specific plugin
@ -547,7 +548,8 @@ class SecretStorePluginManager(named.NamedExtensionManager):
key is required.
:returns: SecretStoreBase plugin implementation
"""
active_plugins = plugin_utils.get_active_plugins(self)
active_plugins = multiple_backends.get_applicable_store_plugins(
self, project_id=project_id, existing_plugin_name=plugin_name)
if plugin_name is not None:
for plugin in active_plugins:
@ -590,7 +592,7 @@ class SecretStorePluginManager(named.NamedExtensionManager):
raise StorePluginNotAvailableOrMisconfigured(plugin_name)
@_enforce_extensions_configured
def get_plugin_generate(self, key_spec):
def get_plugin_generate(self, key_spec, project_id=None):
"""Gets a secret generate plugin.
:param key_spec: KeySpec that contains details on the type of key to
@ -598,7 +600,10 @@ class SecretStorePluginManager(named.NamedExtensionManager):
:returns: SecretStoreBase plugin implementation
"""
for plugin in plugin_utils.get_active_plugins(self):
active_plugins = multiple_backends.get_applicable_store_plugins(
self, project_id=project_id, existing_plugin_name=None)
for plugin in active_plugins:
if plugin.generate_supports(key_spec):
return plugin
raise SecretStoreSupportedPluginNotFound()
@ -610,10 +615,12 @@ class SecretStorePluginManager(named.NamedExtensionManager):
names are read via updated configuration structure. If not enabled,
then it reads MultiStr property in 'secretstore' config section.
"""
# to cache default global secret store value on first use
self.global_default_store_dict = None
if utils.is_multiple_backends_enabled():
parsed_stores = multiple_backends.read_multiple_backends_config()
plugin_names = [store.store_plugin for store in parsed_stores
self.parsed_stores = multiple_backends.\
read_multiple_backends_config()
plugin_names = [store.store_plugin for store in self.parsed_stores
if store.store_plugin]
else:
plugin_names = secretstore_conf.secretstore.\

View File

@ -20,14 +20,15 @@ from barbican.plugin import store_crypto
from barbican.plugin.util import translations as tr
def _get_transport_key_model(key_spec, transport_key_needed):
def _get_transport_key_model(key_spec, transport_key_needed, project_id):
key_model = None
if transport_key_needed:
# get_plugin_store() will throw an exception if no suitable
# plugin with transport key is found
plugin_manager = secret_store.get_manager()
store_plugin = plugin_manager.get_plugin_store(
key_spec=key_spec, transport_key_needed=True)
key_spec=key_spec, transport_key_needed=True,
project_id=project_id)
plugin_name = utils.generate_fullname_for(store_plugin)
key_repo = repos.get_transport_key_repository()
@ -80,7 +81,8 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
# leave. A subsequent call to this method should provide both the Secret
# entity created here *and* the secret data to store into it.
if not unencrypted_raw:
key_model = _get_transport_key_model(key_spec, transport_key_needed)
key_model = _get_transport_key_model(key_spec, transport_key_needed,
project_id=project_model.id)
_save_secret_in_repo(secret_model, project_model)
return secret_model, key_model
@ -94,7 +96,8 @@ def store_secret(unencrypted_raw, content_type_raw, content_encoding,
plugin_manager = secret_store.get_manager()
store_plugin = plugin_manager.get_plugin_store(key_spec=key_spec,
plugin_name=plugin_name)
plugin_name=plugin_name,
project_id=project_model.id)
secret_dto = secret_store.SecretDTO(type=secret_model.secret_type,
secret=unencrypted,
@ -166,7 +169,8 @@ def generate_secret(spec, content_type, project_model):
mode=spec.get('mode'))
plugin_manager = secret_store.get_manager()
generate_plugin = plugin_manager.get_plugin_generate(key_spec)
generate_plugin = plugin_manager.get_plugin_generate(
key_spec, project_id=project_model.id)
# Create secret model to eventually save metadata to.
secret_model = models.Secret(spec)
@ -192,7 +196,8 @@ def generate_asymmetric_secret(spec, content_type, project_model):
passphrase=spec.get('passphrase'))
plugin_manager = secret_store.get_manager()
generate_plugin = plugin_manager.get_plugin_generate(key_spec)
generate_plugin = plugin_manager.get_plugin_generate(
key_spec, project_id=project_model.id)
# Create secret models to eventually save metadata to.
private_secret_model = models.Secret(spec)

View File

@ -74,7 +74,8 @@ class StoreCryptoAdapterPlugin(object):
# Find HSM-style 'crypto' plugin.
encrypting_plugin = manager.get_manager().get_plugin_store_generate(
crypto.PluginSupportTypes.ENCRYPT_DECRYPT
crypto.PluginSupportTypes.ENCRYPT_DECRYPT,
project_id=context.project_model.id
)
# Find or create a key encryption key metadata.
@ -163,7 +164,8 @@ class StoreCryptoAdapterPlugin(object):
plugin_type,
key_spec.alg,
key_spec.bit_length,
key_spec.mode)
key_spec.mode,
project_id=context.project_model.id)
# Find or create a key encryption key metadata.
kek_datum_model, kek_meta_dto = _find_or_create_kek_objects(
@ -197,7 +199,8 @@ class StoreCryptoAdapterPlugin(object):
raise sstore.SecretAlgorithmNotSupportedException(key_spec.alg)
generating_plugin = manager.get_manager().get_plugin_store_generate(
plugin_type, key_spec.alg, key_spec.bit_length, None)
plugin_type, key_spec.alg, key_spec.bit_length,
project_id=context.project_model.id)
# Find or create a key encryption key metadata.
kek_datum_model, kek_meta_dto = _find_or_create_kek_objects(

View File

@ -21,6 +21,8 @@ from barbican.common import config
from barbican.common import exception
from barbican.common import utils
from barbican import i18n as u
from barbican.model import models as db_models
from barbican.model import repositories as db_repos
LOG = utils.getLogger(__name__)
@ -101,3 +103,192 @@ def read_multiple_backends_config():
global_default_count)
return parsed_stores
def sync_secret_stores(secretstore_manager, crypto_manager=None):
"""Synchronize secret store plugin names between service conf and database
This method reads secret and crypto store plugin name from service
configuration and then synchronizes corresponding data maintained in
database SecretStores table.
Any new plugin name(s) added in service configuration is added as a new
entry in SecretStores table. If global_default value is changed for
existing plugins, then global_default flag is updated to reflect that
change in database. If plugin name is removed from service configuration,
then removal is possible as long as respective plugin names are NOT set as
preferred secret store for a project. If it is used and plugin name is
removed, then error is raised. This logic is intended to be invoked at
server startup so any error raised here will result in critical failure.
"""
if not utils.is_multiple_backends_enabled():
return
# doing local import to avoid circular dependency between manager and
# current utils module
from barbican.plugin.crypto import manager as cm
secret_stores_repo = db_repos.get_secret_stores_repository()
proj_store_repo = db_repos.get_project_secret_store_repository()
if crypto_manager is None:
crypto_manager = cm.get_manager()
def get_friendly_name_dict(ext_manager):
"""Returns dict of plugin internal name and friendly name entries."""
names_dict = {}
for ext in ext_manager.extensions:
if ext.obj and hasattr(ext.obj, 'get_plugin_name'):
names_dict[ext.name] = ext.obj.get_plugin_name()
return names_dict
ss_friendly_names = get_friendly_name_dict(secretstore_manager)
crypto_friendly_names = get_friendly_name_dict(crypto_manager)
# get existing secret stores data from database
db_stores = secret_stores_repo.get_all()
# read secret store data from service configuration
conf_stores = []
for parsed_store in secretstore_manager.parsed_stores:
crypto_plugin = parsed_store.crypto_plugin
if not crypto_plugin:
crypto_plugin = None
if crypto_plugin:
friendly_name = crypto_friendly_names.get(crypto_plugin)
else:
friendly_name = ss_friendly_names.get(parsed_store.store_plugin)
conf_stores.append(db_models.SecretStores(
name=friendly_name, store_plugin=parsed_store.store_plugin,
crypto_plugin=crypto_plugin,
global_default=parsed_store.global_default))
if db_stores:
def fn_match(lh_store, rh_store):
return (lh_store.store_plugin == rh_store.store_plugin and
lh_store.crypto_plugin == rh_store.crypto_plugin)
for conf_store in conf_stores:
# find existing db entry for plugin using conf based plugin names
db_store_match = next((db_store for db_store in db_stores if
fn_match(conf_store, db_store)), None)
if db_store_match:
# update existing db entry if global default is changed now
if db_store_match.global_default != conf_store.global_default:
db_store_match.global_default = conf_store.global_default
# persist flag change.
db_store_match.save()
# remove matches store from local list after processing
db_stores.remove(db_store_match)
else: # new conf entry as no match found in existing entries
secret_stores_repo.create_from(conf_store)
# entries still present in db list are no longer configured in service
# configuration, so try to remove them provided there is no project
# is using it as preferred secret store.
for db_store in db_stores:
if proj_store_repo.get_count_by_secret_store(db_store.id) == 0:
secret_stores_repo.delete_entity_by_id(db_store.id, None)
else:
raise exception.MultipleStorePluginStillInUse(db_store.name)
else: # initial setup case when there is no secret stores data in db
for conf_store in conf_stores:
secret_stores_repo.create_from(conf_store)
def get_global_default_secret_store():
secret_store_repo = db_repos.get_secret_stores_repository()
default_ss = None
for secret_store in secret_store_repo.get_all():
if secret_store.global_default:
default_ss = secret_store
break
return default_ss
def get_applicable_crypto_plugins(manager, project_id, existing_plugin_name):
"""Get list of crypto plugins available for use.
:param: manager instance of crypto manager
:param: project_id project to identify preferred store if set
:param: existing_plugin_name full plugin name. If a secret has an existing
plugin defined, then we do not care if any preferred plugins have
been defined. We will return all configured plugins as if multiple
plugin support was not enabled. Subsequent code in the caller will
select the plugin by name.
When multiple backends support is enabled:
It return project preferred plugin as list when it is setup earlier.
If project preferred plugin is not set, then it uses plugin from default
secret store.
Plugin name is 'crypto_plugin' field value on identified secret store data.
It returns matched plugin as list to match existing functionality.
When multiple backends support is NOT enabled:
In this case, it just returns list of all active plugins which is
existing functionality before support for multiple backends is added.
"""
return _get_applicable_plugins_for_type(manager, project_id,
existing_plugin_name,
'crypto_plugin')
def get_applicable_store_plugins(manager, project_id, existing_plugin_name):
"""Get list of secret store plugins available for use.
:param: manager instance of secret store manager
:param: project_id project to identify preferred store if set
:param: existing_plugin_name full plugin name. If a secret has an existing
plugin defined, then we do not care if any preferred plugins have
been defined. We will return all configured plugins as if multiple
plugin support was not enabled. Subsequent code in the caller will
select the plugin by name.
When multiple backends support is enabled:
It return project preferred plugin as list when it is setup earlier.
If project preferred plugin is not set, then it uses plugin from default
secret store.
Plugin name is 'store_plugin' field value on identified secret store data.
It returns matched plugin as list to match existing functionality.
When multiple backends support is NOT enabled:
In this case, it just returns list of all active plugins which is
existing functionality before support for multiple backends is added.
"""
return _get_applicable_plugins_for_type(manager, project_id,
existing_plugin_name,
'store_plugin')
def _get_applicable_plugins_for_type(manager, project_id, existing_plugin_name,
plugin_type_field):
plugins = []
plugin_dict = {ext.name: ext.obj for ext in manager.extensions if ext.obj}
if utils.is_multiple_backends_enabled() and existing_plugin_name is None:
proj_store_repo = db_repos.get_project_secret_store_repository()
plugin_store = proj_store_repo.get_secret_store_for_project(
project_id, None, suppress_exception=True)
# If project specific store is not set, then use global default one.
if not plugin_store:
if manager.global_default_store_dict is None:
# Need to cache data as dict instead of db object to be usable
# across various request sqlalchemy sessions
store_dict = get_global_default_secret_store().to_dict_fields()
manager.global_default_store_dict = store_dict
secret_store_data = manager.global_default_store_dict
else:
secret_store_data = plugin_store.secret_store.to_dict_fields()
applicable_plugin_name = secret_store_data[plugin_type_field]
if applicable_plugin_name in plugin_dict:
plugins = [plugin_dict.get(applicable_plugin_name)]
elif applicable_plugin_name: # applicable_plugin_name has value
raise exception.MultipleStorePreferredPluginMissing(
applicable_plugin_name)
else:
plugins = plugin_dict.values()
return plugins

View File

@ -20,6 +20,7 @@ from barbican.plugin.crypto import crypto
from barbican.plugin.crypto import manager as cm
from barbican.plugin.crypto import p11_crypto
from barbican.plugin.interface import secret_store as str
from barbican.plugin import kmip_secret_store as kss
from barbican.plugin import store_crypto
from barbican.tests import utils
@ -259,9 +260,7 @@ class TestSecretStorePluginManagerMultipleBackend(
utils.MultipleBackendsTestCase):
def test_plugin_created_as_per_mulitple_backend_conf(self):
"""Check plugins are created as per multiple backend conf
"""
"""Check plugins are created as per multiple backend conf"""
store_plugin_names = ['store_crypto', 'kmip_plugin', 'store_crypto']
crypto_plugin_names = ['p11_crypto', '', 'simple_crypto']
@ -288,3 +287,30 @@ class TestSecretStorePluginManagerMultipleBackend(
crypto_plugin = cm.get_manager().get_plugin_store_generate(
crypto.PluginSupportTypes.ENCRYPT_DECRYPT)
self.assertIsInstance(crypto_plugin, p11_crypto.P11CryptoPlugin)
def test_plugin_created_kmip_default_mulitple_backend_conf(self):
"""Check plugins are created as per multiple backend conf
Here KMIP plugin is marked as global default plugin
"""
store_plugin_names = ['store_crypto', 'kmip_plugin', 'store_crypto']
crypto_plugin_names = ['p11_crypto', '', 'simple_crypto']
self.init_via_conf_file(store_plugin_names,
crypto_plugin_names, enabled=True,
global_default_index=1)
with mock.patch('barbican.plugin.crypto.p11_crypto.P11CryptoPlugin.'
'_create_pkcs11') as m_pkcs11, \
mock.patch('kmip.pie.client.ProxyKmipClient') as m_kmip:
manager = str.SecretStorePluginManager()
# check pkcs11 and kmip plugin instantiation call is invoked
m_pkcs11.called_once_with(mock.ANY, mock.ANY)
m_kmip.called_once_with(mock.ANY)
# check kmip store is matched as its global default store.
keySpec = str.KeySpec(str.KeyAlgorithm.AES, 128)
plugin_found = manager.get_plugin_store(keySpec)
self.assertIsInstance(plugin_found, kss.KMIPSecretStore)

View File

@ -15,9 +15,19 @@
import collections
import mock
import uuid
from barbican.common import config
from barbican.common import exception
from barbican.model import models
from barbican.model import repositories
from barbican.plugin.crypto import crypto
from barbican.plugin.crypto import manager as cm
from barbican.plugin.crypto import p11_crypto
from barbican.plugin.crypto import simple_crypto
from barbican.plugin.interface import secret_store
from barbican.plugin import kmip_secret_store as kss
from barbican.plugin import store_crypto
from barbican.plugin.util import multiple_backends
from barbican.tests import utils as test_utils
@ -26,7 +36,8 @@ class MockedManager(object):
NAME_PREFIX = "friendly_"
def __init__(self, names):
def __init__(self, names, enabled=True,
plugin_lookup_field='store_plugin'):
ExtTuple = collections.namedtuple('ExtTuple', ['name', 'obj'])
self.extensions = []
for name in names:
@ -34,6 +45,8 @@ class MockedManager(object):
m.get_plugin_name.return_value = self.NAME_PREFIX + name
new_extension = ExtTuple(name, m)
self.extensions.append(new_extension)
self.global_default_store_dict = None
self.parsed_stores = multiple_backends.read_multiple_backends_config()
class WhenReadingMultipleBackendsConfig(test_utils.MultipleBackendsTestCase):
@ -99,6 +112,7 @@ class WhenReadingMultipleBackendsConfig(test_utils.MultipleBackendsTestCase):
stores = multiple_backends.read_multiple_backends_config()
self.assertEqual(len(ss_plugins), len(stores))
self.assertEqual('', stores[1].crypto_plugin)
def test_fail_when_global_default_not_specified(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
@ -136,3 +150,425 @@ class WhenReadingMultipleBackendsConfig(test_utils.MultipleBackendsTestCase):
self.assertRaises(exception.MultipleStorePluginValueMissing,
multiple_backends.read_multiple_backends_config)
class WhenInvokingSyncSecretStores(test_utils.MultipleBackendsTestCase):
def setUp(self):
super(WhenInvokingSyncSecretStores, self).setUp()
def test_successful_syncup_no_existing_secret_stores(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3', 'ss_p4', 'ss_p5']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3', 'cr_p4', 'cr_p5']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
default_secret_store = multiple_backends.\
get_global_default_secret_store()
self.assertEqual('ss_p1', default_secret_store.store_plugin)
self.assertEqual('cr_p1', default_secret_store.crypto_plugin)
self.assertEqual(MockedManager.NAME_PREFIX + 'cr_p1',
default_secret_store.name)
ss_db_entries = repositories.get_secret_stores_repository().get_all()
self.assertEqual(5, len(ss_db_entries))
def test_syncup_with_existing_secret_stores(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3', 'ss_p4', 'ss_p5']
cr_plugins = ['cr_p1', '', 'cr_p3', 'cr_p4', 'cr_p5']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
ss_db_entries = repositories.get_secret_stores_repository().get_all()
self.assertEqual(5, len(ss_db_entries))
# check friendly name for the case when crypto plugin is not there
ss_db_entry = self._get_secret_store_entry('ss_p2', None)
self.assertIsNotNone(ss_db_entry)
self.assertEqual(MockedManager.NAME_PREFIX + 'ss_p2',
ss_db_entry.name)
ss_plugins = ['ss_p3', 'ss_p4', 'ss_p5', 'ss_p6']
cr_plugins = ['cr_p3', 'cr_p4', 'cr_p5', 'cr_p6']
# update conf and re-run sync store
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
ss_db_entry = self._get_secret_store_entry('ss_p2', 'cr_p2')
self.assertIsNone(ss_db_entry)
ss_db_entry = self._get_secret_store_entry('ss_p6', 'cr_p6')
self.assertIsNotNone(ss_db_entry)
default_secret_store = multiple_backends.\
get_global_default_secret_store()
self.assertEqual('ss_p3', default_secret_store.store_plugin)
self.assertEqual('cr_p3', default_secret_store.crypto_plugin)
self.assertEqual(MockedManager.NAME_PREFIX + 'cr_p3',
default_secret_store.name)
ss_db_entries = repositories.get_secret_stores_repository().get_all()
self.assertEqual(4, len(ss_db_entries))
def test_syncup_modify_global_default(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3', 'ss_p4', 'ss_p5']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3', 'cr_p4', 'cr_p5']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
global_secret_store = multiple_backends.\
get_global_default_secret_store()
self.assertEqual('ss_p1', global_secret_store.store_plugin)
self.assertEqual('cr_p1', global_secret_store.crypto_plugin)
self.assertEqual(MockedManager.NAME_PREFIX + 'cr_p1',
global_secret_store.name)
ss_plugins = ['ss_p9', 'ss_p4', 'ss_p5']
cr_plugins = ['cr_p9', 'cr_p4', 'cr_p5']
# update conf and re-run sync store
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
global_secret_store = multiple_backends.\
get_global_default_secret_store()
self.assertEqual('ss_p9', global_secret_store.store_plugin)
self.assertEqual('cr_p9', global_secret_store.crypto_plugin)
self.assertEqual(MockedManager.NAME_PREFIX + 'cr_p9',
global_secret_store.name)
def test_syncup_with_store_and_crypto_plugins_count_mismatch(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3', 'ss_p4']
cr_plugins = ['cr_p1', '', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
# empty crypto_plugin name maps to None in database entry
ss_db_entry = self._get_secret_store_entry('ss_p2', None)
self.assertIsNotNone(ss_db_entry)
ss_db_entry = self._get_secret_store_entry('ss_p2', '')
self.assertIsNone(ss_db_entry)
# missing crypto plugin name maps to None in database entry
ss_db_entry = self._get_secret_store_entry('ss_p4', None)
self.assertIsNotNone(ss_db_entry)
def test_syncup_delete_secret_store_with_preferred_project_using_it(self):
"""Removing secret store will fail if its defined as preferred store.
"""
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3', 'ss_p4']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3', 'cr_p4']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
multiple_backends.sync_secret_stores(secretstore_manager,
crypto_manager)
with mock.patch('barbican.model.repositories.'
'get_project_secret_store_repository') as ps_repo:
# Mocking with 2 projects as using preferred secret store
ps_repo.get_count_by_secret_store.return_value = 2
ss_plugins = ['ss_p3', 'ss_p4']
cr_plugins = ['cr_p3', 'cr_p4']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
secretstore_manager = MockedManager(ss_plugins)
crypto_manager = MockedManager(cr_plugins)
self.assertRaises(exception.MultipleStorePluginStillInUse,
multiple_backends.sync_secret_stores,
secretstore_manager, crypto_manager)
def test_get_global_default_store_when_multiple_backends_disabled(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=False)
default_store = multiple_backends.get_global_default_secret_store()
self.assertIsNone(default_store)
class TestGetApplicablePlugins(test_utils.MultipleBackendsTestCase):
def setUp(self):
super(TestGetApplicablePlugins, self).setUp()
def test_get_when_project_preferred_plugin_is_set(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
ss_manager = MockedManager(ss_plugins)
project_id = uuid.uuid4().hex
with mock.patch('barbican.model.repositories.ProjectSecretStoreRepo.'
'get_secret_store_for_project') as pref_func:
# set preferred secret store to one of value in config
m_dict = {'store_plugin': 'ss_p3'}
m_rec = mock.MagicMock()
m_rec.secret_store.to_dict_fields.return_value = m_dict
pref_func.return_value = m_rec
objs = multiple_backends.get_applicable_store_plugins(
ss_manager, project_id, None)
self.assertIn(project_id, pref_func.call_args_list[0][0])
self.assertIsInstance(objs, list)
self.assertEqual(1, len(objs))
self.assertIn('ss_p3', objs[0].get_plugin_name())
def test_get_when_project_preferred_plugin_is_not_found_in_conf(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True)
ss_manager = MockedManager(ss_plugins)
project_id = uuid.uuid4().hex
with mock.patch('barbican.model.repositories.ProjectSecretStoreRepo.'
'get_secret_store_for_project') as pref_func:
# set preferred secret store value which is not defined in config
m_dict = {'store_plugin': 'old_preferred_plugin'}
m_rec = mock.MagicMock()
m_rec.secret_store.to_dict_fields.return_value = m_dict
pref_func.return_value = m_rec
self.assertRaises(exception.MultipleStorePreferredPluginMissing,
multiple_backends.get_applicable_store_plugins,
ss_manager, project_id, None)
self.assertIn(project_id, pref_func.call_args_list[0][0])
def test_get_when_project_preferred_plugin_not_set_then_default_used(self):
ss_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
# setting second plugin to be global default
self.init_via_conf_file(ss_plugins, cr_plugins, enabled=True,
global_default_index=1)
cr_manager = MockedManager(cr_plugins,
plugin_lookup_field='crypto_plugin')
project_id = uuid.uuid4().hex
with mock.patch('barbican.plugin.util.multiple_backends.'
'get_global_default_secret_store') as gd_func:
m_dict = {'crypto_plugin': 'cr_p2'}
gd_func.return_value.to_dict_fields.return_value = m_dict
objs = multiple_backends.get_applicable_crypto_plugins(cr_manager,
project_id,
None)
gd_func.assert_called_once_with()
self.assertIsInstance(objs, list)
self.assertEqual(1, len(objs))
self.assertIn('cr_p2', objs[0].get_plugin_name())
# call again with no project_id set
objs = multiple_backends.get_applicable_crypto_plugins(cr_manager,
None, None)
gd_func.assert_called_once_with()
self.assertIsInstance(objs, list)
self.assertEqual(1, len(objs))
self.assertIn('cr_p2', objs[0].get_plugin_name())
def test_get_applicable_store_plugins_when_multiple_backend_not_enabled(
self):
ss_config = config.get_module_config('secretstore')
ss_plugins = ['ss_p11', 'ss_p22', 'ss_p33', 'ss_p44']
ss_conf_plugins = ['ss_p1', 'ss_p2', 'ss_p3']
cr_conf_plugins = ['cr_p1', 'cr_p2', 'cr_p3']
self.init_via_conf_file(ss_conf_plugins, cr_conf_plugins,
enabled=False)
ss_manager = MockedManager(ss_plugins)
ss_config.set_override("enabled_secretstore_plugins",
ss_plugins, group='secretstore',
enforce_type=True)
objs = multiple_backends.get_applicable_store_plugins(ss_manager, None,
None)
self.assertEqual(4, len(objs))
@test_utils.parameterized_test_case
class TestPluginsGenerateStoreAPIMultipleBackend(
test_utils.MultipleBackendsTestCase):
backend_dataset = {
"db_backend": [{
'store_plugins': ['store_crypto', 'kmip_plugin', 'store_crypto'],
'crypto_plugins': ['simple_crypto', '', 'p11_crypto'],
'default_store_class': store_crypto.StoreCryptoAdapterPlugin,
'default_crypto_class': simple_crypto.SimpleCryptoPlugin
}],
"kmip": [{
'store_plugins': ['kmip_plugin', 'store_crypto', 'store_crypto'],
'crypto_plugins': ['', 'p11_crypto', 'simple_crypto'],
'default_store_class': kss.KMIPSecretStore,
'default_crypto_class': None
}],
"pkcs11": [{
'store_plugins': ['store_crypto', 'store_crypto', 'kmip_plugin'],
'crypto_plugins': ['p11_crypto', 'simple_crypto', ''],
'default_store_class': store_crypto.StoreCryptoAdapterPlugin,
'default_crypto_class': p11_crypto.P11CryptoPlugin
}]
}
def setUp(self):
super(TestPluginsGenerateStoreAPIMultipleBackend, self).setUp()
def _create_project(self):
session = repositories.get_project_repository().get_session()
project = models.Project()
project.external_id = "keystone_project_id" + uuid.uuid4().hex
project.save(session=session)
return project
def _create_project_store(self, project_id, secret_store_id):
proj_store_repo = repositories.get_project_secret_store_repository()
session = proj_store_repo.get_session()
proj_model = models.ProjectSecretStore(project_id, secret_store_id)
proj_s_store = proj_store_repo.create_from(proj_model, session)
proj_s_store.save(session=session)
return proj_s_store
@test_utils.parameterized_dataset(backend_dataset)
def test_no_preferred_default_plugin(self, dataset):
"""Check name, plugin and crypto class used for default secret store
Secret store name is crypto class plugin name if defined otherwise user
friendly name is derived from store class plugin name
"""
self.init_via_conf_file(dataset['store_plugins'],
dataset['crypto_plugins'],
enabled=True)
with mock.patch('barbican.plugin.crypto.p11_crypto.P11CryptoPlugin.'
'_create_pkcs11'), \
mock.patch('kmip.pie.client.ProxyKmipClient'):
manager = secret_store.SecretStorePluginManager()
keySpec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES, 128)
plugin_found = manager.get_plugin_store(keySpec)
self.assertIsInstance(plugin_found,
dataset['default_store_class'])
global_secret_store = multiple_backends.\
get_global_default_secret_store()
if dataset['default_crypto_class']:
crypto_plugin = cm.get_manager().get_plugin_store_generate(
crypto.PluginSupportTypes.ENCRYPT_DECRYPT)
self.assertIsInstance(crypto_plugin,
dataset['default_crypto_class'])
# make sure secret store name is same as crypto class friendly name
# as store_plugin class is not direct impl of SecretStoreBase
self.assertEqual(global_secret_store.name,
crypto_plugin.get_plugin_name())
else: # crypto class is not used
# make sure secret store name is same as store plugin class
# friendly name
self.assertEqual(global_secret_store.name,
plugin_found.get_plugin_name())
# error raised for no crypto plugin
self.assertRaises(crypto.CryptoPluginNotFound,
cm.get_manager().get_plugin_store_generate,
crypto.PluginSupportTypes.ENCRYPT_DECRYPT)
@test_utils.parameterized_dataset(backend_dataset)
def test_project_preferred_default_plugin(self, dataset):
"""Check project preferred behavior with different global default"""
self.init_via_conf_file(dataset['store_plugins'],
dataset['crypto_plugins'],
enabled=True)
with mock.patch('barbican.plugin.crypto.p11_crypto.P11CryptoPlugin.'
'_create_pkcs11'), \
mock.patch('kmip.pie.client.ProxyKmipClient'):
manager = secret_store.SecretStorePluginManager()
pkcs11_secret_store = self._get_secret_store_entry('store_crypto',
'p11_crypto')
kmip_secret_store = self._get_secret_store_entry('kmip_plugin', None)
db_secret_store = self._get_secret_store_entry('store_crypto',
'simple_crypto')
project1 = self._create_project()
project2 = self._create_project()
project3 = self._create_project()
# For project1 , make pkcs11 as preferred secret store
self._create_project_store(project1.id, pkcs11_secret_store.id)
# For project2 , make kmip as preferred secret store
self._create_project_store(project2.id, kmip_secret_store.id)
# For project3 , make db backend as preferred secret store
self._create_project_store(project3.id, db_secret_store.id)
keySpec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES, 128)
cm_manager = cm.get_manager()
# For project1, verify store and crypto plugin instance used are pkcs11
# specific
plugin_found = manager.get_plugin_store(keySpec,
project_id=project1.id)
self.assertIsInstance(plugin_found,
store_crypto.StoreCryptoAdapterPlugin)
crypto_plugin = cm.get_manager().get_plugin_store_generate(
crypto.PluginSupportTypes.ENCRYPT_DECRYPT, project_id=project1.id)
self.assertIsInstance(crypto_plugin, p11_crypto.P11CryptoPlugin)
# For project2, verify store plugin instance is kmip specific
# and there is no crypto plugin instance
plugin_found = manager.get_plugin_store(keySpec,
project_id=project2.id)
self.assertIsInstance(plugin_found, kss.KMIPSecretStore)
self.assertRaises(
crypto.CryptoPluginNotFound, cm_manager.get_plugin_store_generate,
crypto.PluginSupportTypes.ENCRYPT_DECRYPT, project_id=project2.id)
# For project3, verify store and crypto plugin instance used are db
# backend specific
plugin_found = manager.get_plugin_store(keySpec,
project_id=project3.id)
self.assertIsInstance(plugin_found,
store_crypto.StoreCryptoAdapterPlugin)
crypto_plugin = cm.get_manager().get_plugin_store_generate(
crypto.PluginSupportTypes.ENCRYPT_DECRYPT, project_id=project3.id)
self.assertIsInstance(crypto_plugin, simple_crypto.SimpleCryptoPlugin)
# Make sure for project with no preferred setting, uses global default
project4 = self._create_project()
plugin_found = manager.get_plugin_store(keySpec,
project_id=project4.id)
self.assertIsInstance(plugin_found,
dataset['default_store_class'])

View File

@ -106,6 +106,9 @@ class BaseTestCase(oslotest.BaseTestCase):
def tearDown(self):
super(BaseTestCase, self).tearDown()
ss_conf = config.get_module_config('secretstore')
ss_conf.clear_override("enable_multiple_secret_stores",
group='secretstore')
class MockModelRepositoryMixin(object):