Replace key manager with Castellan

Because key manager code is duplicated across several projects, a key
manager interface was moved into its own library. This goes back to
replace the old code with the new library.

Implements: blueprint use-castellan-key-manager
Change-Id: Ief8885bb4ca8d62b03cf1a52c25dd0e62c835bfe
This commit is contained in:
Kaitlin Farr 2016-07-14 12:21:26 +08:00 committed by lisali
parent e6642d97fd
commit 993ba893ca
24 changed files with 205 additions and 1403 deletions

View File

@ -25,7 +25,7 @@ import six
from cinder.db import base
from cinder import exception
from cinder.i18n import _, _LI, _LW
from cinder import keymgr
from cinder import keymgr as key_manager
service_opts = [
cfg.IntOpt('backup_metadata_version', default=2,
@ -89,7 +89,8 @@ class BackupMetadataAPI(base.Base):
continue
# Copy the encryption key uuid for backup
if key is 'encryption_key_id' and value is not None:
value = keymgr.API().copy_key(self.context, value)
km = key_manager.API(CONF)
value = km.store(self.context, km.get(self.context, value))
LOG.debug("Copying encryption key uuid for backup.")
container[type_tag][key] = value

View File

@ -1,6 +1,7 @@
[DEFAULT]
output_file = etc/cinder/cinder.conf.sample
wrap_width = 79
namespace = castellan.config
namespace = cinder
namespace = keystonemiddleware.auth_token
namespace = oslo.config

View File

@ -13,19 +13,64 @@
# License for the specific language governing permissions and limitations
# under the License.
from castellan import options as castellan_opts
from oslo_config import cfg
from oslo_log import log as logging
from oslo_log import versionutils
from oslo_utils import importutils
keymgr_opts = [
cfg.StrOpt('api_class',
default='cinder.keymgr.conf_key_mgr.ConfKeyManager',
help='The full class name of the key manager API class'),
]
from cinder.i18n import _LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.register_opts(keymgr_opts, group='keymgr')
castellan_opts.set_defaults(cfg.CONF)
# NOTE(kfarr): This line can be removed when a value is assigned in DevStack
CONF.set_default('api_class', 'cinder.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
# NOTE(kfarr): For backwards compatibility, everything below this comment
# is deprecated for removal
api_class = None
try:
api_class = CONF.key_manager.api_class
except cfg.NoSuchOptError:
LOG.warning(_LW("key_manager.api_class is not set, will use deprecated "
"option keymgr.api_class if set"))
try:
api_class = CONF.keymgr.api_class
except cfg.NoSuchOptError:
LOG.warning(_LW("keymgr.api_class is not set"))
deprecated_barbican = 'cinder.keymgr.barbican.BarbicanKeyManager'
barbican = 'castellan.key_manager.barbican_key_manager.BarbicanKeyManager'
deprecated_mock = 'cinder.tests.unit.keymgr.mock_key_mgr.MockKeyManager'
castellan_mock = ('castellan.tests.unit.key_manager.mock_key_manager.'
'MockKeyManager')
def API():
cls = importutils.import_class(CONF.keymgr.api_class)
return cls()
def log_deprecated_warning(deprecated, castellan):
versionutils.deprecation_warning(deprecated, versionutils.NEWTON,
in_favor_of=castellan, logger=LOG)
if api_class == deprecated_barbican:
log_deprecated_warning(deprecated_barbican, barbican)
api_class = barbican
elif api_class == deprecated_mock:
log_deprecated_warning(deprecated_mock, castellan_mock)
api_class = castellan_mock
elif api_class is None:
# TODO(kfarr): key_manager.api_class should be set in DevStack, and this
# block can be removed
LOG.warning(_LW("key manager not set, using insecure default %s"),
castellan_mock)
api_class = castellan_mock
CONF.set_override('api_class', api_class, 'key_manager')
def API(conf=CONF):
cls = importutils.import_class(conf.key_manager.api_class)
return cls(conf)

View File

@ -1,338 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Key manager implementation for Barbican
"""
import array
import base64
import binascii
import re
from barbicanclient import client as barbican_client
from keystoneclient.auth import identity
from keystoneclient import session
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from cinder import exception
from cinder.i18n import _, _LE
from cinder.keymgr import key as keymgr_key
from cinder.keymgr import key_mgr
CONF = cfg.CONF
CONF.import_opt('encryption_auth_url', 'cinder.keymgr.key_mgr', group='keymgr')
CONF.import_opt('encryption_api_url', 'cinder.keymgr.key_mgr', group='keymgr')
LOG = logging.getLogger(__name__)
URL_PATTERN = re.compile(
"(?P<url_base>http[s]?://[^/]*)[/]?(?P<url_version>(v[0-9.]+)?).*")
class BarbicanKeyManager(key_mgr.KeyManager):
"""Key Manager Interface that wraps the Barbican client API."""
def __init__(self):
self._base_url = CONF.keymgr.encryption_api_url
self._parse_barbican_api_url()
self._barbican_client = None
self._current_context = None
def _parse_barbican_api_url(self):
"""Setup member variables to reference the Barbican URL.
The key manipulation functions in this module need to use the
barbican URL with the version appended. But the barbicanclient
Client() class needs the URL without the version appended.
So set up a member variables here for each case.
"""
m = URL_PATTERN.search(self._base_url)
if m is None:
raise exception.KeyManagerError(_(
"Invalid url: must be in the form "
"'http[s]://<ipaddr>|<fqdn>[:port]/<version>', "
"url specified is: %s"), self._base_url)
url_info = dict(m.groupdict())
if 'url_version' not in url_info or url_info['url_version'] == "":
raise exception.KeyManagerError(_(
"Invalid barbican api url: version is required, "
"e.g. 'http[s]://<ipaddr>|<fqdn>[:port]/<version>' "
"url specified is: %s") % self._base_url)
# We will also need the barbican API URL without the '/v1'.
# So save that now.
self._barbican_endpoint = url_info['url_base']
def _get_barbican_client(self, ctxt):
"""Creates a client to connect to the Barbican service.
:param ctxt: the user context for authentication
:return: a Barbican Client object
:throws NotAuthorized: if the ctxt is None
:throws KeyManagerError: if ctxt is missing project_id
or project_id is None
"""
# Confirm context is provided, if not raise not authorized
if not ctxt:
msg = _("User is not authorized to use key manager.")
LOG.error(msg)
raise exception.NotAuthorized(msg)
if not hasattr(ctxt, 'project_id') or ctxt.project_id is None:
msg = _("Unable to create Barbican Client without project_id.")
LOG.error(msg)
raise exception.KeyManagerError(msg)
# If same context, return cached barbican client
if self._barbican_client and self._current_context == ctxt:
return self._barbican_client
try:
auth = identity.v3.Token(
auth_url=CONF.keymgr.encryption_auth_url,
token=ctxt.auth_token,
project_id=ctxt.project_id)
sess = session.Session(auth=auth)
self._barbican_client = barbican_client.Client(
session=sess,
endpoint=self._barbican_endpoint)
self._current_context = ctxt
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating Barbican client."))
return self._barbican_client
def create_key(self, ctxt, expiration=None, name='Cinder Volume Key',
payload_content_type='application/octet-stream', mode='CBC',
algorithm='AES', length=256):
"""Creates a key.
:param ctxt: contains information of the user and the environment
for the request (cinder/context.py)
:param expiration: the date the key will expire
:param name: a friendly name for the secret
:param payload_content_type: the format/type of the secret data
:param mode: the algorithm mode (e.g. CBC or CTR mode)
:param algorithm: the algorithm associated with the secret
:param length: the bit length of the secret
:return: the UUID of the new key
:throws Exception: if key creation fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
key_order = barbican_client.orders.create_key(
name,
algorithm,
length,
mode,
payload_content_type,
expiration)
order_ref = key_order.submit()
order = barbican_client.orders.get(order_ref)
secret_uuid = order.secret_ref.rpartition('/')[2]
return secret_uuid
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error creating key."))
def store_key(self, ctxt, key, expiration=None, name='Cinder Volume Key',
payload_content_type='application/octet-stream',
payload_content_encoding='base64', algorithm='AES',
bit_length=256, mode='CBC', from_copy=False):
"""Stores (i.e., registers) a key with the key manager.
:param ctxt: contains information of the user and the environment for
the request (cinder/context.py)
:param key: the unencrypted secret data. Known as "payload" to the
barbicanclient api
:param expiration: the expiration time of the secret in ISO 8601
format
:param name: a friendly name for the key
:param payload_content_type: the format/type of the secret data
:param payload_content_encoding: the encoding of the secret data
:param algorithm: the algorithm associated with this secret key
:param bit_length: the bit length of this secret key
:param mode: the algorithm mode used with this secret key
:param from_copy: establishes whether the function is being used
to copy a key. In case of the latter, it does not
try to decode the key
:returns: the UUID of the stored key
:throws Exception: if key storage fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
if key.get_algorithm():
algorithm = key.get_algorithm()
if payload_content_type == 'text/plain':
payload_content_encoding = None
encoded_key = key.get_encoded()
elif (payload_content_type == 'application/octet-stream' and
not from_copy):
key_list = key.get_encoded()
string_key = ''.join(map(lambda byte: "%02x" % byte, key_list))
encoded_key = base64.b64encode(binascii.unhexlify(string_key))
else:
encoded_key = key.get_encoded()
secret = barbican_client.secrets.create(name,
encoded_key,
payload_content_type,
payload_content_encoding,
algorithm,
bit_length,
None,
mode,
expiration)
secret_ref = secret.store()
secret_uuid = secret_ref.rpartition('/')[2]
return secret_uuid
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error storing key."))
def copy_key(self, ctxt, key_id):
"""Copies (i.e., clones) a key stored by barbican.
:param ctxt: contains information of the user and the environment for
the request (cinder/context.py)
:param key_id: the UUID of the key to copy
:return: the UUID of the key copy
:throws Exception: if key copying fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
secret_ref = self._create_secret_ref(key_id, barbican_client)
secret = self._get_secret(ctxt, secret_ref)
con_type = secret.content_types['default']
secret_data = self._get_secret_data(secret,
payload_content_type=con_type)
key = keymgr_key.SymmetricKey(secret.algorithm, secret_data)
copy_uuid = self.store_key(ctxt, key, secret.expiration,
secret.name, con_type,
'base64',
secret.algorithm, secret.bit_length,
secret.mode, True)
return copy_uuid
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error copying key."))
def _create_secret_ref(self, key_id, barbican_client):
"""Creates the URL required for accessing a secret.
:param key_id: the UUID of the key to copy
:param barbican_client: barbican key manager object
:return: the URL of the requested secret
"""
if not key_id:
msg = "Key ID is None"
raise exception.KeyManagerError(msg)
return self._base_url + "/secrets/" + key_id
def _get_secret_data(self,
secret,
payload_content_type='application/octet-stream'):
"""Retrieves the secret data given a secret_ref and content_type.
:param ctxt: contains information of the user and the environment for
the request (cinder/context.py)
:param secret_ref: URL to access the secret
:param payload_content_type: the format/type of the secret data
:returns: the secret data
:throws Exception: if data cannot be retrieved
"""
try:
generated_data = secret.payload
if payload_content_type == 'application/octet-stream':
secret_data = base64.b64encode(generated_data)
else:
secret_data = generated_data
return secret_data
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error getting secret data."))
def _get_secret(self, ctxt, secret_ref):
"""Creates the URL required for accessing a secret's metadata.
:param ctxt: contains information of the user and the environment for
the request (cinder/context.py)
:param secret_ref: URL to access the secret
:return: the secret's metadata
:throws Exception: if there is an error retrieving the data
"""
barbican_client = self._get_barbican_client(ctxt)
try:
return barbican_client.secrets.get(secret_ref)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error getting secret metadata."))
def get_key(self, ctxt, key_id,
payload_content_type='application/octet-stream'):
"""Retrieves the specified key.
:param ctxt: contains information of the user and the environment for
the request (cinder/context.py)
:param key_id: the UUID of the key to retrieve
:param payload_content_type: The format/type of the secret data
:return: SymmetricKey representation of the key
:throws Exception: if key retrieval fails
"""
try:
secret_ref = self._create_secret_ref(key_id, barbican_client)
secret = self._get_secret(ctxt, secret_ref)
secret_data = self._get_secret_data(secret,
payload_content_type)
if payload_content_type == 'application/octet-stream':
# convert decoded string to list of unsigned ints for each byte
key_data = array.array('B',
base64.b64decode(secret_data)).tolist()
else:
key_data = secret_data
key = keymgr_key.SymmetricKey(secret.algorithm, key_data)
return key
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error getting key."))
def delete_key(self, ctxt, key_id):
"""Deletes the specified key.
:param ctxt: contains information of the user and the environment for
the request (cinder/context.py)
:param key_id: the UUID of the key to delete
:throws Exception: if key deletion fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
secret_ref = self._create_secret_ref(key_id, barbican_client)
barbican_client.secrets.delete(secret_ref)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error deleting key."))

View File

@ -31,31 +31,30 @@ encrypted with a key provided by this key manager actually share the same
encryption key so *any* volume can be decrypted once the fixed key is known.
"""
import array
import binascii
from castellan.common.objects import symmetric_key
from castellan.key_manager import key_manager
from oslo_config import cfg
from oslo_log import log as logging
from cinder import exception
from cinder.i18n import _, _LW
from cinder.keymgr import key
from cinder.keymgr import key_mgr
key_mgr_opts = [
cfg.StrOpt('fixed_key',
help='Fixed key returned by key manager, specified in hex'),
help='Fixed key returned by key manager, specified in hex',
deprecated_group='keymgr'),
]
CONF = cfg.CONF
CONF.register_opts(key_mgr_opts, group='keymgr')
CONF.register_opts(key_mgr_opts, group='key_manager')
LOG = logging.getLogger(__name__)
class ConfKeyManager(key_mgr.KeyManager):
class ConfKeyManager(key_manager.KeyManager):
"""Key Manager that supports one key defined by the fixed_key conf option.
This key manager implementation supports all the methods specified by the
@ -64,73 +63,79 @@ class ConfKeyManager(key_mgr.KeyManager):
for each method are handled as specified by the key manager interface.
"""
def __init__(self):
super(ConfKeyManager, self).__init__()
def __init__(self, configuration):
LOG.warning(_LW('This key manager is insecure and is not recommended '
'for production deployments'))
super(ConfKeyManager, self).__init__(configuration)
self.conf = configuration
self.conf.register_opts(key_mgr_opts, group='key_manager')
self.key_id = '00000000-0000-0000-0000-000000000000'
def _generate_key(self, **kwargs):
_hex = self._generate_hex_key(**kwargs)
key_list = array.array('B', binascii.unhexlify(_hex)).tolist()
return key.SymmetricKey('AES', key_list)
def _get_key(self):
if self.conf.key_manager.fixed_key is None:
raise ValueError(_('config option key_manager.fixed_key is not '
'defined'))
hex_key = self.conf.key_manager.fixed_key
key_bytes = bytes(binascii.unhexlify(hex_key))
return symmetric_key.SymmetricKey('AES',
len(key_bytes) * 8,
key_bytes)
def _generate_hex_key(self, **kwargs):
if CONF.keymgr.fixed_key is None:
LOG.warning(
_LW('config option keymgr.fixed_key has not been defined:'
' some operations may fail unexpectedly'))
raise ValueError(_('keymgr.fixed_key not defined'))
return CONF.keymgr.fixed_key
def create_key(self, context, **kwargs):
"""Creates a symmetric key.
def create_key(self, ctxt, **kwargs):
"""Creates a key.
This implementation returns a UUID for the created key. A
NotAuthorized exception is raised if the specified context is None.
This implementation returns a UUID for the key read from the
configuration file. A NotAuthorized exception is raised if the
specified context is None.
"""
if ctxt is None:
if context is None:
raise exception.NotAuthorized()
return self.key_id
def store_key(self, ctxt, key, **kwargs):
def create_key_pair(self, context, **kwargs):
raise NotImplementedError(
"ConfKeyManager does not support asymmetric keys")
def store(self, context, managed_object, **kwargs):
"""Stores (i.e., registers) a key with the key manager."""
if ctxt is None:
if context is None:
raise exception.NotAuthorized()
if key != self._generate_key():
if managed_object != self._get_key():
raise exception.KeyManagerError(
reason="cannot store arbitrary keys")
return self.key_id
def copy_key(self, ctxt, key_id, **kwargs):
if ctxt is None:
raise exception.NotAuthorized()
return self.key_id
def get_key(self, ctxt, key_id, **kwargs):
def get(self, context, managed_object_id):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
specified UUID. A NotAuthorized exception is raised if the specified
context is None; a KeyError is raised if the UUID is invalid.
"""
if ctxt is None:
if context is None:
raise exception.NotAuthorized()
if key_id != self.key_id:
raise KeyError(key_id)
if managed_object_id != self.key_id:
raise KeyError(str(managed_object_id) + " != " + str(self.key_id))
return self._generate_key()
return self._get_key()
def delete_key(self, ctxt, key_id, **kwargs):
if ctxt is None:
def delete(self, context, managed_object_id):
"""Represents deleting the key.
Because the ConfKeyManager has only one key, which is read from the
configuration file, the key is not actually deleted when this is
called.
"""
if context is None:
raise exception.NotAuthorized()
if key_id != self.key_id:
if managed_object_id != self.key_id:
raise exception.KeyManagerError(
reason="cannot delete non-existent key")
LOG.warning(_LW("Not deleting key %s"), key_id)
LOG.warning(_LW("Not deleting key %s"), managed_object_id)

View File

@ -1,90 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base Key and SymmetricKey Classes
This module defines the Key and SymmetricKey classes. The Key class is the base
class to represent all encryption keys. The basis for this class was copied
from Java.
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Key(object):
"""Base class to represent all keys."""
@abc.abstractmethod
def get_algorithm(self):
"""Returns the key's algorithm.
Returns the key's algorithm. For example, "DSA" indicates that this key
is a DSA key and "AES" indicates that this key is an AES key.
"""
pass
@abc.abstractmethod
def get_format(self):
"""Returns the encoding format.
Returns the key's encoding format or None if this key is not encoded.
"""
pass
@abc.abstractmethod
def get_encoded(self):
"""Returns the key in the format specified by its encoding."""
pass
class SymmetricKey(Key):
"""This class represents symmetric keys."""
def __init__(self, alg, key):
"""Create a new SymmetricKey object.
The arguments specify the algorithm for the symmetric encryption and
the bytes for the key.
"""
self.alg = alg
self.key = key
def get_algorithm(self):
"""Returns the algorithm for symmetric encryption."""
return self.alg
def get_format(self):
"""This method returns 'RAW'."""
return "RAW"
def get_encoded(self):
"""Returns the key in its encoded format."""
return self.key
def __eq__(self, other):
if isinstance(other, SymmetricKey):
return (self.alg == other.alg and
self.key == other.key)
return NotImplemented
def __ne__(self, other):
result = self.__eq__(other)
if result is NotImplemented:
return result
return not result

View File

@ -1,117 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Key manager API
"""
import abc
from oslo_config import cfg
import six
encryption_opts = [
cfg.StrOpt('encryption_auth_url',
default='http://localhost:5000/v3',
help='Authentication url for encryption service.'),
cfg.StrOpt('encryption_api_url',
default='http://localhost:9311/v1',
help='Url for encryption service.'),
]
CONF = cfg.CONF
CONF.register_opts(encryption_opts, group='keymgr')
@six.add_metaclass(abc.ABCMeta)
class KeyManager(object):
"""Base Key Manager Interface
A Key Manager is responsible for managing encryption keys for volumes. A
Key Manager is responsible for creating, reading, and deleting keys.
"""
@abc.abstractmethod
def create_key(self, ctxt, algorithm='AES', length=256, expiration=None,
**kwargs):
"""Creates a key.
This method creates a key and returns the key's UUID. If the specified
context does not permit the creation of keys, then a NotAuthorized
exception should be raised.
"""
pass
@abc.abstractmethod
def store_key(self, ctxt, key, expiration=None, **kwargs):
"""Stores (i.e., registers) a key with the key manager.
This method stores the specified key and returns its UUID that
identifies it within the key manager. If the specified context does
not permit the creation of keys, then a NotAuthorized exception should
be raised.
"""
pass
@abc.abstractmethod
def copy_key(self, ctxt, key_id, **kwargs):
"""Copies (i.e., clones) a key stored by the key manager.
This method copies the specified key and returns the copy's UUID. If
the specified context does not permit copying keys, then a
NotAuthorized error should be raised.
Implementation note: This method should behave identically to
.. code-block:: python
store_key(context, get_key(context, <encryption key UUID>))
although it is preferable to perform this operation within the key
manager to avoid unnecessary handling of the key material.
"""
pass
@abc.abstractmethod
def get_key(self, ctxt, key_id, **kwargs):
"""Retrieves the specified key.
Implementations should verify that the caller has permissions to
retrieve the key by checking the context object passed in as ctxt. If
the user lacks permission then a NotAuthorized exception is raised.
If the specified key does not exist, then a KeyError should be raised.
Implementations should preclude users from discerning the UUIDs of
keys that belong to other users by repeatedly calling this method.
That is, keys that belong to other users should be considered "non-
existent" and completely invisible.
"""
pass
@abc.abstractmethod
def delete_key(self, ctxt, key_id, **kwargs):
"""Deletes the specified key.
Implementations should verify that the caller has permission to delete
the key by checking the context object (ctxt). A NotAuthorized
exception should be raised if the caller lacks permission.
If the specified key does not exist, then a KeyError should be raised.
Implementations should preclude users from discerning the UUIDs of
keys that belong to other users by repeatedly calling this method.
That is, keys that belong to other users should be considered "non-
existent" and completely invisible.
"""
pass

View File

@ -1,40 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Key manager implementation that raises NotImplementedError
"""
from cinder.keymgr import key_mgr
class NotImplementedKeyManager(key_mgr.KeyManager):
"""Key Manager interface that raises NotImplementedError"""
def create_key(self, ctxt, algorithm='AES', length=256, expiration=None,
**kwargs):
raise NotImplementedError()
def store_key(self, ctxt, key, expiration=None, **kwargs):
raise NotImplementedError()
def copy_key(self, ctxt, key_id, **kwargs):
raise NotImplementedError()
def get_key(self, ctxt, key_id, **kwargs):
raise NotImplementedError()
def delete_key(self, ctxt, key_id, **kwargs):
raise NotImplementedError()

View File

@ -41,9 +41,7 @@ from cinder.db import base as cinder_db_base
from cinder import exception as cinder_exception
from cinder.image import glance as cinder_image_glance
from cinder.image import image_utils as cinder_image_imageutils
import cinder.keymgr
from cinder.keymgr import conf_key_mgr as cinder_keymgr_confkeymgr
from cinder.keymgr import key_mgr as cinder_keymgr_keymgr
from cinder.message import api as cinder_message_api
from cinder import quota as cinder_quota
from cinder.scheduler import driver as cinder_scheduler_driver
@ -200,12 +198,6 @@ def list_opts():
cinder_zonemanager_fczonemanager.zone_manager_opts,
cinder_zonemanager_drivers_cisco_ciscofczonedriver.cisco_opts,
)),
('KEYMGR',
itertools.chain(
cinder_keymgr_keymgr.encryption_opts,
cinder.keymgr.keymgr_opts,
cinder_keymgr_confkeymgr.key_mgr_opts,
)),
('DEFAULT',
itertools.chain(
cinder_backup_driver.service_opts,
@ -380,6 +372,10 @@ def list_opts():
itertools.chain(
cinder_coordination.coordination_opts,
)),
('KEY_MANAGER',
itertools.chain(
cinder_keymgr_confkeymgr.key_mgr_opts,
)),
('BACKEND',
itertools.chain(
[cinder_cmd_volume.host_opt],

View File

@ -23,6 +23,7 @@ from cinder.backup import driver
from cinder import context
from cinder import db
from cinder import exception
from cinder import keymgr as key_manager
from cinder import objects
from cinder import test
from cinder.tests.unit.backup import fake_service
@ -286,8 +287,9 @@ class BackupMetadataAPITestCase(test.TestCase):
def _create_encrypted_volume_db_entry(self, id, type_id, encrypted):
if encrypted:
key_id = key_manager.API().key_id
vol = {'id': id, 'size': 1, 'status': 'available',
'volume_type_id': type_id, 'encryption_key_id': 'fake_id'}
'volume_type_id': type_id, 'encryption_key_id': key_id}
else:
vol = {'id': id, 'size': 1, 'status': 'available',
'volume_type_id': type_id, 'encryption_key_id': None}

View File

@ -24,7 +24,8 @@ CONF = cfg.CONF
CONF.import_opt('policy_file', 'cinder.policy', group='oslo_policy')
CONF.import_opt('volume_driver', 'cinder.volume.manager')
CONF.import_opt('backup_driver', 'cinder.backup.manager')
CONF.import_opt('fixed_key', 'cinder.keymgr.conf_key_mgr', group='keymgr')
CONF.import_opt('api_class', 'cinder.keymgr', group='key_manager')
CONF.import_opt('fixed_key', 'cinder.keymgr.conf_key_mgr', group='key_manager')
CONF.import_opt('scheduler_driver', 'cinder.scheduler.manager')
def_vol_type = 'fake_vol_type'
@ -41,7 +42,10 @@ def set_defaults(conf):
conf.set_default('policy_file', 'cinder.tests.unit/policy.json',
group='oslo_policy')
conf.set_default('backup_driver', 'cinder.tests.unit.backup.fake_service')
conf.set_default('fixed_key', default='0' * 64, group='keymgr')
conf.set_default('api_class',
'cinder.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
conf.set_default('fixed_key', default='0' * 64, group='key_manager')
conf.set_default('scheduler_driver',
'cinder.scheduler.filter_scheduler.FilterScheduler')
conf.set_default('state_path', os.path.abspath(

View File

@ -17,8 +17,8 @@
"""Implementation of a fake key manager."""
from cinder.tests.unit.keymgr import mock_key_mgr
from castellan.tests.unit.key_manager import mock_key_manager
def fake_api():
return mock_key_mgr.MockKeyManager()
def fake_api(configuration=None):
return mock_key_manager.MockKeyManager(configuration)

View File

@ -1,128 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A mock implementation of a key manager that stores keys in a dictionary.
This key manager implementation is primarily intended for testing. In
particular, it does not store keys persistently. Lack of a centralized key
store also makes this implementation unsuitable for use among different
services.
Note: Instantiating this class multiple times will create separate key stores.
Keys created in one instance will not be accessible from other instances of
this class.
"""
import array
import binascii
import uuid
from cinder import exception
from cinder.keymgr import key
from cinder.keymgr import key_mgr
from cinder.volume import utils
class MockKeyManager(key_mgr.KeyManager):
"""Mocking manager for integration tests.
This mock key manager implementation supports all the methods specified
by the key manager interface. This implementation stores keys within a
dictionary, and as a result, it is not acceptable for use across different
services. Side effects (e.g., raising exceptions) for each method are
handled as specified by the key manager interface.
This key manager is not suitable for use in production deployments.
"""
def __init__(self):
self.keys = {}
def _generate_hex_key(self, length):
if not length:
length = 256
# hex digit => 4 bits
hex_encoded = utils.generate_password(length=length // 4,
symbolgroups='0123456789ABCDEF')
return hex_encoded
def _generate_key(self, **kwargs):
_hex = self._generate_hex_key(kwargs.get('length'))
key_bytes = array.array('B', binascii.unhexlify(_hex)).tolist()
algorithm = kwargs.get('algorithm', 'AES')
return key.SymmetricKey(algorithm, key_bytes)
def create_key(self, ctxt, **kwargs):
"""Creates a key.
This implementation returns a UUID for the created key. A
NotAuthorized exception is raised if the specified context is None.
"""
if ctxt is None:
raise exception.NotAuthorized()
key = self._generate_key(**kwargs)
return self.store_key(ctxt, key)
def _generate_key_id(self):
key_id = str(uuid.uuid4())
while key_id in self.keys:
key_id = str(uuid.uuid4())
return key_id
def store_key(self, ctxt, key, **kwargs):
"""Stores (i.e., registers) a key with the key manager."""
if ctxt is None:
raise exception.NotAuthorized()
key_id = self._generate_key_id()
self.keys[key_id] = key
return key_id
def copy_key(self, ctxt, key_id, **kwargs):
if ctxt is None:
raise exception.NotAuthorized()
copied_key_id = self._generate_key_id()
self.keys[copied_key_id] = self.keys[key_id]
return copied_key_id
def get_key(self, ctxt, key_id, **kwargs):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
specified UUID. A NotAuthorized exception is raised if the specified
context is None; a KeyError is raised if the UUID is invalid.
"""
if ctxt is None:
raise exception.NotAuthorized()
return self.keys[key_id]
def delete_key(self, ctxt, key_id, **kwargs):
"""Deletes the key identified by the specified id.
A NotAuthorized exception is raised if the context is None and a
KeyError is raised if the UUID is invalid.
"""
if ctxt is None:
raise exception.NotAuthorized()
del self.keys[key_id]

View File

@ -1,292 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the barbican key manager.
"""
import array
import base64
import binascii
import mock
from oslo_config import cfg
from cinder import exception
from cinder.keymgr import barbican
from cinder.keymgr import key as keymgr_key
from cinder.tests.unit.keymgr import test_key_mgr
CONF = cfg.CONF
CONF.import_opt('encryption_auth_url', 'cinder.keymgr.key_mgr', group='keymgr')
CONF.import_opt('encryption_api_url', 'cinder.keymgr.key_mgr', group='keymgr')
class BarbicanKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
return barbican.BarbicanKeyManager()
def setUp(self):
super(BarbicanKeyManagerTestCase, self).setUp()
# Create fake auth_token
self.ctxt = mock.Mock()
self.ctxt.auth_token = "fake_token"
self.ctxt.project_id = "fake_project_id"
# Create mock barbican client
self._build_mock_barbican()
# Create a key_id, secret_ref, pre_hex, and hex to use
self.key_id = "d152fa13-2b41-42ca-a934-6c21566c0f40"
self.secret_ref = self.key_mgr._create_secret_ref(self.key_id,
self.mock_barbican)
self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY="
self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4"
"534ae16")
self.original_api_url = CONF.keymgr.encryption_api_url
self.addCleanup(self._restore)
def _restore(self):
if hasattr(self, 'original_key'):
keymgr_key.SymmetricKey = self.original_key
if hasattr(self, 'original_base64'):
base64.b64encode = self.original_base64
if hasattr(self, 'original_api_url'):
CONF.keymgr.encryption_api_url = self.original_api_url
def _build_mock_barbican(self):
self.mock_barbican = mock.MagicMock(name='mock_barbican')
# Set commonly used methods
self.get = self.mock_barbican.secrets.get
self.delete = self.mock_barbican.secrets.delete
self.store = self.mock_barbican.secrets.store
self.create = self.mock_barbican.secrets.create
self.key_mgr._barbican_client = self.mock_barbican
self.key_mgr._current_context = self.ctxt
def _build_mock_symKey(self):
self.mock_symKey = mock.Mock()
def fake_sym_key(alg, key):
self.mock_symKey.get_encoded.return_value = key
self.mock_symKey.get_algorithm.return_value = alg
return self.mock_symKey
self.original_key = keymgr_key.SymmetricKey
keymgr_key.SymmetricKey = fake_sym_key
def _build_mock_base64(self):
def fake_base64_b64encode(string):
return self.pre_hex
self.original_base64 = base64.b64encode
base64.b64encode = fake_base64_b64encode
def test_copy_key(self):
# Create metadata for original secret
original_secret_metadata = mock.Mock()
original_secret_metadata.algorithm = 'fake_algorithm'
original_secret_metadata.bit_length = 'fake_bit_length'
original_secret_metadata.name = 'original_name'
original_secret_metadata.expiration = 'fake_expiration'
original_secret_metadata.mode = 'fake_mode'
content_types = {'default': 'fake_type'}
original_secret_metadata.content_types = content_types
original_secret_data = mock.Mock()
original_secret_metadata.payload = original_secret_data
self.get.return_value = original_secret_metadata
# Create the mock key
self._build_mock_symKey()
# Copy the original
self.key_mgr.copy_key(self.ctxt, self.key_id)
# Assert proper methods were called
self.get.assert_called_once_with(self.secret_ref)
self.create.assert_called_once_with(
original_secret_metadata.name,
self.mock_symKey.get_encoded(),
content_types['default'],
'base64',
original_secret_metadata.algorithm,
original_secret_metadata.bit_length,
None,
original_secret_metadata.mode,
original_secret_metadata.expiration)
self.create.return_value.store.assert_called_once_with()
def test_copy_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.NotAuthorized,
self.key_mgr.copy_key, None, self.key_id)
def test_create_key(self):
# Create order_ref_url and assign return value
order_ref_url = ("http://localhost:9311/v1/None/orders/"
"4fe939b7-72bc-49aa-bd1e-e979589858af")
key_order = mock.Mock()
self.mock_barbican.orders.create_key.return_value = key_order
key_order.submit.return_value = order_ref_url
# Create order and assign return value
order = mock.Mock()
order.secret_ref = self.secret_ref
self.mock_barbican.orders.get.return_value = order
# Create the key, get the UUID
returned_uuid = self.key_mgr.create_key(self.ctxt)
self.mock_barbican.orders.get.assert_called_once_with(order_ref_url)
self.assertEqual(self.key_id, returned_uuid)
def test_create_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.NotAuthorized,
self.key_mgr.create_key, None)
def test_delete_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.NotAuthorized,
self.key_mgr.delete_key, None, self.key_id)
def test_delete_key(self):
self.key_mgr.delete_key(self.ctxt, self.key_id)
self.delete.assert_called_once_with(self.secret_ref)
def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete_key, self.ctxt, None)
def test_get_key(self):
self._build_mock_base64()
content_type = 'application/octet-stream'
key = self.key_mgr.get_key(self.ctxt, self.key_id, content_type)
self.get.assert_called_once_with(self.secret_ref)
encoded = array.array('B', binascii.unhexlify(self.hex)).tolist()
self.assertEqual(encoded, key.get_encoded())
def test_get_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.NotAuthorized,
self.key_mgr.get_key, None, self.key_id)
def test_get_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get_key, self.ctxt, None)
def test_store_key_base64(self):
# Create Key to store
secret_key = array.array('B', [0x01, 0x02, 0xA0, 0xB3]).tolist()
_key = keymgr_key.SymmetricKey('AES', secret_key)
# Define the return values
secret = mock.Mock()
self.create.return_value = secret
secret.store.return_value = self.secret_ref
# Store the Key
returned_uuid = self.key_mgr.store_key(self.ctxt, _key, bit_length=32)
self.create.assert_called_once_with('Cinder Volume Key',
b'AQKgsw==',
'application/octet-stream',
'base64',
'AES', 32, None, 'CBC',
None)
self.assertEqual(self.key_id, returned_uuid)
def test_store_key_plaintext(self):
# Create the plaintext key
secret_key_text = "This is a test text key."
_key = keymgr_key.SymmetricKey('AES', secret_key_text)
# Store the Key
self.key_mgr.store_key(self.ctxt, _key,
payload_content_type='text/plain',
payload_content_encoding=None)
self.create.assert_called_once_with('Cinder Volume Key',
secret_key_text,
'text/plain',
None,
'AES', 256, None, 'CBC',
None)
self.create.return_value.store.assert_called_once_with()
def test_store_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.NotAuthorized,
self.key_mgr.store_key, None, None)
def test_null_project_id(self):
self.key_mgr._barbican_client = None
self.ctxt.project_id = None
self.assertRaises(exception.KeyManagerError,
self.key_mgr.create_key, self.ctxt)
def test_ctxt_without_project_id(self):
self.key_mgr._barbican_client = None
del self.ctxt.project_id
self.assertRaises(exception.KeyManagerError,
self.key_mgr.create_key, self.ctxt)
@mock.patch('cinder.keymgr.barbican.identity.v3.Token')
@mock.patch('cinder.keymgr.barbican.session.Session')
@mock.patch('cinder.keymgr.barbican.barbican_client.Client')
def test_ctxt_with_project_id(self, mock_client, mock_session,
mock_token):
# set client to None so that client creation will occur
self.key_mgr._barbican_client = None
# mock the return values
mock_auth = mock.Mock()
mock_token.return_value = mock_auth
mock_sess = mock.Mock()
mock_session.return_value = mock_sess
# mock the endpoint
mock_endpoint = mock.Mock()
self.key_mgr._barbican_endpoint = mock_endpoint
self.key_mgr.create_key(self.ctxt)
# assert proper calls occurred, including with project_id
mock_token.assert_called_once_with(
auth_url=CONF.keymgr.encryption_auth_url,
token=self.ctxt.auth_token,
project_id=self.ctxt.project_id)
mock_session.assert_called_once_with(auth=mock_auth)
mock_client.assert_called_once_with(session=mock_sess,
endpoint=mock_endpoint)
def test_parse_barbican_api_url(self):
# assert that the correct format is handled correctly
CONF.keymgr.encryption_api_url = "http://host:port/v1/"
dummy = barbican.BarbicanKeyManager()
self.assertEqual(dummy._barbican_endpoint, "http://host:port")
# assert that invalid api url formats will raise an exception
CONF.keymgr.encryption_api_url = "http://host:port/"
self.assertRaises(exception.KeyManagerError,
barbican.BarbicanKeyManager)
CONF.keymgr.encryption_api_url = "http://host:port/secrets"
self.assertRaises(exception.KeyManagerError,
barbican.BarbicanKeyManager)

View File

@ -17,40 +17,40 @@
Test cases for the conf key manager.
"""
import array
import binascii
from castellan.common.objects import symmetric_key as key
from oslo_config import cfg
from cinder import context
from cinder import exception
from cinder.keymgr import conf_key_mgr
from cinder.keymgr import key
from cinder.tests.unit.keymgr import test_key_mgr
from cinder import test
CONF = cfg.CONF
CONF.import_opt('fixed_key', 'cinder.keymgr.conf_key_mgr', group='keymgr')
CONF.import_opt('fixed_key', 'cinder.keymgr.conf_key_mgr', group='key_manager')
class ConfKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
class ConfKeyManagerTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
super(ConfKeyManagerTestCase, self).__init__(*args, **kwargs)
self._hex_key = '1' * 64
def _create_key_manager(self):
CONF.set_default('fixed_key', default=self._hex_key, group='keymgr')
return conf_key_mgr.ConfKeyManager()
CONF.set_default('fixed_key', default=self._hex_key,
group='key_manager')
return conf_key_mgr.ConfKeyManager(CONF)
def setUp(self):
super(ConfKeyManagerTestCase, self).setUp()
self.key_mgr = self._create_key_manager()
self.ctxt = context.RequestContext('fake', 'fake')
self.key_id = '00000000-0000-0000-0000-000000000000'
encoded = array.array('B', binascii.unhexlify(self._hex_key)).tolist()
self.key = key.SymmetricKey('AES', encoded)
encoded = bytes(binascii.unhexlify(self._hex_key))
self.key = key.SymmetricKey('AES', len(encoded) * 8, encoded)
def test___init__(self):
self.assertEqual(self.key_id, self.key_mgr.key_id)
@ -65,60 +65,54 @@ class ConfKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.create_key, None)
def test_store_key(self):
key_id = self.key_mgr.store_key(self.ctxt, self.key)
def test_create_key_pair(self):
self.assertRaises(NotImplementedError,
self.key_mgr.create_key_pair, self.ctxt)
actual_key = self.key_mgr.get_key(self.ctxt, key_id)
def test_create_key_pair_null_context(self):
self.assertRaises(NotImplementedError,
self.key_mgr.create_key_pair, None)
def test_store_key(self):
key_id = self.key_mgr.store(self.ctxt, self.key)
actual_key = self.key_mgr.get(self.ctxt, key_id)
self.assertEqual(self.key, actual_key)
def test_store_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.store_key, None, self.key)
self.key_mgr.store, None, self.key)
def test_store_key_invalid(self):
encoded = self.key.get_encoded()
inverse_key = key.SymmetricKey('AES', [~b for b in encoded])
encoded = bytes(binascii.unhexlify('0' * 64))
inverse_key = key.SymmetricKey('AES', len(encoded) * 8, encoded)
self.assertRaises(exception.KeyManagerError,
self.key_mgr.store_key, self.ctxt, inverse_key)
def test_copy_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
key = self.key_mgr.get_key(self.ctxt, key_id)
copied_key_id = self.key_mgr.copy_key(self.ctxt, key_id)
copied_key = self.key_mgr.get_key(self.ctxt, copied_key_id)
self.assertEqual(key_id, copied_key_id)
self.assertEqual(key, copied_key)
def test_copy_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.copy_key, None, None)
self.key_mgr.store, self.ctxt, inverse_key)
def test_delete_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
self.key_mgr.delete_key(self.ctxt, key_id)
self.key_mgr.delete(self.ctxt, key_id)
# cannot delete key -- might have lingering references
self.assertEqual(self.key,
self.key_mgr.get_key(self.ctxt, self.key_id))
self.key_mgr.get(self.ctxt, self.key_id))
def test_delete_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.delete_key, None, None)
self.key_mgr.delete, None, None)
def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete_key, self.ctxt, None)
self.key_mgr.delete, self.ctxt, None)
def test_get_key(self):
self.assertEqual(self.key,
self.key_mgr.get_key(self.ctxt, self.key_id))
self.key_mgr.get(self.ctxt, self.key_id))
def test_get_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.get_key, None, None)
self.key_mgr.get, None, None)
def test_get_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.get_key, self.ctxt, None)
self.assertRaises(KeyError, self.key_mgr.get, self.ctxt, None)

View File

@ -1,65 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the key classes.
"""
from cinder.keymgr import key
from cinder import test
class KeyTestCase(test.TestCase):
def _create_key(self):
raise NotImplementedError()
def setUp(self):
super(KeyTestCase, self).setUp()
self.key = self._create_key()
class SymmetricKeyTestCase(KeyTestCase):
def _create_key(self):
return key.SymmetricKey(self.algorithm, self.encoded)
def setUp(self):
self.algorithm = 'AES'
self.encoded = [0] * 32
super(SymmetricKeyTestCase, self).setUp()
def test_get_algorithm(self):
self.assertEqual(self.algorithm, self.key.get_algorithm())
def test_get_format(self):
self.assertEqual('RAW', self.key.get_format())
def test_get_encoded(self):
self.assertEqual(self.encoded, self.key.get_encoded())
def test___eq__(self):
self.assertTrue(self.key == self.key)
self.assertFalse(self.key is None)
self.assertFalse(None == self.key)
def test___ne__(self):
self.assertFalse(self.key != self.key)
self.assertTrue(self.key is not None)
self.assertTrue(None != self.key)

View File

@ -1,33 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the key manager.
"""
from cinder import test
class KeyManagerTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
super(KeyManagerTestCase, self).__init__(*args, **kwargs)
def _create_key_manager(self):
raise NotImplementedError()
def setUp(self):
super(KeyManagerTestCase, self).setUp()
self.key_mgr = self._create_key_manager()

View File

@ -1,102 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the mock key manager.
"""
import array
from cinder import context
from cinder import exception
from cinder.keymgr import key as keymgr_key
from cinder.tests.unit.keymgr import mock_key_mgr
from cinder.tests.unit.keymgr import test_key_mgr
class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
return mock_key_mgr.MockKeyManager()
def setUp(self):
super(MockKeyManagerTestCase, self).setUp()
self.ctxt = context.RequestContext('fake', 'fake')
def test_create_key(self):
key_id_1 = self.key_mgr.create_key(self.ctxt)
key_id_2 = self.key_mgr.create_key(self.ctxt)
# ensure that the UUIDs are unique
self.assertNotEqual(key_id_1, key_id_2)
def test_create_key_with_length(self):
for length in [64, 128, 256]:
key_id = self.key_mgr.create_key(self.ctxt, length=length)
key = self.key_mgr.get_key(self.ctxt, key_id)
self.assertEqual(length // 8, len(key.get_encoded()))
def test_create_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.create_key, None)
def test_store_key(self):
secret_key = array.array('B', b'\x00' * 32).tolist()
_key = keymgr_key.SymmetricKey('AES', secret_key)
key_id = self.key_mgr.store_key(self.ctxt, _key)
actual_key = self.key_mgr.get_key(self.ctxt, key_id)
self.assertEqual(_key, actual_key)
def test_store_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.store_key, None, None)
def test_copy_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
key = self.key_mgr.get_key(self.ctxt, key_id)
copied_key_id = self.key_mgr.copy_key(self.ctxt, key_id)
copied_key = self.key_mgr.get_key(self.ctxt, copied_key_id)
self.assertNotEqual(key_id, copied_key_id)
self.assertEqual(key, copied_key)
def test_copy_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.copy_key, None, None)
def test_get_key(self):
pass
def test_get_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.get_key, None, None)
def test_get_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.get_key, self.ctxt, None)
def test_delete_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
self.key_mgr.delete_key(self.ctxt, key_id)
self.assertRaises(KeyError, self.key_mgr.get_key, self.ctxt, key_id)
def test_delete_null_context(self):
self.assertRaises(exception.NotAuthorized,
self.key_mgr.delete_key, None, None)
def test_delete_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.delete_key, self.ctxt, None)

View File

@ -1,50 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the not implemented key manager.
"""
from cinder.keymgr import not_implemented_key_mgr
from cinder.tests.unit.keymgr import test_key_mgr
class NotImplementedKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
return not_implemented_key_mgr.NotImplementedKeyManager()
def setUp(self):
super(NotImplementedKeyManagerTestCase, self).setUp()
def test_create_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.create_key, None)
def test_store_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.store_key, None, None)
def test_copy_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.copy_key, None, None)
def test_get_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.get_key, None, None)
def test_delete_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.delete_key, None, None)

View File

@ -47,7 +47,7 @@ from cinder import coordination
from cinder import db
from cinder import exception
from cinder.image import image_utils
from cinder import keymgr
from cinder import keymgr as key_manager
from cinder.message import defined_messages
from cinder.message import resource_types
from cinder import objects
@ -1046,7 +1046,7 @@ class VolumeTestCase(BaseVolumeTestCase):
volume_type=db_vol_type)
self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
@mock.patch.object(keymgr, 'API', fake_keymgr.fake_api)
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type_aes(self):
ctxt = context.get_admin_context()
@ -1076,9 +1076,9 @@ class VolumeTestCase(BaseVolumeTestCase):
volume_type=db_vol_type)
key_manager = volume_api.key_manager
key = key_manager.get_key(self.context, volume['encryption_key_id'])
self.assertEqual(key_size, len(key.key) * 8)
self.assertEqual('aes', key.alg)
key = key_manager.get(self.context, volume['encryption_key_id'])
self.assertEqual(key_size, len(key.get_encoded()) * 8)
self.assertEqual('aes', key.algorithm)
metadata = db.volume_encryption_metadata_get(self.context, volume.id)
self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
@ -1086,7 +1086,7 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertEqual(key_size, metadata.get('key_size'))
self.assertIsNotNone(volume['encryption_key_id'])
@mock.patch.object(keymgr, 'API', fake_keymgr.fake_api)
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type_blowfish(self):
ctxt = context.get_admin_context()
@ -1116,9 +1116,8 @@ class VolumeTestCase(BaseVolumeTestCase):
volume_type=db_vol_type)
key_manager = volume_api.key_manager
key = key_manager.get_key(self.context, volume['encryption_key_id'])
self.assertEqual(key_size, len(key.key) * 8)
self.assertEqual('blowfish', key.alg)
key = key_manager.get(self.context, volume['encryption_key_id'])
self.assertEqual('blowfish', key.algorithm)
metadata = db.volume_encryption_metadata_get(self.context, volume.id)
self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])
@ -1136,13 +1135,16 @@ class VolumeTestCase(BaseVolumeTestCase):
self.volume.create_volume(self.context, volume['id'])
self.assertEqual(fake.PROVIDER_ID, volume['provider_id'])
@mock.patch.object(keymgr, 'API', new=fake_keymgr.fake_api)
@mock.patch.object(key_manager, 'API', new=fake_keymgr.fake_api)
def test_create_delete_volume_with_encrypted_volume_type(self):
cipher = 'aes-xts-plain64'
key_size = 256
db.volume_type_create(self.context,
{'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
db.volume_type_encryption_create(
self.context, fake.VOLUME_TYPE_ID,
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER})
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER,
'cipher': cipher, 'key_size': key_size})
db_vol_type = db.volume_type_get_by_name(self.context, 'LUKS')
@ -2013,10 +2015,12 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertRaises(exception.VolumeNotFound, db.volume_get,
self.context, src_vol_id)
@mock.patch.object(keymgr, 'API', fake_keymgr.fake_api)
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_from_snapshot_with_encryption(self):
"""Test volume can be created from a snapshot of an encrypted volume"""
ctxt = context.get_admin_context()
cipher = 'aes-xts-plain64'
key_size = 256
db.volume_type_create(ctxt,
{'id': '61298380-0c12-11e3-bfd6-4b48424183be',
@ -2024,7 +2028,8 @@ class VolumeTestCase(BaseVolumeTestCase):
db.volume_type_encryption_create(
ctxt,
'61298380-0c12-11e3-bfd6-4b48424183be',
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER})
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER,
'cipher': cipher, 'key_size': key_size})
volume_api = cinder.volume.api.API()
@ -2061,15 +2066,17 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertIsNotNone(volume_dst['encryption_key_id'])
key_manager = volume_api.key_manager # must use *same* key manager
volume_src_key = key_manager.get_key(self.context,
volume_src['encryption_key_id'])
volume_dst_key = key_manager.get_key(self.context,
volume_dst['encryption_key_id'])
volume_src_key = key_manager.get(self.context,
volume_src['encryption_key_id'])
volume_dst_key = key_manager.get(self.context,
volume_dst['encryption_key_id'])
self.assertEqual(volume_src_key, volume_dst_key)
def test_create_volume_from_encrypted_volume(self):
"""Test volume can be created from an encrypted volume."""
self.stubs.Set(keymgr, 'API', fake_keymgr.fake_api)
self.stubs.Set(key_manager, 'API', fake_keymgr.fake_api)
cipher = 'aes-xts-plain64'
key_size = 256
volume_api = cinder.volume.api.API()
@ -2081,7 +2088,8 @@ class VolumeTestCase(BaseVolumeTestCase):
db.volume_type_encryption_create(
ctxt,
'61298380-0c12-11e3-bfd6-4b48424183be',
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER})
{'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER,
'cipher': cipher, 'key_size': key_size})
db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),
'LUKS')
@ -2107,11 +2115,11 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertIsNotNone(volume_src['encryption_key_id'])
self.assertIsNotNone(volume_dst['encryption_key_id'])
key_manager = volume_api.key_manager # must use *same* key manager
volume_src_key = key_manager.get_key(self.context,
volume_src['encryption_key_id'])
volume_dst_key = key_manager.get_key(self.context,
volume_dst['encryption_key_id'])
km = volume_api.key_manager # must use *same* key manager
volume_src_key = km.get(self.context,
volume_src['encryption_key_id'])
volume_dst_key = km.get(self.context,
volume_dst['encryption_key_id'])
self.assertEqual(volume_src_key, volume_dst_key)
def test_delete_encrypted_volume(self):
@ -2121,7 +2129,7 @@ class VolumeTestCase(BaseVolumeTestCase):
vol_api = cinder.volume.api.API()
with mock.patch.object(
vol_api.key_manager,
'delete_key',
'delete',
side_effect=Exception):
self.assertRaises(exception.InvalidVolume,
vol_api.delete,
@ -6409,7 +6417,7 @@ class ImageVolumeCacheTestCase(BaseVolumeTestCase):
volume = tests_utils.create_volume(self.context, **volume_params)
with mock.patch.object(
volume_api.key_manager, 'delete_key') as key_del_mock:
volume_api.key_manager, 'delete') as key_del_mock:
key_del_mock.side_effect = Exception("Key not found")
volume_api.delete(self.context, volume)

View File

@ -17,6 +17,7 @@
import ddt
import mock
from castellan.tests.unit.key_manager import mock_key_manager
from oslo_utils import imageutils
from cinder import context
@ -27,7 +28,6 @@ from cinder.tests.unit import fake_constants as fakes
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit.keymgr import mock_key_mgr
from cinder.tests.unit import utils
from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import create_volume
@ -117,7 +117,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['status'] = 'active'
image_meta['size'] = 1
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
task = create_volume.ExtractVolumeRequestTask(
fake_image_service,
@ -158,7 +158,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['status'] = 'active'
image_meta['size'] = 1
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
volume_type = 'type1'
task = create_volume.ExtractVolumeRequestTask(
@ -212,7 +212,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['status'] = 'active'
image_meta['size'] = 1
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
volume_type = 'type1'
task = create_volume.ExtractVolumeRequestTask(
@ -258,7 +258,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['status'] = 'active'
image_meta['size'] = 1
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
volume_type = 'type1'
task = create_volume.ExtractVolumeRequestTask(
@ -313,7 +313,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['status'] = 'active'
image_meta['size'] = 1
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
volume_type = 'type1'
task = create_volume.ExtractVolumeRequestTask(
@ -376,7 +376,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['properties'] = {}
image_meta['properties']['cinder_img_volume_type'] = image_volume_type
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
task = create_volume.ExtractVolumeRequestTask(
fake_image_service,
@ -439,7 +439,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['properties'] = {}
image_meta['properties']['cinder_img_volume_type'] = image_volume_type
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
task = create_volume.ExtractVolumeRequestTask(
fake_image_service,
@ -504,7 +504,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['size'] = 1
image_meta['properties'] = fake_img_properties
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
task = create_volume.ExtractVolumeRequestTask(
fake_image_service,
@ -562,7 +562,7 @@ class CreateVolumeFlowTestCase(test.TestCase):
image_meta['id'] = image_id
image_meta['status'] = 'inactive'
fake_image_service.create(self.ctxt, image_meta)
fake_key_manager = mock_key_mgr.MockKeyManager()
fake_key_manager = mock_key_manager.MockKeyManager()
task = create_volume.ExtractVolumeRequestTask(
fake_image_service,

View File

@ -38,7 +38,7 @@ from cinder import flow_utils
from cinder.i18n import _, _LE, _LI, _LW
from cinder.image import cache as image_cache
from cinder.image import glance
from cinder import keymgr
from cinder import keymgr as key_manager
from cinder import objects
from cinder.objects import base as objects_base
from cinder.objects import fields
@ -130,7 +130,7 @@ class API(base.Base):
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
self.availability_zones = []
self.availability_zones_last_fetched = None
self.key_manager = keymgr.API()
self.key_manager = key_manager.API(CONF)
super(API, self).__init__(db_driver)
def list_availability_zones(self, enable_cache=False):
@ -437,7 +437,7 @@ class API(base.Base):
encryption_key_id = volume.get('encryption_key_id', None)
if encryption_key_id is not None:
try:
self.key_manager.delete_key(context, encryption_key_id)
self.key_manager.delete(context, encryption_key_id)
except Exception as e:
LOG.warning(_LW("Unable to delete encryption key for "
"volume: %s."), e.msg, resource=volume)

View File

@ -368,8 +368,8 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# Clone the existing key and associate a separate -- but
# identical -- key with each volume.
if encryption_key_id is not None:
encryption_key_id = key_manager.copy_key(context,
encryption_key_id)
encryption_key_id = key_manager.store(
context, key_manager.get(context, encryption_key_id))
else:
volume_type_encryption = (
volume_types.get_volume_type_encryption(context,

View File

@ -61,3 +61,4 @@ os-brick>=1.3.0 # Apache-2.0
os-win>=0.2.3 # Apache-2.0
tooz>=1.28.0 # Apache-2.0
google-api-python-client>=1.4.2 # Apache-2.0
castellan>=0.4.0 # Apache-2.0