
Added auth_url to the context we pass to castellan library. In a change [1] intended to deprecate the use of auth_endpoint passed as the oslo config, it actually completely removed the use of it[2], so this change became necessary or the integration is broken. [1] - https://review.openstack.org/#/c/483457 [2] - https://review.openstack.org/#/c/483457/6/castellan/key_manager/barbican_key_manager.py@143 Change-Id: I933367fa46aa0a3dc9aedf078b1be715bfa8c054
116 lines
5.2 KiB
Python
116 lines
5.2 KiB
Python
# Copyright (c) 2016 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from castellan import key_manager, options
|
|
from castellan.common.credentials import keystone_password
|
|
from oslo_config import cfg
|
|
from swift.common.middleware.crypto.keymaster import KeyMaster
|
|
from swift.common.utils import readconf
|
|
|
|
|
|
class KmsKeyMaster(KeyMaster):
|
|
"""Middleware for retrieving a encryption root secret from an external KMS.
|
|
|
|
The middleware accesses the encryption root secret from an external key
|
|
management system (KMS), e.g., a Barbican service, using Castellan. To be
|
|
able to do so, the appropriate configuration options shall be set in the
|
|
proxy-server.conf file, or in the configuration pointed to using the
|
|
keymaster_config_path configuration value in the proxy-server.conf file.
|
|
"""
|
|
|
|
def __init__(self, app, conf):
|
|
# Call the superclass __init__() method, which calls the overridden
|
|
# self._get_root_secret() below.
|
|
super(KmsKeyMaster, self).__init__(app, conf)
|
|
|
|
def _get_root_secret(self, conf):
|
|
"""
|
|
Retrieve the root encryption secret from an external key management
|
|
system using Castellan.
|
|
|
|
:param conf: the keymaster config section from proxy-server.conf
|
|
:type conf: dict
|
|
|
|
:return: the encryption root secret binary bytes
|
|
:rtype: bytearray
|
|
"""
|
|
if self.keymaster_config_path is not None:
|
|
keymaster_opts = ['username', 'password', 'project_name',
|
|
'user_domain_name', 'project_domain_name',
|
|
'user_id', 'user_domain_id', 'trust_id',
|
|
'domain_id', 'domain_name', 'project_id',
|
|
'project_domain_id', 'reauthenticate',
|
|
'auth_endpoint', 'api_class', 'key_id']
|
|
if any(opt in conf for opt in keymaster_opts):
|
|
raise ValueError('keymaster_config_path is set, but there '
|
|
'are other config options specified: %s' %
|
|
", ".join(list(
|
|
set(keymaster_opts).intersection(conf))))
|
|
conf = readconf(self.keymaster_config_path, 'kms_keymaster')
|
|
ctxt = keystone_password.KeystonePassword(
|
|
auth_url=conf.get('auth_endpoint'),
|
|
username=conf.get('username'),
|
|
password=conf.get('password'),
|
|
project_name=conf.get('project_name'),
|
|
user_domain_name=conf.get('user_domain_name'),
|
|
project_domain_name=conf.get(
|
|
'project_domain_name'),
|
|
user_id=conf.get('user_id'),
|
|
user_domain_id=conf.get('user_domain_id'),
|
|
trust_id=conf.get('trust_id'),
|
|
domain_id=conf.get('domain_id'),
|
|
domain_name=conf.get('domain_name'),
|
|
project_id=conf.get('project_id'),
|
|
project_domain_id=conf.get('project_domain_id'),
|
|
reauthenticate=conf.get('reauthenticate'))
|
|
oslo_conf = cfg.ConfigOpts()
|
|
options.set_defaults(
|
|
oslo_conf, auth_endpoint=conf.get('auth_endpoint'),
|
|
api_class=conf.get('api_class')
|
|
)
|
|
options.enable_logging()
|
|
manager = key_manager.API(oslo_conf)
|
|
key = manager.get(ctxt, conf.get('key_id'))
|
|
if key is None:
|
|
raise ValueError("Retrieval of encryption root secret with key_id "
|
|
"'%s' returned None." % conf.get('key_id'))
|
|
try:
|
|
if (key.bit_length < 256) or (key.algorithm.lower() != "aes"):
|
|
raise ValueError('encryption root secret stored in the '
|
|
'external KMS must be an AES key of at least '
|
|
'256 bits (provided key length: %d, provided '
|
|
'key algorithm: %s)'
|
|
% (key.bit_length, key.algorithm))
|
|
if (key.format != 'RAW'):
|
|
raise ValueError('encryption root secret stored in the '
|
|
'external KMS must be in RAW format and not '
|
|
'e.g., as a base64 encoded string (format of '
|
|
'key with uuid %s: %s)' %
|
|
(conf.get('key_id'), key.format))
|
|
except Exception:
|
|
raise ValueError("Secret with key_id '%s' is not a symmetric key "
|
|
"(type: %s)" % (conf.get('key_id'),
|
|
str(type(key))))
|
|
return key.get_encoded()
|
|
|
|
|
|
def filter_factory(global_conf, **local_conf):
|
|
conf = global_conf.copy()
|
|
conf.update(local_conf)
|
|
|
|
def kms_keymaster_filter(app):
|
|
return KmsKeyMaster(app, conf)
|
|
|
|
return kms_keymaster_filter
|