Merge "Replace key manager with Castellan"

This commit is contained in:
Jenkins 2016-05-04 17:15:54 +00:00 committed by Gerrit Code Review
commit 4b6ab39393
24 changed files with 236 additions and 1433 deletions

View File

@ -50,7 +50,7 @@ from nova.conf import hyperv
from nova.conf import image_file_url from nova.conf import image_file_url
from nova.conf import ipv6 from nova.conf import ipv6
from nova.conf import ironic from nova.conf import ironic
from nova.conf import keymgr from nova.conf import key_manager
# from nova.conf import keystone_authtoken # from nova.conf import keystone_authtoken
# from nova.conf import libvirt # from nova.conf import libvirt
from nova.conf import mks from nova.conf import mks
@ -121,7 +121,7 @@ mks.register_opts(CONF)
image_file_url.register_opts(CONF) image_file_url.register_opts(CONF)
ipv6.register_opts(CONF) ipv6.register_opts(CONF)
ironic.register_opts(CONF) ironic.register_opts(CONF)
keymgr.register_opts(CONF) key_manager.register_opts(CONF)
# keystone_authtoken.register_opts(CONF) # keystone_authtoken.register_opts(CONF)
# libvirt.register_opts(CONF) # libvirt.register_opts(CONF)
# matchmaker_redis.register_opts(CONF) # matchmaker_redis.register_opts(CONF)

View File

@ -15,26 +15,24 @@
from oslo_config import cfg from oslo_config import cfg
keymgr_group = cfg.OptGroup( key_manager_group = cfg.OptGroup(
'keymgr', 'key_manager',
title='Key manager options') title='Key manager options')
keymgr_opts = [ key_manager_opts = [
cfg.StrOpt('api_class',
default='nova.keymgr.conf_key_mgr.ConfKeyManager',
help='The full class name of the key manager API class'),
cfg.StrOpt( cfg.StrOpt(
'fixed_key', 'fixed_key',
help='Fixed key returned by key manager, specified in hex'), help='Fixed key returned by key manager, specified in hex',
deprecated_group='keymgr'),
] ]
def register_opts(conf): def register_opts(conf):
conf.register_group(keymgr_group) conf.register_group(key_manager_group)
conf.register_opts(keymgr_opts, group=keymgr_group) conf.register_opts(key_manager_opts, group=key_manager_group)
def list_opts(): def list_opts():
return { return {
keymgr_group.name: keymgr_opts key_manager_group.name: key_manager_opts
} }

View File

@ -14,13 +14,65 @@
# under the License. # under the License.
from castellan import options as castellan_opts
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils from oslo_utils import importutils
import nova.conf import nova.conf
from nova.i18n import _LW
LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF CONF = nova.conf.CONF
def API(): castellan_opts.set_defaults(CONF)
cls = importutils.import_class(CONF.keymgr.api_class)
return cls() # NOTE(kfarr): This line can be removed when a value is assigned in DevStack
CONF.set_default('api_class', 'nova.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
# NOTE(kfarr): For backwards compatibility, everything below this comment
# is deprecated for removal
api_class = None
try:
api_class = CONF.key_manager.api_class
except cfg.NoSuchOptError:
LOG.warning(_LW("key_manager.api_class is not set, will use deprecated "
"option keymgr.api_class if set"))
try:
api_class = CONF.keymgr.api_class
except cfg.NoSuchOptError:
LOG.warning(_LW("keymgr.api_class is not set"))
deprecated_barbican = 'nova.keymgr.barbican.BarbicanKeyManager'
barbican = 'castellan.key_manager.barbican_key_manager.BarbicanKeyManager'
deprecated_mock = 'nova.tests.unit.keymgr.mock_key_mgr.MockKeyManager'
castellan_mock = ('castellan.tests.unit.key_manager.mock_key_manager.'
'MockKeyManager')
def log_deprecated_warning(deprecated, castellan):
LOG.warning(_LW("key manager api_class set to use deprecated option "
"%(deprecated)s, using %(castellan)s instead"),
{'deprecated': deprecated, 'castellan': castellan})
if api_class == deprecated_barbican:
log_deprecated_warning(deprecated_barbican, barbican)
api_class = barbican
elif api_class == deprecated_mock:
log_deprecated_warning(deprecated_mock, castellan_mock)
api_class = castellan_mock
elif api_class is None:
# TODO(kfarr): key_manager.api_class should be set in DevStack, and this
# block can be removed
LOG.warning(_LW("key manager not set, using insecure default %s"),
castellan_mock)
api_class = castellan_mock
CONF.set_override('api_class', api_class, 'key_manager')
def API(conf=CONF):
cls = importutils.import_class(CONF.key_manager.api_class)
return cls(conf)

View File

@ -1,340 +0,0 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Key manager implementation for Barbican
"""
import array
import base64
import binascii
from barbicanclient import client as barbican_client
from keystoneauth1 import loading as ks_loading
from keystoneauth1 import session
from oslo_log import log as logging
from oslo_utils import excutils
import nova.conf
from nova import exception
from nova.i18n import _
from nova.i18n import _LE
from nova.keymgr import key as keymgr_key
from nova.keymgr import key_mgr
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
class BarbicanKeyManager(key_mgr.KeyManager):
"""Key Manager Interface that wraps the Barbican client API."""
def __init__(self):
self._barbican_client = None
self._current_context = None
self._base_url = None
def _get_barbican_client(self, ctxt):
"""Creates a client to connect to the Barbican service.
:param ctxt: the user context for authentication
:return: a Barbican Client object
:raises Forbidden: if the ctxt is None
"""
# Confirm context is provided, if not raise forbidden
if not ctxt:
msg = _("User is not authorized to use key manager.")
LOG.error(msg)
raise exception.Forbidden(msg)
if not hasattr(ctxt, 'project_id') or ctxt.project_id is None:
msg = _("Unable to create Barbican Client without project_id.")
LOG.error(msg)
raise exception.KeyManagerError(msg)
# If same context, return cached barbican client
if self._barbican_client and self._current_context == ctxt:
return self._barbican_client
try:
_SESSION = ks_loading.load_session_from_conf_options(
CONF,
nova.conf.barbican.barbican_group.name)
auth = ctxt.get_auth_plugin()
service_type, service_name, interface = (CONF.
barbican.
catalog_info.
split(':'))
region_name = CONF.barbican.os_region_name
service_parameters = {'service_type': service_type,
'service_name': service_name,
'interface': interface,
'region_name': region_name}
if CONF.barbican.endpoint_template:
self._base_url = (CONF.barbican.endpoint_template %
ctxt.to_dict())
else:
self._base_url = _SESSION.get_endpoint(
auth, **service_parameters)
# the barbican endpoint can't have the '/v1' on the end
self._barbican_endpoint = self._base_url.rpartition('/')[0]
sess = session.Session(auth=auth)
self._barbican_client = barbican_client.Client(
session=sess,
endpoint=self._barbican_endpoint)
self._current_context = ctxt
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error creating Barbican client: %s"), e)
return self._barbican_client
def create_key(self, ctxt, expiration=None, name='Nova Compute Key',
payload_content_type='application/octet-stream', mode='CBC',
algorithm='AES', length=256):
"""Creates a key.
:param ctxt: contains information of the user and the environment
for the request (nova/context.py)
:param expiration: the date the key will expire
:param name: a friendly name for the secret
:param payload_content_type: the format/type of the secret data
:param mode: the algorithm mode (e.g. CBC or CTR mode)
:param algorithm: the algorithm associated with the secret
:param length: the bit length of the secret
:return: the UUID of the new key
:raises Exception: if key creation fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
key_order = barbican_client.orders.create_key(
name,
algorithm,
length,
mode,
payload_content_type,
expiration)
order_ref = key_order.submit()
order = barbican_client.orders.get(order_ref)
return self._retrieve_secret_uuid(order.secret_ref)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error creating key: %s"), e)
def store_key(self, ctxt, key, expiration=None, name='Nova Compute Key',
payload_content_type='application/octet-stream',
payload_content_encoding='base64', algorithm='AES',
bit_length=256, mode='CBC', from_copy=False):
"""Stores (i.e., registers) a key with the key manager.
:param ctxt: contains information of the user and the environment for
the request (nova/context.py)
:param key: the unencrypted secret data. Known as "payload" to the
barbicanclient api
:param expiration: the expiration time of the secret in ISO 8601
format
:param name: a friendly name for the key
:param payload_content_type: the format/type of the secret data
:param payload_content_encoding: the encoding of the secret data
:param algorithm: the algorithm associated with this secret key
:param bit_length: the bit length of this secret key
:param mode: the algorithm mode used with this secret key
:param from_copy: establishes whether the function is being used
to copy a key. In case of the latter, it does not
try to decode the key
:returns: the UUID of the stored key
:raises Exception: if key storage fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
if key.get_algorithm():
algorithm = key.get_algorithm()
if payload_content_type == 'text/plain':
payload_content_encoding = None
encoded_key = key.get_encoded()
elif (payload_content_type == 'application/octet-stream' and
not from_copy):
key_list = key.get_encoded()
string_key = ''.join(map(lambda byte: "%02x" % byte, key_list))
encoded_key = base64.b64encode(binascii.unhexlify(string_key))
else:
encoded_key = key.get_encoded()
secret = barbican_client.secrets.create(name,
encoded_key,
payload_content_type,
payload_content_encoding,
algorithm,
bit_length,
mode,
expiration)
secret_ref = secret.store()
return self._retrieve_secret_uuid(secret_ref)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error storing key: %s"), e)
def copy_key(self, ctxt, key_id):
"""Copies (i.e., clones) a key stored by barbican.
:param ctxt: contains information of the user and the environment for
the request (nova/context.py)
:param key_id: the UUID of the key to copy
:return: the UUID of the key copy
:raises Exception: if key copying fails
"""
try:
secret = self._get_secret(ctxt, key_id)
con_type = secret.content_types['default']
secret_data = self._get_secret_data(secret,
payload_content_type=con_type)
key = keymgr_key.SymmetricKey(secret.algorithm, secret_data)
copy_uuid = self.store_key(ctxt, key, secret.expiration,
secret.name, con_type,
'base64',
secret.algorithm, secret.bit_length,
secret.mode, True)
return copy_uuid
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error copying key: %s"), e)
def _create_secret_ref(self, key_id):
"""Creates the URL required for accessing a secret.
:param key_id: the UUID of the key to copy
:return: the URL of the requested secret
"""
if not key_id:
msg = "Key ID is None"
raise exception.KeyManagerError(msg)
return self._base_url + "/secrets/" + key_id
def _retrieve_secret_uuid(self, secret_ref):
"""Retrieves the UUID of the secret from the secret_ref.
:param secret_ref: the href of the secret
:return: the UUID of the secret
"""
# The secret_ref is assumed to be of a form similar to
# http://host:9311/v1/secrets/d152fa13-2b41-42ca-a934-6c21566c0f40
# with the UUID at the end. This command retrieves everything
# after the last '/', which is the UUID.
return secret_ref.rpartition('/')[2]
def _get_secret_data(self,
secret,
payload_content_type='application/octet-stream'):
"""Retrieves the secret data given a secret and content_type.
:param ctxt: contains information of the user and the environment for
the request (nova/context.py)
:param secret: the secret from barbican with the payload of data
:param payload_content_type: the format/type of the secret data
:returns: the secret data
:raises Exception: if data cannot be retrieved
"""
try:
generated_data = secret.payload
if payload_content_type == 'application/octet-stream':
secret_data = base64.b64encode(generated_data)
else:
secret_data = generated_data
return secret_data
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error getting secret data: %s"), e)
def _get_secret(self, ctxt, key_id):
"""Returns the metadata of the secret.
:param ctxt: contains information of the user and the environment for
the request (nova/context.py)
:param key_id: UUID of the secret
:return: the secret's metadata
:raises Exception: if there is an error retrieving the data
"""
barbican_client = self._get_barbican_client(ctxt)
try:
secret_ref = self._create_secret_ref(key_id)
return barbican_client.secrets.get(secret_ref)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error getting secret metadata: %s"), e)
def get_key(self, ctxt, key_id,
payload_content_type='application/octet-stream'):
"""Retrieves the specified key.
:param ctxt: contains information of the user and the environment for
the request (nova/context.py)
:param key_id: the UUID of the key to retrieve
:param payload_content_type: The format/type of the secret data
:return: SymmetricKey representation of the key
:raises Exception: if key retrieval fails
"""
try:
secret = self._get_secret(ctxt, key_id)
secret_data = self._get_secret_data(secret,
payload_content_type)
if payload_content_type == 'application/octet-stream':
# convert decoded string to list of unsigned ints for each byte
key_data = array.array('B',
base64.b64decode(secret_data)).tolist()
else:
key_data = secret_data
key = keymgr_key.SymmetricKey(secret.algorithm, key_data)
return key
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error getting key: %s"), e)
def delete_key(self, ctxt, key_id):
"""Deletes the specified key.
:param ctxt: contains information of the user and the environment for
the request (nova/context.py)
:param key_id: the UUID of the key to delete
:raises Exception: if key deletion fails
"""
barbican_client = self._get_barbican_client(ctxt)
try:
secret_ref = self._create_secret_ref(key_id)
barbican_client.secrets.delete(secret_ref)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Error deleting key: %s"), e)

View File

@ -31,15 +31,22 @@ encrypted with a key provided by this key manager actually share the same
encryption key so *any* volume can be decrypted once the fixed key is known. encryption key so *any* volume can be decrypted once the fixed key is known.
""" """
import binascii
from castellan.common.objects import symmetric_key as key
from castellan.key_manager import key_manager
from oslo_log import log as logging
import nova.conf import nova.conf
from nova.i18n import _ from nova import exception
from nova.keymgr import single_key_mgr from nova.i18n import _, _LW
CONF = nova.conf.CONF CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
class ConfKeyManager(single_key_mgr.SingleKeyManager): class ConfKeyManager(key_manager.KeyManager):
"""This key manager implementation supports all the methods specified by """This key manager implementation supports all the methods specified by
the key manager interface. This implementation creates a single key in the key manager interface. This implementation creates a single key in
response to all invocations of create_key. Side effects response to all invocations of create_key. Side effects
@ -47,11 +54,78 @@ class ConfKeyManager(single_key_mgr.SingleKeyManager):
as specified by the key manager interface. as specified by the key manager interface.
""" """
def __init__(self): def __init__(self, configuration):
if CONF.keymgr.fixed_key is None: LOG.warning(_LW('This key manager is insecure and is not recommended '
raise ValueError(_('keymgr.fixed_key not defined')) 'for production deployments'))
self._hex_key = CONF.keymgr.fixed_key super(ConfKeyManager, self).__init__(configuration)
super(ConfKeyManager, self).__init__()
def _generate_hex_key(self, **kwargs): self.key_id = '00000000-0000-0000-0000-000000000000'
return self._hex_key
self.conf = CONF if configuration is None else configuration
if CONF.key_manager.fixed_key is None:
raise ValueError(_('keymgr.fixed_key not defined'))
self._hex_key = CONF.key_manager.fixed_key
super(ConfKeyManager, self).__init__(configuration)
def _get_key(self):
key_bytes = bytes(binascii.unhexlify(self._hex_key))
return key.SymmetricKey('AES', len(key_bytes) * 8, key_bytes)
def create_key(self, context, algorithm, length, **kwargs):
"""Creates a symmetric key.
This implementation returns a UUID for the key read from the
configuration file. A Forbidden exception is raised if the
specified context is None.
"""
if context is None:
raise exception.Forbidden()
return self.key_id
def create_key_pair(self, context, **kwargs):
raise NotImplementedError(
"ConfKeyManager does not support asymmetric keys")
def store(self, context, managed_object, **kwargs):
"""Stores (i.e., registers) a key with the key manager."""
if context is None:
raise exception.Forbidden()
if managed_object != self._get_key():
raise exception.KeyManagerError(
reason="cannot store arbitrary keys")
return self.key_id
def get(self, context, managed_object_id):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
specified UUID. A Forbidden exception is raised if the specified
context is None; a KeyError is raised if the UUID is invalid.
"""
if context is None:
raise exception.Forbidden()
if managed_object_id != self.key_id:
raise KeyError(str(managed_object_id) + " != " + str(self.key_id))
return self._get_key()
def delete(self, context, managed_object_id):
"""Represents deleting the key.
Because the ConfKeyManager has only one key, which is read from the
configuration file, the key is not actually deleted when this is
called.
"""
if context is None:
raise exception.Forbidden()
if managed_object_id != self.key_id:
raise exception.KeyManagerError(
reason="cannot delete non-existent key")
LOG.warning(_LW("Not deleting key %s"), managed_object_id)

View File

@ -1,90 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base Key and SymmetricKey Classes
This module defines the Key and SymmetricKey classes. The Key class is the base
class to represent all encryption keys. The basis for this class was copied
from Java.
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Key(object):
"""Base class to represent all keys."""
@abc.abstractmethod
def get_algorithm(self):
"""Returns the key's algorithm.
Returns the key's algorithm. For example, "DSA" indicates that this key
is a DSA key and "AES" indicates that this key is an AES key.
"""
pass
@abc.abstractmethod
def get_format(self):
"""Returns the encoding format.
Returns the key's encoding format or None if this key is not encoded.
"""
pass
@abc.abstractmethod
def get_encoded(self):
"""Returns the key in the format specified by its encoding."""
pass
class SymmetricKey(Key):
"""This class represents symmetric keys."""
def __init__(self, alg, key):
"""Create a new SymmetricKey object.
The arguments specify the algorithm for the symmetric encryption and
the bytes for the key.
"""
self.alg = alg
self.key = key
def get_algorithm(self):
"""Returns the algorithm for symmetric encryption."""
return self.alg
def get_format(self):
"""This method returns 'RAW'."""
return "RAW"
def get_encoded(self):
"""Returns the key in its encoded format."""
return self.key
def __eq__(self, other):
if isinstance(other, SymmetricKey):
return (self.alg == other.alg and
self.key == other.key)
return NotImplemented
def __ne__(self, other):
result = self.__eq__(other)
if result is NotImplemented:
return result
return not result

View File

@ -1,102 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Key manager API
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class KeyManager(object):
"""Base Key Manager Interface
A Key Manager is responsible for managing encryption keys for volumes. A
Key Manager is responsible for creating, reading, and deleting keys.
"""
@abc.abstractmethod
def create_key(self, ctxt, algorithm='AES', length=256, expiration=None,
**kwargs):
"""Creates a key.
This method creates a key and returns the key's UUID. If the specified
context does not permit the creation of keys, then a NotAuthorized
exception should be raised.
"""
pass
@abc.abstractmethod
def store_key(self, ctxt, key, expiration=None, **kwargs):
"""Stores (i.e., registers) a key with the key manager.
This method stores the specified key and returns its UUID that
identifies it within the key manager. If the specified context does
not permit the creation of keys, then a NotAuthorized exception should
be raised.
"""
pass
@abc.abstractmethod
def copy_key(self, ctxt, key_id, **kwargs):
"""Copies (i.e., clones) a key stored by the key manager.
This method copies the specified key and returns the copy's UUID. If
the specified context does not permit copying keys, then a
NotAuthorized error should be raised.
Implementation note: This method should behave identically to::
store_key(context, get_key(context, <encryption key UUID>))
although it is preferable to perform this operation within the key
manager to avoid unnecessary handling of the key material.
"""
pass
@abc.abstractmethod
def get_key(self, ctxt, key_id, **kwargs):
"""Retrieves the specified key.
Implementations should verify that the caller has permissions to
retrieve the key by checking the context object passed in as ctxt. If
the user lacks permission then a NotAuthorized exception is raised.
If the specified key does not exist, then a KeyError should be raised.
Implementations should preclude users from discerning the UUIDs of
keys that belong to other users by repeatedly calling this method.
That is, keys that belong to other users should be considered "non-
existent" and completely invisible.
"""
pass
@abc.abstractmethod
def delete_key(self, ctxt, key_id, **kwargs):
"""Deletes the specified key.
Implementations should verify that the caller has permission to delete
the key by checking the context object (ctxt). A NotAuthorized
exception should be raised if the caller lacks permission.
If the specified key does not exist, then a KeyError should be raised.
Implementations should preclude users from discerning the UUIDs of
keys that belong to other users by repeatedly calling this method.
That is, keys that belong to other users should be considered "non-
existent" and completely invisible.
"""
pass

View File

@ -1,133 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A mock implementation of a key manager that stores keys in a dictionary.
This key manager implementation is primarily intended for testing. In
particular, it does not store keys persistently. Lack of a centralized key
store also makes this implementation unsuitable for use among different
services.
Note: Instantiating this class multiple times will create separate key stores.
Keys created in one instance will not be accessible from other instances of
this class.
"""
import array
import codecs
from oslo_log import log as logging
from oslo_utils import uuidutils
from nova import exception
from nova.i18n import _LW
from nova.keymgr import key
from nova.keymgr import key_mgr
from nova import utils
LOG = logging.getLogger(__name__)
decode_hex = codecs.getdecoder("hex_codec")
class MockKeyManager(key_mgr.KeyManager):
"""This mock key manager implementation supports all the methods specified
by the key manager interface. This implementation stores keys within a
dictionary, and as a result, it is not acceptable for use across different
services. Side effects (e.g., raising exceptions) for each method are
handled as specified by the key manager interface.
This key manager is not suitable for use in production deployments.
"""
def __init__(self):
LOG.warning(_LW('This key manager is not suitable for use in '
'production deployments'))
self.keys = {}
def _generate_hex_key(self, **kwargs):
key_length = kwargs.get('key_length', 256)
# hex digit => 4 bits
hex_encoded = utils.generate_password(length=key_length // 4,
symbolgroups='0123456789ABCDEF')
return hex_encoded
def _generate_key(self, **kwargs):
_hex = self._generate_hex_key(**kwargs)
return key.SymmetricKey('AES',
array.array('B', decode_hex(_hex)[0]).tolist())
def create_key(self, ctxt, **kwargs):
"""Creates a key.
This implementation returns a UUID for the created key. A
Forbidden exception is raised if the specified context is None.
"""
if ctxt is None:
raise exception.Forbidden()
key = self._generate_key(**kwargs)
return self.store_key(ctxt, key)
def _generate_key_id(self):
key_id = uuidutils.generate_uuid()
while key_id in self.keys:
key_id = uuidutils.generate_uuid()
return key_id
def store_key(self, ctxt, key, **kwargs):
"""Stores (i.e., registers) a key with the key manager."""
if ctxt is None:
raise exception.Forbidden()
key_id = self._generate_key_id()
self.keys[key_id] = key
return key_id
def copy_key(self, ctxt, key_id, **kwargs):
if ctxt is None:
raise exception.Forbidden()
copied_key_id = self._generate_key_id()
self.keys[copied_key_id] = self.keys[key_id]
return copied_key_id
def get_key(self, ctxt, key_id, **kwargs):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
specified UUID. A Forbidden exception is raised if the specified
context is None; a KeyError is raised if the UUID is invalid.
"""
if ctxt is None:
raise exception.Forbidden()
return self.keys[key_id]
def delete_key(self, ctxt, key_id, **kwargs):
"""Deletes the key identified by the specified id.
A Forbidden exception is raised if the context is None and a
KeyError is raised if the UUID is invalid.
"""
if ctxt is None:
raise exception.Forbidden()
del self.keys[key_id]

View File

@ -1,41 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Key manager implementation that raises NotImplementedError
"""
from nova.keymgr import key_mgr
class NotImplementedKeyManager(key_mgr.KeyManager):
"""Key Manager Interface that raises NotImplementedError for all operations
"""
def create_key(self, ctxt, algorithm='AES', length=256, expiration=None,
**kwargs):
raise NotImplementedError()
def store_key(self, ctxt, key, expiration=None, **kwargs):
raise NotImplementedError()
def copy_key(self, ctxt, key_id, **kwargs):
raise NotImplementedError()
def get_key(self, ctxt, key_id, **kwargs):
raise NotImplementedError()
def delete_key(self, ctxt, key_id, **kwargs):
raise NotImplementedError()

View File

@ -1,72 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
An implementation of a key manager that returns a single key in response to
all invocations of get_key.
"""
from oslo_log import log as logging
from nova import exception
from nova.i18n import _, _LW
from nova.keymgr import mock_key_mgr
LOG = logging.getLogger(__name__)
class SingleKeyManager(mock_key_mgr.MockKeyManager):
"""This key manager implementation supports all the methods specified by
the key manager interface. This implementation creates a single key in
response to all invocations of create_key. Side effects
(e.g., raising exceptions) for each method are handled as specified by
the key manager interface.
"""
def __init__(self):
LOG.warning(_LW('This key manager is insecure and is not recommended '
'for production deployments'))
super(SingleKeyManager, self).__init__()
self.key_id = '00000000-0000-0000-0000-000000000000'
self.key = self._generate_key(key_length=256)
# key should exist by default
self.keys[self.key_id] = self.key
def _generate_hex_key(self, **kwargs):
key_length = kwargs.get('key_length', 256)
return b'0' * (key_length // 4) # hex digit => 4 bits
def _generate_key_id(self):
return self.key_id
def store_key(self, ctxt, key, **kwargs):
if key != self.key:
raise exception.KeyManagerError(
reason=_("cannot store arbitrary keys"))
return super(SingleKeyManager, self).store_key(ctxt, key, **kwargs)
def delete_key(self, ctxt, key_id, **kwargs):
if ctxt is None:
raise exception.Forbidden()
if key_id != self.key_id:
raise exception.KeyManagerError(
reason=_("cannot delete non-existent key"))
LOG.warning(_LW("Not deleting key %s"), key_id)

View File

@ -26,6 +26,7 @@ from nova.tests.unit import utils
CONF = nova.conf.CONF CONF = nova.conf.CONF
CONF.import_opt('floating_ip_dns_manager', 'nova.network.floating_ips') CONF.import_opt('floating_ip_dns_manager', 'nova.network.floating_ips')
CONF.import_opt('instance_dns_manager', 'nova.network.floating_ips') CONF.import_opt('instance_dns_manager', 'nova.network.floating_ips')
CONF.import_opt('api_class', 'nova.keymgr', group='key_manager')
class ConfFixture(config_fixture.Config): class ConfFixture(config_fixture.Config):
@ -63,6 +64,9 @@ class ConfFixture(config_fixture.Config):
'[0-9a-fk\-]+', 'osapi_v21') '[0-9a-fk\-]+', 'osapi_v21')
self.conf.set_default('force_dhcp_release', False) self.conf.set_default('force_dhcp_release', False)
self.conf.set_default('periodic_enable', False) self.conf.set_default('periodic_enable', False)
self.conf.set_default('api_class',
'nova.keymgr.conf_key_mgr.ConfKeyManager',
group='key_manager')
policy_opts.set_defaults(self.conf) policy_opts.set_defaults(self.conf)
self.addCleanup(utils.cleanup_dns_managers) self.addCleanup(utils.cleanup_dns_managers)
self.addCleanup(ipv6.api.reset_backend) self.addCleanup(ipv6.api.reset_backend)

View File

@ -17,8 +17,8 @@
"""Implementation of a fake key manager.""" """Implementation of a fake key manager."""
from nova.keymgr import mock_key_mgr from castellan.tests.unit.key_manager import mock_key_manager
def fake_api(): def fake_api(configuration=None):
return mock_key_mgr.MockKeyManager() return mock_key_manager.MockKeyManager(configuration)

View File

@ -1,268 +0,0 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the barbican key manager.
"""
import array
import binascii
import mock
from nova import exception
from nova.keymgr import barbican
from nova.keymgr import key as keymgr_key
from nova.tests.unit.keymgr import test_key_mgr
class BarbicanKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
return barbican.BarbicanKeyManager()
def setUp(self):
super(BarbicanKeyManagerTestCase, self).setUp()
# Create fake auth_token
self.ctxt = mock.MagicMock()
self.ctxt.auth_token = "fake_token"
self.ctxt.project = "fake_project"
# Create mock barbican client
self._build_mock_barbican()
# Create a key_id, secret_ref, pre_hex, and hex to use
self.key_id = "d152fa13-2b41-42ca-a934-6c21566c0f40"
self.secret_ref = ("http://host:9311/v1/secrets/" + self.key_id)
self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY="
self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4"
"534ae16")
self.key_mgr._current_context = self.ctxt
self.key_mgr._base_url = "http://host:9311/v1"
self.addCleanup(self._restore)
def _restore(self):
if hasattr(self, 'original_key'):
keymgr_key.SymmetricKey = self.original_key
def _build_mock_barbican(self):
self.mock_barbican = mock.MagicMock(name='mock_barbican')
# Set commonly used methods
self.get = self.mock_barbican.secrets.get
self.delete = self.mock_barbican.secrets.delete
self.store = self.mock_barbican.secrets.store
self.create = self.mock_barbican.secrets.create
self.key_mgr._barbican_client = self.mock_barbican
def _build_mock_symKey(self):
self.mock_symKey = mock.Mock()
def fake_sym_key(alg, key):
self.mock_symKey.get_encoded.return_value = key
self.mock_symKey.get_algorithm.return_value = alg
return self.mock_symKey
self.original_key = keymgr_key.SymmetricKey
keymgr_key.SymmetricKey = fake_sym_key
def test_copy_key(self):
# Create metadata for original secret
original_secret_metadata = mock.Mock()
original_secret_metadata.algorithm = mock.sentinel.alg
original_secret_metadata.bit_length = mock.sentinel.bit
original_secret_metadata.name = mock.sentinel.name
original_secret_metadata.expiration = mock.sentinel.expiration
original_secret_metadata.mode = mock.sentinel.mode
content_types = {'default': 'fake_type'}
original_secret_metadata.content_types = content_types
original_secret_data = mock.Mock()
original_secret_metadata.payload = original_secret_data
# Create href for copied secret
copied_secret = mock.Mock()
copied_secret.store.return_value = 'http://test/uuid'
# Set get and create return values
self.get.return_value = original_secret_metadata
self.create.return_value = copied_secret
# Create the mock key
self._build_mock_symKey()
# Copy the original
self.key_mgr.copy_key(self.ctxt, self.key_id)
# Assert proper methods were called
self.get.assert_called_once_with(self.secret_ref)
self.create.assert_called_once_with(
mock.sentinel.name,
self.mock_symKey.get_encoded(),
content_types['default'],
'base64',
mock.sentinel.alg,
mock.sentinel.bit,
mock.sentinel.mode,
mock.sentinel.expiration)
copied_secret.store.assert_called_once_with()
def test_copy_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,
self.key_mgr.copy_key, None, self.key_id)
def test_create_key(self):
# Create order_ref_url and assign return value
order_ref_url = ("http://localhost:9311/v1/None/orders/"
"4fe939b7-72bc-49aa-bd1e-e979589858af")
key_order = mock.Mock()
self.mock_barbican.orders.create_key.return_value = key_order
key_order.submit.return_value = order_ref_url
# Create order and assign return value
order = mock.Mock()
order.secret_ref = self.secret_ref
self.mock_barbican.orders.get.return_value = order
# Create the key, get the UUID
returned_uuid = self.key_mgr.create_key(self.ctxt)
self.mock_barbican.orders.get.assert_called_once_with(order_ref_url)
self.assertEqual(returned_uuid, self.key_id)
def test_create_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key, None)
def test_delete_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,
self.key_mgr.delete_key, None, self.key_id)
def test_delete_key(self):
self.key_mgr.delete_key(self.ctxt, self.key_id)
self.delete.assert_called_once_with(self.secret_ref)
def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete_key, self.ctxt, None)
@mock.patch('base64.b64encode')
def test_get_key(self, b64_mock):
b64_mock.return_value = self.pre_hex
content_type = 'application/octet-stream'
key = self.key_mgr.get_key(self.ctxt, self.key_id, content_type)
self.get.assert_called_once_with(self.secret_ref)
encoded = array.array('B', binascii.unhexlify(self.hex)).tolist()
self.assertEqual(key.get_encoded(), encoded)
def test_get_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,
self.key_mgr.get_key, None, self.key_id)
def test_get_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.get_key, self.ctxt, None)
def test_store_key_base64(self):
# Create Key to store
secret_key = array.array('B', [0x01, 0x02, 0xA0, 0xB3]).tolist()
_key = keymgr_key.SymmetricKey('AES', secret_key)
# Define the return values
secret = mock.Mock()
self.create.return_value = secret
secret.store.return_value = self.secret_ref
# Store the Key
returned_uuid = self.key_mgr.store_key(self.ctxt, _key, bit_length=32)
self.create.assert_called_once_with('Nova Compute Key',
b'AQKgsw==',
'application/octet-stream',
'base64',
'AES', 32, 'CBC',
None)
self.assertEqual(returned_uuid, self.key_id)
def test_store_key_plaintext(self):
# Create the plaintext key
secret_key_text = "This is a test text key."
_key = keymgr_key.SymmetricKey('AES', secret_key_text)
# Store the Key
self.key_mgr.store_key(self.ctxt, _key,
payload_content_type='text/plain',
payload_content_encoding=None)
self.create.assert_called_once_with('Nova Compute Key',
secret_key_text,
'text/plain',
None,
'AES', 256, 'CBC',
None)
self.assertEqual(self.store.call_count, 0)
def test_store_null_context(self):
self.key_mgr._barbican_client = None
self.assertRaises(exception.Forbidden,
self.key_mgr.store_key, None, None)
@mock.patch('keystoneauth1.session.Session')
@mock.patch('barbicanclient.client.Client')
def test_get_barbican_client_new(self, mock_barbican, mock_keystone):
manager = self._create_key_manager()
manager._get_barbican_client(self.ctxt)
self.assertEqual(mock_barbican.call_count, 1)
@mock.patch('keystoneauth1.session.Session')
@mock.patch('barbicanclient.client.Client')
def test_get_barbican_client_reused(self, mock_barbican, mock_keystone):
manager = self._create_key_manager()
manager._get_barbican_client(self.ctxt)
self.assertEqual(mock_barbican.call_count, 1)
manager._get_barbican_client(self.ctxt)
self.assertEqual(mock_barbican.call_count, 1)
@mock.patch('keystoneauth1.session.Session')
@mock.patch('barbicanclient.client.Client')
def test_get_barbican_client_not_reused(self, mock_barbican,
mock_keystone):
manager = self._create_key_manager()
manager._get_barbican_client(self.ctxt)
self.assertEqual(mock_barbican.call_count, 1)
ctxt2 = mock.MagicMock()
ctxt2.auth_token = "fake_token2"
ctxt2.project = "fake_project2"
manager._get_barbican_client(ctxt2)
self.assertEqual(mock_barbican.call_count, 2)
def test_get_barbican_client_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr._get_barbican_client, None)
def test_get_barbican_client_missing_project(self):
del(self.ctxt.project_id)
self.assertRaises(exception.KeyManagerError,
self.key_mgr._get_barbican_client, self.ctxt)
def test_get_barbican_client_none_project(self):
self.ctxt.project_id = None
self.assertRaises(exception.KeyManagerError,
self.key_mgr._get_barbican_client, self.ctxt)

View File

@ -17,45 +17,96 @@
Test cases for the conf key manager. Test cases for the conf key manager.
""" """
import array import binascii
import codecs import codecs
from castellan.common.objects import symmetric_key as key
from oslo_config import cfg from oslo_config import cfg
from nova import context
from nova import exception
from nova.keymgr import conf_key_mgr from nova.keymgr import conf_key_mgr
from nova.keymgr import key from nova import test
from nova.tests.unit.keymgr import test_single_key_mgr
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_opt('fixed_key', 'nova.keymgr.conf_key_mgr', group='keymgr') CONF.import_opt('fixed_key', 'nova.keymgr.conf_key_mgr', group='key_manager')
decode_hex = codecs.getdecoder("hex_codec") decode_hex = codecs.getdecoder("hex_codec")
class ConfKeyManagerTestCase(test_single_key_mgr.SingleKeyManagerTestCase): class ConfKeyManagerTestCase(test.NoDBTestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(ConfKeyManagerTestCase, self).__init__(*args, **kwargs) super(ConfKeyManagerTestCase, self).__init__(*args, **kwargs)
self._hex_key = '0' * 64 self._hex_key = '0' * 64
def _create_key_manager(self): def _create_key_manager(self):
CONF.set_default('fixed_key', default=self._hex_key, group='keymgr') CONF.set_default('fixed_key', default=self._hex_key,
return conf_key_mgr.ConfKeyManager() group='key_manager')
return conf_key_mgr.ConfKeyManager(CONF)
def setUp(self): def setUp(self):
super(ConfKeyManagerTestCase, self).setUp() super(ConfKeyManagerTestCase, self).setUp()
encoded_key = array.array('B', decode_hex(self._hex_key)[0]).tolist() self.ctxt = context.RequestContext('fake', 'fake')
self.key = key.SymmetricKey('AES', encoded_key) self.key_mgr = self._create_key_manager()
encoded_key = bytes(binascii.unhexlify(self._hex_key))
self.key = key.SymmetricKey('AES', len(encoded_key) * 8, encoded_key)
self.key_id = self.key_mgr.key_id
def test_init(self): def test_init(self):
key_manager = self._create_key_manager() key_manager = self._create_key_manager()
self.assertEqual(self._hex_key, key_manager._hex_key) self.assertEqual(self._hex_key, key_manager._hex_key)
def test_init_value_error(self): def test_init_value_error(self):
CONF.set_default('fixed_key', default=None, group='keymgr') CONF.set_default('fixed_key', default=None, group='key_manager')
self.assertRaises(ValueError, conf_key_mgr.ConfKeyManager) self.assertRaises(ValueError, conf_key_mgr.ConfKeyManager, CONF)
def test_generate_hex_key(self): def test_create_key(self):
key_manager = self._create_key_manager() key_id_1 = self.key_mgr.create_key(self.ctxt, 'AES', 256)
self.assertEqual(self._hex_key, key_manager._generate_hex_key()) key_id_2 = self.key_mgr.create_key(self.ctxt, 'AES', 256)
# ensure that the UUIDs are the same
self.assertEqual(key_id_1, key_id_2)
def test_create_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key, None, 'AES', 256)
def test_store_key(self):
key_bytes = bytes(binascii.unhexlify('0' * 64))
_key = key.SymmetricKey('AES', len(key_bytes) * 8, key_bytes)
key_id = self.key_mgr.store(self.ctxt, _key)
actual_key = self.key_mgr.get(self.ctxt, key_id)
self.assertEqual(_key, actual_key)
def test_store_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.store, None, self.key)
def test_get_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.get, None, None)
def test_get_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.get, self.ctxt, None)
def test_get(self):
self.assertEqual(self.key,
self.key_mgr.get(self.ctxt, self.key_id))
def test_delete_key(self):
key_id = self.key_mgr.create_key(self.ctxt, 'AES', 256)
self.key_mgr.delete(self.ctxt, key_id)
# key won't acutally be deleted
self.assertEqual(self.key,
self.key_mgr.get(self.ctxt, key_id))
def test_delete_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.delete, None, None)
def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete, self.ctxt, None)

View File

@ -1,70 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the key classes.
"""
import array
import codecs
from nova.keymgr import key
from nova import test
decode_hex = codecs.getdecoder("hex_codec")
class KeyTestCase(test.NoDBTestCase):
def _create_key(self):
raise NotImplementedError()
def setUp(self):
super(KeyTestCase, self).setUp()
self.key = self._create_key()
class SymmetricKeyTestCase(KeyTestCase):
def _create_key(self):
return key.SymmetricKey(self.algorithm, self.encoded)
def setUp(self):
self.algorithm = 'AES'
self.encoded = array.array('B', decode_hex('0' * 64)[0]).tolist()
super(SymmetricKeyTestCase, self).setUp()
def test_get_algorithm(self):
self.assertEqual(self.key.get_algorithm(), self.algorithm)
def test_get_format(self):
self.assertEqual(self.key.get_format(), 'RAW')
def test_get_encoded(self):
self.assertEqual(self.key.get_encoded(), self.encoded)
def test___eq__(self):
self.assertTrue(self.key == self.key)
self.assertFalse(self.key is None)
self.assertFalse(None == self.key)
def test___ne__(self):
self.assertFalse(self.key != self.key)
self.assertTrue(self.key is not None)
self.assertTrue(None != self.key)

View File

@ -1,31 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the key manager.
"""
from nova import test
class KeyManagerTestCase(test.NoDBTestCase):
def _create_key_manager(self):
raise NotImplementedError()
def setUp(self):
super(KeyManagerTestCase, self).setUp()
self.key_mgr = self._create_key_manager()

View File

@ -1,105 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the mock key manager.
"""
import array
import codecs
from nova import context
from nova import exception
from nova.keymgr import key as keymgr_key
from nova.keymgr import mock_key_mgr
from nova.tests.unit.keymgr import test_key_mgr
decode_hex = codecs.getdecoder("hex_codec")
class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
return mock_key_mgr.MockKeyManager()
def setUp(self):
super(MockKeyManagerTestCase, self).setUp()
self.ctxt = context.RequestContext('fake', 'fake')
def test_create_key(self):
key_id_1 = self.key_mgr.create_key(self.ctxt)
key_id_2 = self.key_mgr.create_key(self.ctxt)
# ensure that the UUIDs are unique
self.assertNotEqual(key_id_1, key_id_2)
def test_create_key_with_length(self):
for length in [64, 128, 256]:
key_id = self.key_mgr.create_key(self.ctxt, key_length=length)
key = self.key_mgr.get_key(self.ctxt, key_id)
self.assertEqual(length // 8, len(key.get_encoded()))
def test_create_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.create_key, None)
def test_store_key(self):
secret_key = array.array('B', decode_hex('0' * 64)[0]).tolist()
_key = keymgr_key.SymmetricKey('AES', secret_key)
key_id = self.key_mgr.store_key(self.ctxt, _key)
actual_key = self.key_mgr.get_key(self.ctxt, key_id)
self.assertEqual(_key, actual_key)
def test_store_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.store_key, None, None)
def test_copy_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
key = self.key_mgr.get_key(self.ctxt, key_id)
copied_key_id = self.key_mgr.copy_key(self.ctxt, key_id)
copied_key = self.key_mgr.get_key(self.ctxt, copied_key_id)
self.assertNotEqual(key_id, copied_key_id)
self.assertEqual(key, copied_key)
def test_copy_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.copy_key, None, None)
def test_get_key(self):
pass
def test_get_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.get_key, None, None)
def test_get_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.get_key, self.ctxt, None)
def test_delete_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
self.key_mgr.delete_key(self.ctxt, key_id)
self.assertRaises(KeyError, self.key_mgr.get_key, self.ctxt, key_id)
def test_delete_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.delete_key, None, None)
def test_delete_unknown_key(self):
self.assertRaises(KeyError, self.key_mgr.delete_key, self.ctxt, None)

View File

@ -1,47 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the not implemented key manager.
"""
from nova.keymgr import not_implemented_key_mgr
from nova.tests.unit.keymgr import test_key_mgr
class NotImplementedKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
def _create_key_manager(self):
return not_implemented_key_mgr.NotImplementedKeyManager()
def test_create_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.create_key, None)
def test_store_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.store_key, None, None)
def test_copy_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.copy_key, None, None)
def test_get_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.get_key, None, None)
def test_delete_key(self):
self.assertRaises(NotImplementedError,
self.key_mgr.delete_key, None, None)

View File

@ -1,75 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for the single key manager.
"""
import array
import codecs
from nova import exception
from nova.keymgr import key
from nova.keymgr import single_key_mgr
from nova.tests.unit.keymgr import test_mock_key_mgr
decode_hex = codecs.getdecoder("hex_codec")
class SingleKeyManagerTestCase(test_mock_key_mgr.MockKeyManagerTestCase):
def _create_key_manager(self):
return single_key_mgr.SingleKeyManager()
def setUp(self):
super(SingleKeyManagerTestCase, self).setUp()
self.key_id = '00000000-0000-0000-0000-000000000000'
encoded = array.array('B', decode_hex('0' * 64)[0]).tolist()
self.key = key.SymmetricKey('AES', encoded)
def test___init__(self):
self.assertEqual(self.key,
self.key_mgr.get_key(self.ctxt, self.key_id))
def test_create_key(self):
key_id_1 = self.key_mgr.create_key(self.ctxt)
key_id_2 = self.key_mgr.create_key(self.ctxt)
# ensure that the UUIDs are the same
self.assertEqual(key_id_1, key_id_2)
def test_create_key_with_length(self):
pass
def test_store_null_context(self):
self.assertRaises(exception.Forbidden,
self.key_mgr.store_key, None, self.key)
def test_copy_key(self):
key_id = self.key_mgr.create_key(self.ctxt)
key = self.key_mgr.get_key(self.ctxt, key_id)
copied_key_id = self.key_mgr.copy_key(self.ctxt, key_id)
copied_key = self.key_mgr.get_key(self.ctxt, copied_key_id)
self.assertEqual(key_id, copied_key_id)
self.assertEqual(key, copied_key)
def test_delete_key(self):
pass
def test_delete_unknown_key(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.delete_key, self.ctxt, None)

View File

@ -19,6 +19,7 @@ import os
import shutil import shutil
import tempfile import tempfile
from castellan import key_manager
import fixtures import fixtures
import mock import mock
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
@ -30,7 +31,6 @@ from oslo_utils import uuidutils
from nova import context from nova import context
from nova import exception from nova import exception
from nova import keymgr
from nova import objects from nova import objects
from nova import test from nova import test
from nova.tests.unit import fake_processutils from nova.tests.unit import fake_processutils
@ -42,7 +42,7 @@ from nova.virt.libvirt import imagebackend
from nova.virt.libvirt.storage import rbd_utils from nova.virt.libvirt.storage import rbd_utils
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_opt('fixed_key', 'nova.keymgr.conf_key_mgr', group='keymgr') CONF.import_opt('fixed_key', 'nova.keymgr.conf_key_mgr', group='key_manager')
class FakeSecret(object): class FakeSecret(object):
@ -788,17 +788,17 @@ class EncryptedLvmTestCase(_ImageTestCase, test.NoDBTestCase):
self.flags(key_size=512, group='ephemeral_storage_encryption') self.flags(key_size=512, group='ephemeral_storage_encryption')
self.flags(fixed_key='00000000000000000000000000000000' self.flags(fixed_key='00000000000000000000000000000000'
'00000000000000000000000000000000', '00000000000000000000000000000000',
group='keymgr') group='key_manager')
self.flags(images_volume_group=self.VG, group='libvirt') self.flags(images_volume_group=self.VG, group='libvirt')
self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME) self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME)
self.OLD_STYLE_INSTANCE_PATH = None self.OLD_STYLE_INSTANCE_PATH = None
self.LV_PATH = os.path.join('/dev', self.VG, self.LV) self.LV_PATH = os.path.join('/dev', self.VG, self.LV)
self.PATH = os.path.join('/dev/mapper', self.PATH = os.path.join('/dev/mapper',
imagebackend.dmcrypt.volume_name(self.LV)) imagebackend.dmcrypt.volume_name(self.LV))
self.key_manager = keymgr.API() self.key_manager = key_manager.API()
self.INSTANCE['ephemeral_key_uuid'] =\ self.INSTANCE['ephemeral_key_uuid'] =\
self.key_manager.create_key(self.CONTEXT) self.key_manager.create_key(self.CONTEXT, 'AES', 256)
self.KEY = self.key_manager.get_key(self.CONTEXT, self.KEY = self.key_manager.get(self.CONTEXT,
self.INSTANCE['ephemeral_key_uuid']).get_encoded() self.INSTANCE['ephemeral_key_uuid']).get_encoded()
self.lvm = imagebackend.lvm self.lvm = imagebackend.lvm

View File

@ -14,24 +14,21 @@
# under the License. # under the License.
import array import binascii
import codecs
from castellan.common.objects import symmetric_key as key
import mock import mock
import six import six
from nova import exception from nova import exception
from nova.keymgr import key
from nova.tests.unit.volume.encryptors import test_base from nova.tests.unit.volume.encryptors import test_base
from nova.volume.encryptors import cryptsetup from nova.volume.encryptors import cryptsetup
decode_hex = codecs.getdecoder("hex_codec")
def fake__get_key(context): def fake__get_key(context):
raw = array.array('B', decode_hex('0' * 64)[0]).tolist() raw = bytes(binascii.unhexlify('0' * 32))
symmetric_key = key.SymmetricKey('AES', raw) symmetric_key = key.SymmetricKey('AES', len(raw) * 8, raw)
return symmetric_key return symmetric_key

View File

@ -647,7 +647,7 @@ class Lvm(Image):
self.ephemeral_key_uuid = instance.get('ephemeral_key_uuid') self.ephemeral_key_uuid = instance.get('ephemeral_key_uuid')
if self.ephemeral_key_uuid is not None: if self.ephemeral_key_uuid is not None:
self.key_manager = keymgr.API() self.key_manager = keymgr.API(CONF)
else: else:
self.key_manager = None self.key_manager = None
@ -727,7 +727,7 @@ class Lvm(Image):
# NOTE(dgenin): Key manager corresponding to the # NOTE(dgenin): Key manager corresponding to the
# specific backend catches and reraises an # specific backend catches and reraises an
# an exception if key retrieval fails. # an exception if key retrieval fails.
key = self.key_manager.get_key(kwargs['context'], key = self.key_manager.get(kwargs['context'],
self.ephemeral_key_uuid).get_encoded() self.ephemeral_key_uuid).get_encoded()
except Exception: except Exception:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():

View File

@ -41,7 +41,7 @@ class VolumeEncryptor(object):
:param: the connection information used to attach the volume :param: the connection information used to attach the volume
""" """
return self._key_manager.get_key(context, self.encryption_key_id) return self._key_manager.get(context, self.encryption_key_id)
@abc.abstractmethod @abc.abstractmethod
def attach_volume(self, context, **kwargs): def attach_volume(self, context, **kwargs):

View File

@ -14,6 +14,7 @@
# under the License. # under the License.
import binascii
import os import os
from oslo_log import log as logging from oslo_log import log as logging
@ -54,7 +55,7 @@ class CryptsetupEncryptor(base.VolumeEncryptor):
def _get_passphrase(self, key): def _get_passphrase(self, key):
"""Convert raw key to string.""" """Convert raw key to string."""
return ''.join(hex(x).replace('0x', '') for x in key) return binascii.hexlify(key).decode('utf-8')
def _open_volume(self, passphrase, **kwargs): def _open_volume(self, passphrase, **kwargs):
"""Opens the LUKS partition on the volume using the specified """Opens the LUKS partition on the volume using the specified