Adding functional tests for multiple backend changes (Part 5)

Change-Id: Iaf02d446a178baaa3e61d6a7267717822bd957f8
Partially-Implements: blueprint multiple-secret-backend
This commit is contained in:
Arun Kant 2016-08-23 16:28:15 -07:00
parent 6535e559cd
commit 845b3d045b
7 changed files with 459 additions and 1 deletions

View File

@ -106,6 +106,7 @@ def setup_database_engine_and_factory():
# session instance per thread.
session_maker = sa_orm.sessionmaker(bind=_ENGINE)
_SESSION_FACTORY = sqlalchemy.orm.scoped_session(session_maker)
_initialize_secret_stores_data()
def start():
@ -204,6 +205,18 @@ def _get_engine(engine):
return engine
def _initialize_secret_stores_data():
"""Initializes secret stores data in database.
This logic is executed only when database engine and factory is built.
Secret store get_manager internally reads secret store plugin configuration
from service configuration and saves it in secret_stores table in database.
"""
if utils.is_multiple_backends_enabled():
from barbican.plugin.interface import secret_store
secret_store.get_manager()
def is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended

View File

@ -69,6 +69,10 @@ auditor_b_password=barbican
# Default value is True.
server_host_href_set = True
# Flag to indicate if multiple backends support is enabled or not at barbican
# server side. Functional tests behavior changes depending on this flag value.
server_multiple_backends_enabled = False
[quotas]
# For each resource, the default maximum number that can be used for
# a project is set below. This value can be overridden for each

View File

@ -31,6 +31,8 @@ from functionaltests.common import config
CONF = config.get_config()
conf_host_href_used = CONF.keymanager.server_host_href_set
conf_multiple_backends_enabled = CONF.keymanager.\
server_multiple_backends_enabled
class TestCase(oslotest.BaseTestCase):

View File

@ -0,0 +1,101 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.api.v1.behaviors import base_behaviors
class SecretStoresBehaviors(base_behaviors.BaseBehaviors):
def get_all_secret_stores(self, extra_headers=None, use_auth=True,
user_name=None):
"""Retrieves list of secret stores available in barbican"""
resp = self.client.get('secret-stores',
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
json_data = None
if resp.status_code == 200:
json_data = self.get_json(resp)
return resp, json_data
def get_global_default(self, extra_headers=None, use_auth=True,
user_name=None):
"""Retrieves global default secret store."""
resp = self.client.get('secret-stores/global-default',
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)
json_data = None
if resp.status_code == 200:
json_data = self.get_json(resp)
return resp, json_data
def get_project_preferred_store(self, extra_headers=None, use_auth=True,
user_name=None):
"""Retrieve global default secret store."""
resp = self.client.get('secret-stores/preferred',
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)
json_data = None
if resp.status_code == 200:
json_data = self.get_json(resp)
return resp, json_data
def get_a_secret_store(self, secret_store_ref, extra_headers=None,
use_auth=True, user_name=None):
"""Retrieve a specific secret store."""
resp = self.client.get(secret_store_ref, extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
json_data = None
if resp.status_code == 200:
json_data = self.get_json(resp)
return resp, json_data
def set_preferred_secret_store(self, secret_store_ref,
extra_headers=None, use_auth=True,
user_name=None):
"""Set a preferred secret store."""
project_id = None
try:
if user_name:
project_id = self.client.get_project_id_from_name(user_name)
except Exception:
pass
resp = self.client.post(secret_store_ref + '/preferred',
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
if resp.status_code == 204 and project_id:
# add tuple of store ref, user_name to cleanup later
self.created_entities.append((secret_store_ref, user_name))
return resp
def unset_preferred_secret_store(self, secret_store_ref,
extra_headers=None, use_auth=True,
user_name=None):
"""Unset a preferred secret store."""
return self.client.delete(secret_store_ref + '/preferred',
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
def cleanup_preferred_secret_store_entities(self):
for (store_ref, user_name) in self.created_entities:
self.unset_preferred_secret_store(store_ref, user_name=user_name)

View File

@ -27,7 +27,14 @@ from barbican.tests import keys
from barbican.tests import utils
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.behaviors import secretstores_behaviors
from functionaltests.api.v1.models import secret_models
from functionaltests.common import config
CONF = config.get_config()
admin_a = CONF.rbac_users.admin_a
admin_b = CONF.rbac_users.admin_b
def get_pem_content(pem):
@ -1440,3 +1447,120 @@ class SecretsUnauthedTestCase(base.TestCase):
use_auth=False
)
self.assertEqual(401, resp.status_code)
@utils.parameterized_test_case
class SecretsMultipleBackendTestCase(base.TestCase):
def setUp(self):
super(SecretsMultipleBackendTestCase, self).setUp()
self.behaviors = secret_behaviors.SecretBehaviors(self.client)
self.ss_behaviors = secretstores_behaviors.SecretStoresBehaviors(
self.client)
self.default_secret_create_data = get_default_data()
if base.conf_multiple_backends_enabled:
resp, stores = self.ss_behaviors.get_all_secret_stores(
user_name=admin_a)
self.assertEqual(200, resp.status_code)
secret_store_ref = None
for store in stores['secret-stores']:
if not store['global_default']:
secret_store_ref = store['secret_store_ref']
break
# set preferred secret store for admin_a (project a) user
# and don't set preferred secret store for admin_b (project b) user
self.ss_behaviors.set_preferred_secret_store(secret_store_ref,
user_name=admin_a)
def tearDown(self):
self.behaviors.delete_all_created_secrets()
if base.conf_multiple_backends_enabled:
self.ss_behaviors.cleanup_preferred_secret_store_entities()
super(SecretsMultipleBackendTestCase, self).tearDown()
@testcase.skipUnless(base.conf_multiple_backends_enabled, 'executed only '
'when multiple backends support is enabled in '
'barbican server side')
@utils.parameterized_dataset({
'symmetric_type_preferred_store': [
admin_a,
'symmetric',
base64.b64decode(get_default_payload()),
get_default_data()
],
'private_type_preferred_store': [
admin_a,
'private',
keys.get_private_key_pem(),
get_private_key_req()
],
'public_type_preferred_store': [
admin_a,
'public',
keys.get_public_key_pem(),
get_public_key_req()
],
'certificate_type_preferred_store': [
admin_a,
'certificate',
keys.get_certificate_pem(),
get_certificate_req()
],
'passphrase_type_preferred_store': [
admin_a,
'passphrase',
'mysecretpassphrase',
get_passphrase_req()
],
'symmetric_type_no_preferred_store': [
admin_b,
'symmetric',
base64.b64decode(get_default_payload()),
get_default_data()
],
'private_type_no_preferred_store': [
admin_b,
'private',
keys.get_private_key_pem(),
get_private_key_req()
],
'public_type_no_preferred_store': [
admin_b,
'public',
keys.get_public_key_pem(),
get_public_key_req()
],
'certificate_type_no_preferred_store': [
admin_b,
'certificate',
keys.get_certificate_pem(),
get_certificate_req()
],
'passphrase_type_no_preferred_store': [
admin_b,
'passphrase',
'mysecretpassphrase',
get_passphrase_req()
],
})
def test_secret_create_for(self, user_name, secret_type, expected, spec):
"""Create secrets with various secret types with multiple backends."""
test_model = secret_models.SecretModel(**spec)
test_model.secret_type = secret_type
resp, secret_ref = self.behaviors.create_secret(test_model,
user_name=user_name,
admin=user_name)
self.assertEqual(201, resp.status_code)
resp = self.behaviors.get_secret_metadata(secret_ref,
user_name=user_name)
secret_type_response = resp.model.secret_type
self.assertIsNotNone(secret_type_response)
self.assertEqual(secret_type, secret_type_response)
content_type = spec['payload_content_type']
get_resp = self.behaviors.get_secret(secret_ref,
content_type,
user_name=user_name)
self.assertEqual(expected, get_resp.content)

View File

@ -0,0 +1,213 @@
# (c) Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import testcase
from barbican.tests import utils
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.behaviors import secretstores_behaviors
from functionaltests.common import config
CONF = config.get_config()
admin_a = CONF.rbac_users.admin_a
creator_a = CONF.rbac_users.creator_a
observer_a = CONF.rbac_users.observer_a
auditor_a = CONF.rbac_users.auditor_a
admin_b = CONF.rbac_users.admin_b
observer_b = CONF.rbac_users.observer_b
test_user_data_when_enabled = {
'with_admin_a': {'user': admin_a, 'expected_return': 200},
'with_creator_a': {'user': creator_a, 'expected_return': 403},
'with_observer_a': {'user': observer_a, 'expected_return': 403},
'with_auditor_a': {'user': auditor_a, 'expected_return': 403},
}
test_user_data_when_not_enabled = {
'with_admin_a': {'user': admin_a, 'expected_return': 404},
'with_creator_a': {'user': creator_a, 'expected_return': 403},
'with_observer_a': {'user': observer_a, 'expected_return': 403},
'with_auditor_a': {'user': auditor_a, 'expected_return': 403},
}
@utils.parameterized_test_case
class SecretStoresTestCase(base.TestCase):
"""Functional tests exercising ACL Features"""
def setUp(self):
super(SecretStoresTestCase, self).setUp()
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.ss_behaviors = secretstores_behaviors.SecretStoresBehaviors(
self.client)
def tearDown(self):
self.ss_behaviors.cleanup_preferred_secret_store_entities()
self.secret_behaviors.delete_all_created_secrets()
super(SecretStoresTestCase, self).tearDown()
def _validate_secret_store_fields(self, secret_store):
self.assertIsNotNone(secret_store['name'])
self.assertIsNotNone(secret_store['secret_store_ref'])
self.assertIsNotNone(secret_store['secret_store_plugin'])
self.assertIsNotNone(secret_store['global_default'])
self.assertIsNotNone(secret_store['created'])
self.assertIsNotNone(secret_store['updated'])
self.assertEqual("ACTIVE", secret_store['status'])
@testcase.skipUnless(base.conf_multiple_backends_enabled, 'executed only '
'when multiple backends support is enabled in '
'barbican server side')
@utils.parameterized_dataset(test_user_data_when_enabled)
def test_get_all_secret_stores_multiple_enabled(self, user,
expected_return):
resp, json_data = self.ss_behaviors.get_all_secret_stores(
user_name=user)
self.assertEqual(expected_return, resp.status_code)
if expected_return == 200:
self.assertIsNotNone(json_data['secret-stores'])
stores = json_data['secret-stores']
for secret_store in stores:
self._validate_secret_store_fields(secret_store)
@testcase.skipIf(base.conf_multiple_backends_enabled, 'executed only when '
'multiple backends support is NOT enabled in barbican '
'server side')
@utils.parameterized_dataset(test_user_data_when_not_enabled)
def test_get_all_secret_stores_multiple_disabled(self, user,
expected_return):
resp, _ = self.ss_behaviors.get_all_secret_stores(user_name=user)
self.assertEqual(expected_return, resp.status_code)
@testcase.skipUnless(base.conf_multiple_backends_enabled, 'executed only '
'when multiple backends support is enabled in '
'barbican server side')
@utils.parameterized_dataset(test_user_data_when_enabled)
def test_get_global_default_multiple_enabled(self, user, expected_return):
resp, json_data = self.ss_behaviors.get_global_default(user_name=user)
self.assertEqual(expected_return, resp.status_code)
if expected_return == 200:
self._validate_secret_store_fields(json_data)
@testcase.skipIf(base.conf_multiple_backends_enabled, 'executed only when '
'multiple backends support is NOT enabled in barbican '
'server side')
@utils.parameterized_dataset(test_user_data_when_not_enabled)
def test_get_global_default_multiple_disabled(self, user, expected_return):
resp, _ = self.ss_behaviors.get_global_default(user_name=user)
self.assertEqual(expected_return, resp.status_code)
@testcase.skipUnless(base.conf_multiple_backends_enabled, 'executed only '
'when multiple backends support is enabled in '
'barbican server side')
@utils.parameterized_dataset(test_user_data_when_enabled)
def test_get_project_preferred_multiple_enabled(self, user,
expected_return):
resp, json_data = self.ss_behaviors.get_all_secret_stores(
user_name=admin_a)
self.assertEqual(200, resp.status_code)
stores = json_data['secret-stores']
store = stores[len(stores)-1]
secret_store_ref = store['secret_store_ref']
resp = self.ss_behaviors.set_preferred_secret_store(secret_store_ref,
user_name=user)
if resp.status_code == 204:
resp, json_data = self.ss_behaviors.get_project_preferred_store(
user_name=user)
self.assertEqual(expected_return, resp.status_code)
if expected_return == 200:
self._validate_secret_store_fields(json_data)
self.assertEqual(store['secret_store_ref'],
json_data['secret_store_ref'])
@testcase.skipIf(base.conf_multiple_backends_enabled, 'executed only when '
'multiple backends support is NOT enabled in barbican '
'server side')
@utils.parameterized_dataset(test_user_data_when_not_enabled)
def test_get_project_preferred_multiple_disabled(self, user,
expected_return):
resp, _ = self.ss_behaviors.get_project_preferred_store(user_name=user)
self.assertEqual(expected_return, resp.status_code)
@testcase.skipUnless(base.conf_multiple_backends_enabled, 'executed only '
'when multiple backends support is enabled in '
'barbican server side')
@utils.parameterized_dataset(test_user_data_when_enabled)
def test_get_a_secret_store_multiple_enabled(self, user, expected_return):
# read global default secret store via admin user as this is not a
# global default API check.
resp, json_data = self.ss_behaviors.get_global_default(
user_name=admin_a)
self.assertEqual(200, resp.status_code)
resp, json_data = self.ss_behaviors.get_a_secret_store(
json_data['secret_store_ref'], user_name=user)
self.assertEqual(expected_return, resp.status_code)
if expected_return == 200:
self._validate_secret_store_fields(json_data)
self.assertEqual(True, json_data['global_default'])
@testcase.skipIf(base.conf_multiple_backends_enabled, 'executed only when '
'multiple backends support is NOT enabled in barbican '
'server side')
@utils.parameterized_dataset(test_user_data_when_not_enabled)
def test_get_a_secret_store_multiple_disabled(self, user, expected_return):
resp, _ = self.ss_behaviors.get_project_preferred_store(user_name=user)
self.assertEqual(expected_return, resp.status_code)
@testcase.skipUnless(base.conf_multiple_backends_enabled, 'executed only '
'when multiple backends support is enabled in '
'barbican server side')
@utils.parameterized_dataset(test_user_data_when_enabled)
def test_unset_project_preferred_store_multiple_enabled(self, user,
expected_return):
resp, json_data = self.ss_behaviors.get_all_secret_stores(
user_name=admin_a)
self.assertEqual(200, resp.status_code)
stores = json_data['secret-stores']
store = stores[len(stores)-1]
secret_store_ref = store['secret_store_ref']
resp = self.ss_behaviors.set_preferred_secret_store(secret_store_ref,
user_name=user)
if resp.status_code == 204:
# after setting project preference, get preferred will return 200
resp, json_data = self.ss_behaviors.get_project_preferred_store(
user_name=user)
self.assertEqual(200, resp.status_code)
# now, remove project preferred secret store
self.ss_behaviors.unset_preferred_secret_store(
json_data['secret_store_ref'], user_name=user)
# get project preferred call should now return 404
resp, json_data = self.ss_behaviors.get_project_preferred_store(
user_name=user)
self.assertEqual(404, resp.status_code)

View File

@ -76,7 +76,8 @@ def setup_config(config_file=''):
cfg.StrOpt('override_url', default=''),
cfg.StrOpt('override_url_version', default=''),
cfg.BoolOpt('verify_ssl', default=True),
cfg.BoolOpt('server_host_href_set', default=True)
cfg.BoolOpt('server_host_href_set', default=True),
cfg.BoolOpt('server_multiple_backends_enabled', default=False)
]
TEST_CONF.register_group(keymanager_group)
TEST_CONF.register_opts(keymanager_options, group=keymanager_group)