Introduce Castellan Credential Factory

This patch introduces the credential factory which creates a
credential object based upon the values in the configuration file.
It is the second of several patches which will implement the
"Allow different Keystone Auth Support in Castellan" blueprint.

Other patches will add:
1.) barbican key manager logic and tests
2.) documentation on usage

Change-Id: I34243c7a2523d9d0aa4e86d823dd28f1beed821a
Implements: blueprint remove-keystone-dependency
This commit is contained in:
Fernando Diaz 2016-01-29 03:35:26 +00:00
parent db3c7a4e02
commit 28e0dcaf76
5 changed files with 367 additions and 2 deletions

View File

@ -62,3 +62,13 @@ class KeyManagerError(CastellanException):
class ManagedObjectNotFoundError(CastellanException):
message = u._("Key not found, uuid: %(uuid)s")
class AuthTypeInvalidError(CastellanException):
message = u._("Invalid auth_type was specified, auth_type: %(type)s")
class InsufficientCredentialDataError(CastellanException):
message = u._("Insufficient credential data was provided, either "
"\"token\" must be set in the passed conf, or a context "
"with an \"auth_token\" property must be passed.")

144
castellan/common/utils.py Normal file
View File

@ -0,0 +1,144 @@
# Copyright (c) 2016 IBM
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common utilities for Castellan.
"""
from castellan.common.credentials import keystone_password
from castellan.common.credentials import keystone_token
from castellan.common.credentials import password
from castellan.common.credentials import token
from castellan.common import exception
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
credential_opts = [
# auth_type opt
cfg.StrOpt('auth_type', default=None),
# token opt
cfg.StrOpt('token', default=None),
# password opts
cfg.StrOpt('username', default=None),
cfg.StrOpt('password', default=None),
# keystone credential opts
cfg.StrOpt('user_id', default=None),
cfg.StrOpt('user_domain_id', default=None),
cfg.StrOpt('user_domain_name', default=None),
cfg.StrOpt('trust_id', default=None),
cfg.StrOpt('domain_id', default=None),
cfg.StrOpt('domain_name', default=None),
cfg.StrOpt('project_id', default=None),
cfg.StrOpt('project_name', default=None),
cfg.StrOpt('project_domain_id', default=None),
cfg.StrOpt('project_domain_name', default=None),
cfg.BoolOpt('reauthenticate', default=True)
]
OPT_GROUP = 'key_manager'
def credential_factory(conf=None, context=None):
"""This function provides a factory for credentials.
It is used to create an appropriare credential object
from a passed configuration. This should be called before
making any calls to a key manager.
:param conf: Configuration file which this factory method uses
to generate a credential object. Note: In the future it will
become a required field.
:param context: Context used for authentication. It can be used
in conjunction with the configuration file. If no conf is passed,
then the context object will be converted to a KeystoneToken and
returned. If a conf is passed then only the 'token' is grabbed from
the context for the authentication types that require a token.
:returns: A credential object used for authenticating with the
Castellan key manager. Type of credential returned depends on
config and/or context passed.
"""
if conf:
conf.register_opts(credential_opts, group=OPT_GROUP)
if conf.key_manager.auth_type == 'token':
if conf.key_manager.token:
auth_token = conf.key_manager.token
elif context:
auth_token = context.auth_token
else:
raise exception.InsufficientCredentialDataError()
return token.Token(auth_token)
elif conf.key_manager.auth_type == 'password':
return password.Password(
conf.key_manager.username,
conf.key_manager.password)
elif conf.key_manager.auth_type == 'keystone_password':
return keystone_password.KeystonePassword(
conf.key_manager.password,
username=conf.key_manager.username,
user_id=conf.key_manager.user_id,
user_domain_id=conf.key_manager.user_domain_id,
user_domain_name=conf.key_manager.user_domain_name,
trust_id=conf.key_manager.trust_id,
domain_id=conf.key_manager.domain_id,
domain_name=conf.key_manager.domain_name,
project_id=conf.key_manager.project_id,
project_name=conf.key_manager.project_name,
project_domain_id=conf.key_manager.domain_id,
project_domain_name=conf.key_manager.domain_name,
reauthenticate=conf.key_manager.reauthenticate)
elif conf.key_manager.auth_type == 'keystone_token':
if conf.key_manager.token:
auth_token = conf.key_manager.token
elif context:
auth_token = context.auth_token
else:
raise exception.InsufficientCredentialDataError()
return keystone_token.KeystoneToken(
auth_token,
trust_id=conf.key_manager.trust_id,
domain_id=conf.key_manager.domain_id,
domain_name=conf.key_manager.domain_name,
project_id=conf.key_manager.project_id,
project_name=conf.key_manager.project_name,
project_domain_id=conf.key_manager.domain_id,
project_domain_name=conf.key_manager.domain_name,
reauthenticate=conf.key_manager.reauthenticate)
else:
LOG.error("Invalid auth_type specified.")
raise exception.AuthTypeInvalidError(
type=conf.key_manager.auth_type)
# for compatibility between _TokenData and RequestContext
if hasattr(context, 'tenant') and context.tenant:
project_id = context.tenant
elif hasattr(context, 'project_id') and context.project_id:
project_id = context.project_id
return keystone_token.KeystoneToken(
context.auth_token,
project_id=project_id)

View File

@ -28,4 +28,4 @@ def API(configuration=None):
conf.register_opts(key_manager_opts, group='key_manager')
cls = importutils.import_class(conf.key_manager.api_class)
return cls(configuration=conf)
return cls(configuration=conf)

View File

@ -20,6 +20,7 @@ try:
from castellan.key_manager import barbican_key_manager as bkm
except ImportError:
bkm = None
from castellan.common import utils
_DEFAULT_LOG_LEVELS = ['castellan=WARN']
@ -94,7 +95,11 @@ def list_opts():
:returns: a list of (group_name, opts) tuples
"""
opts = [('key_manager', km.key_manager_opts)]
key_manager_opts = []
key_manager_opts.extend(km.key_manager_opts)
key_manager_opts.extend(utils.credential_opts)
opts = [('key_manager', key_manager_opts)]
if bkm is not None:
opts.append((bkm.BARBICAN_OPT_GROUP, bkm.barbican_opts))
return opts

View File

@ -0,0 +1,206 @@
# Copyright (c) 2016 IBM
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Common utilities for Castellan.
"""
from castellan.common import exception
from castellan.common import utils
from castellan.tests import base
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_context import context
CONF = cfg.CONF
class TestUtils(base.TestCase):
def setUp(self):
super(TestUtils, self).setUp()
self.config_fixture = self.useFixture(config_fixture.Config(CONF))
CONF.register_opts(utils.credential_opts, group=utils.OPT_GROUP)
def test_token_credential(self):
token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
self.config_fixture.config(
auth_type='token',
token=token_value,
group='key_manager'
)
token_context = utils.credential_factory(conf=CONF)
token_context_class = token_context.__class__.__name__
self.assertEqual('Token', token_context_class)
self.assertEqual(token_value, token_context.token)
def test_token_credential_with_context(self):
token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
ctxt = context.RequestContext(auth_token=token_value)
self.config_fixture.config(
auth_type='token',
group='key_manager'
)
token_context = utils.credential_factory(conf=CONF, context=ctxt)
token_context_class = token_context.__class__.__name__
self.assertEqual('Token', token_context_class)
self.assertEqual(token_value, token_context.token)
def test_token_credential_config_override_context(self):
ctxt_token_value = '00000000000000000000000000000000'
ctxt = context.RequestContext(auth_token=ctxt_token_value)
conf_token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
self.config_fixture.config(
auth_type='token',
token=conf_token_value,
group='key_manager'
)
token_context = utils.credential_factory(conf=CONF, context=ctxt)
token_context_class = token_context.__class__.__name__
self.assertEqual('Token', token_context_class)
self.assertEqual(conf_token_value, token_context.token)
def test_token_credential_exception(self):
self.config_fixture.config(
auth_type='token',
group='key_manager'
)
self.assertRaises(exception.InsufficientCredentialDataError,
utils.credential_factory,
CONF)
def test_password_credential(self):
password_value = 'p4ssw0rd'
self.config_fixture.config(
auth_type='password',
password=password_value,
group='key_manager'
)
password_context = utils.credential_factory(conf=CONF)
password_context_class = password_context.__class__.__name__
self.assertEqual('Password', password_context_class)
self.assertEqual(password_value, password_context.password)
def test_keystone_token_credential(self):
token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
self.config_fixture.config(
auth_type='keystone_token',
token=token_value,
group='key_manager'
)
ks_token_context = utils.credential_factory(conf=CONF)
ks_token_context_class = ks_token_context.__class__.__name__
self.assertEqual('KeystoneToken', ks_token_context_class)
self.assertEqual(token_value, ks_token_context.token)
def test_keystone_token_credential_with_context(self):
token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
ctxt = context.RequestContext(auth_token=token_value)
self.config_fixture.config(
auth_type='keystone_token',
group='key_manager'
)
ks_token_context = utils.credential_factory(conf=CONF, context=ctxt)
ks_token_context_class = ks_token_context.__class__.__name__
self.assertEqual('KeystoneToken', ks_token_context_class)
self.assertEqual(token_value, ks_token_context.token)
def test_keystone_token_credential_config_override_context(self):
ctxt_token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
ctxt = context.RequestContext(auth_token=ctxt_token_value)
conf_token_value = 'ec9799cd921e4e0a8ab6111c08ebf065'
self.config_fixture.config(
auth_type='keystone_token',
token=conf_token_value,
group='key_manager'
)
ks_token_context = utils.credential_factory(conf=CONF, context=ctxt)
ks_token_context_class = ks_token_context.__class__.__name__
self.assertEqual('KeystoneToken', ks_token_context_class)
self.assertEqual(conf_token_value, ks_token_context.token)
def test_keystone_token_credential_exception(self):
self.config_fixture.config(
auth_type='keystone_token',
group='key_manager'
)
self.assertRaises(exception.InsufficientCredentialDataError,
utils.credential_factory,
CONF)
def test_keystone_password_credential(self):
password_value = 'p4ssw0rd'
self.config_fixture.config(
auth_type='keystone_password',
password=password_value,
group='key_manager'
)
ks_password_context = utils.credential_factory(conf=CONF)
ks_password_context_class = ks_password_context.__class__.__name__
self.assertEqual('KeystonePassword', ks_password_context_class)
self.assertEqual(password_value, ks_password_context.password)
def test_oslo_context_to_keystone_token(self):
auth_token_value = '16bd612f28ec479b8ffe8e124fc37b43'
tenant_value = '00c6ef5ad2984af2acd7d42c299935c0'
ctxt = context.RequestContext(
auth_token=auth_token_value,
tenant=tenant_value)
ks_token_context = utils.credential_factory(context=ctxt)
ks_token_context_class = ks_token_context.__class__.__name__
self.assertEqual('KeystoneToken', ks_token_context_class)
self.assertEqual(auth_token_value, ks_token_context.token)
self.assertEqual(tenant_value, ks_token_context.project_id)
def test_invalid_auth_type(self):
self.config_fixture.config(
auth_type='hotdog',
group='key_manager'
)
self.assertRaises(exception.AuthTypeInvalidError,
utils.credential_factory,
conf=CONF)