Fix wrapper to work with barbicanclient 3.0.1
Due to the updates to the python-barbicanclient in version 3.0.1, the cinder
barbican wrapper no longer functions. This patch updates the cinder barbican
wrapper and test cases to work with barbicanclient 3.0.1.
Change-Id: Ibef4abd5fd9b12962420eaa5d7ebf62700171861
Closes-Bug: #1388461
(cherry picked from commit 2d52d4177c
)
This commit is contained in:
parent
628196e46e
commit
fd323fed35
@ -22,8 +22,8 @@ import base64
|
|||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
from barbicanclient import client as barbican_client
|
from barbicanclient import client as barbican_client
|
||||||
from barbicanclient.common import auth
|
from keystoneclient.auth import identity
|
||||||
from keystoneclient.v2_0 import client as keystone_client
|
from keystoneclient import session
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from cinder import exception
|
from cinder import exception
|
||||||
@ -42,14 +42,21 @@ LOG = logging.getLogger(__name__)
|
|||||||
class BarbicanKeyManager(key_mgr.KeyManager):
|
class BarbicanKeyManager(key_mgr.KeyManager):
|
||||||
"""Key Manager Interface that wraps the Barbican client API."""
|
"""Key Manager Interface that wraps the Barbican client API."""
|
||||||
|
|
||||||
def _create_connection(self, ctxt):
|
def __init__(self):
|
||||||
"""Creates a connection to the Barbican service.
|
self._base_url = CONF.keymgr.encryption_api_url
|
||||||
|
# the barbican endpoint can't have the '/v1' on the end
|
||||||
|
self._barbican_endpoint = self._base_url.rpartition('/')[0]
|
||||||
|
self._barbican_client = None
|
||||||
|
|
||||||
|
def _get_barbican_client(self, ctxt):
|
||||||
|
"""Creates a client to connect to the Barbican service.
|
||||||
|
|
||||||
:param ctxt: the user context for authentication
|
:param ctxt: the user context for authentication
|
||||||
:return: a Barbican Connection object
|
:return: a Barbican Client object
|
||||||
:throws NotAuthorized: if the ctxt is None
|
:throws NotAuthorized: if the ctxt is None
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if not self._barbican_client:
|
||||||
# Confirm context is provided, if not raise not authorized
|
# Confirm context is provided, if not raise not authorized
|
||||||
if not ctxt:
|
if not ctxt:
|
||||||
msg = _("User is not authorized to use key manager.")
|
msg = _("User is not authorized to use key manager.")
|
||||||
@ -57,17 +64,19 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
raise exception.NotAuthorized(msg)
|
raise exception.NotAuthorized(msg)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
endpoint = CONF.keymgr.encryption_auth_url
|
auth = identity.v3.Token(
|
||||||
keystone = keystone_client.Client(token=ctxt.auth_token,
|
auth_url=CONF.keymgr.encryption_auth_url,
|
||||||
endpoint=endpoint)
|
token=ctxt.auth_token)
|
||||||
keystone_auth = auth.KeystoneAuthV2(keystone=keystone)
|
sess = session.Session(auth=auth)
|
||||||
keystone_auth._barbican_url = CONF.keymgr.encryption_api_url
|
self._barbican_client = barbican_client.Client(
|
||||||
connection = barbican_client.Client(auth_plugin=keystone_auth)
|
session=sess,
|
||||||
return connection
|
endpoint=self._barbican_endpoint)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error(_("Error creating Barbican client: %s"), (e))
|
LOG.error(_("Error creating Barbican client: %s"), (e))
|
||||||
|
|
||||||
|
return self._barbican_client
|
||||||
|
|
||||||
def create_key(self, ctxt, expiration=None, name='Cinder Volume Key',
|
def create_key(self, ctxt, expiration=None, name='Cinder Volume Key',
|
||||||
payload_content_type='application/octet-stream', mode='CBC',
|
payload_content_type='application/octet-stream', mode='CBC',
|
||||||
algorithm='AES', length=256):
|
algorithm='AES', length=256):
|
||||||
@ -85,13 +94,18 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:return: the UUID of the new key
|
:return: the UUID of the new key
|
||||||
:throws Exception: if key creation fails
|
:throws Exception: if key creation fails
|
||||||
"""
|
"""
|
||||||
connection = self._create_connection(ctxt)
|
barbican_client = self._get_barbican_client(ctxt)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
order_ref = connection.orders.create(name, payload_content_type,
|
key_order = barbican_client.orders.create_key(
|
||||||
algorithm, length, mode,
|
name,
|
||||||
|
algorithm,
|
||||||
|
length,
|
||||||
|
mode,
|
||||||
|
payload_content_type,
|
||||||
expiration)
|
expiration)
|
||||||
order = connection.orders.get(order_ref)
|
order_ref = key_order.submit()
|
||||||
|
order = barbican_client.orders.get(order_ref)
|
||||||
secret_uuid = order.secret_ref.rpartition('/')[2]
|
secret_uuid = order.secret_ref.rpartition('/')[2]
|
||||||
return secret_uuid
|
return secret_uuid
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -123,7 +137,7 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:returns: the UUID of the stored key
|
:returns: the UUID of the stored key
|
||||||
:throws Exception: if key storage fails
|
:throws Exception: if key storage fails
|
||||||
"""
|
"""
|
||||||
connection = self._create_connection(ctxt)
|
barbican_client = self._get_barbican_client(ctxt)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if key.get_algorithm():
|
if key.get_algorithm():
|
||||||
@ -138,11 +152,15 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
encoded_key = base64.b64encode(binascii.unhexlify(string_key))
|
encoded_key = base64.b64encode(binascii.unhexlify(string_key))
|
||||||
else:
|
else:
|
||||||
encoded_key = key.get_encoded()
|
encoded_key = key.get_encoded()
|
||||||
secret_ref = connection.secrets.store(name, encoded_key,
|
secret = barbican_client.secrets.create(name,
|
||||||
|
encoded_key,
|
||||||
payload_content_type,
|
payload_content_type,
|
||||||
payload_content_encoding,
|
payload_content_encoding,
|
||||||
algorithm, bit_length, mode,
|
algorithm,
|
||||||
|
bit_length,
|
||||||
|
mode,
|
||||||
expiration)
|
expiration)
|
||||||
|
secret_ref = secret.store()
|
||||||
secret_uuid = secret_ref.rpartition('/')[2]
|
secret_uuid = secret_ref.rpartition('/')[2]
|
||||||
return secret_uuid
|
return secret_uuid
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -158,36 +176,40 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:return: the UUID of the key copy
|
:return: the UUID of the key copy
|
||||||
:throws Exception: if key copying fails
|
:throws Exception: if key copying fails
|
||||||
"""
|
"""
|
||||||
connection = self._create_connection(ctxt)
|
barbican_client = self._get_barbican_client(ctxt)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
secret_ref = self._create_secret_ref(key_id, connection)
|
secret_ref = self._create_secret_ref(key_id, barbican_client)
|
||||||
meta = self._get_secret_metadata(ctxt, secret_ref)
|
secret = self._get_secret(ctxt, secret_ref)
|
||||||
con_type = meta.content_types['default']
|
con_type = secret.content_types['default']
|
||||||
secret_data = self._get_secret_data(ctxt, secret_ref,
|
secret_data = self._get_secret_data(secret,
|
||||||
payload_content_type=con_type)
|
payload_content_type=con_type)
|
||||||
key = keymgr_key.SymmetricKey(meta.algorithm, secret_data)
|
key = keymgr_key.SymmetricKey(secret.algorithm, secret_data)
|
||||||
copy_uuid = self.store_key(ctxt, key, meta.expiration,
|
copy_uuid = self.store_key(ctxt, key, secret.expiration,
|
||||||
meta.name, con_type,
|
secret.name, con_type,
|
||||||
'base64',
|
'base64',
|
||||||
meta.algorithm, meta.bit_length,
|
secret.algorithm, secret.bit_length,
|
||||||
meta.mode, True)
|
secret.mode, True)
|
||||||
return copy_uuid
|
return copy_uuid
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error(_("Error copying key: %s"), (e))
|
LOG.error(_("Error copying key: %s"), (e))
|
||||||
|
|
||||||
def _create_secret_ref(self, key_id, connection):
|
def _create_secret_ref(self, key_id, barbican_client):
|
||||||
"""Creates the URL required for accessing a secret.
|
"""Creates the URL required for accessing a secret.
|
||||||
|
|
||||||
:param key_id: the UUID of the key to copy
|
:param key_id: the UUID of the key to copy
|
||||||
:param connection: barbican key manager object
|
:param barbican_client: barbican key manager object
|
||||||
|
|
||||||
:return: the URL of the requested secret
|
:return: the URL of the requested secret
|
||||||
"""
|
"""
|
||||||
return connection.base_url + "/secrets/" + key_id
|
if not key_id:
|
||||||
|
msg = "Key ID is None"
|
||||||
|
raise exception.KeyManagerError(msg)
|
||||||
|
return self._base_url + "/secrets/" + key_id
|
||||||
|
|
||||||
def _get_secret_data(self, ctxt, secret_ref,
|
def _get_secret_data(self,
|
||||||
|
secret,
|
||||||
payload_content_type='application/octet-stream'):
|
payload_content_type='application/octet-stream'):
|
||||||
"""Retrieves the secret data given a secret_ref and content_type.
|
"""Retrieves the secret data given a secret_ref and content_type.
|
||||||
|
|
||||||
@ -199,11 +221,8 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:returns: the secret data
|
:returns: the secret data
|
||||||
:throws Exception: if data cannot be retrieved
|
:throws Exception: if data cannot be retrieved
|
||||||
"""
|
"""
|
||||||
connection = self._create_connection(ctxt)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
generated_data = connection.secrets.decrypt(secret_ref,
|
generated_data = secret.payload
|
||||||
payload_content_type)
|
|
||||||
if payload_content_type == 'application/octet-stream':
|
if payload_content_type == 'application/octet-stream':
|
||||||
secret_data = base64.b64encode(generated_data)
|
secret_data = base64.b64encode(generated_data)
|
||||||
else:
|
else:
|
||||||
@ -213,7 +232,7 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error(_("Error getting secret data: %s"), (e))
|
LOG.error(_("Error getting secret data: %s"), (e))
|
||||||
|
|
||||||
def _get_secret_metadata(self, ctxt, secret_ref):
|
def _get_secret(self, ctxt, secret_ref):
|
||||||
"""Creates the URL required for accessing a secret's metadata.
|
"""Creates the URL required for accessing a secret's metadata.
|
||||||
|
|
||||||
:param ctxt: contains information of the user and the environment for
|
:param ctxt: contains information of the user and the environment for
|
||||||
@ -224,10 +243,10 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:throws Exception: if there is an error retrieving the data
|
:throws Exception: if there is an error retrieving the data
|
||||||
"""
|
"""
|
||||||
|
|
||||||
connection = self._create_connection(ctxt)
|
barbican_client = self._get_barbican_client(ctxt)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return connection.secrets.get(secret_ref)
|
return barbican_client.secrets.get(secret_ref)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error(_("Error getting secret metadata: %s"), (e))
|
LOG.error(_("Error getting secret metadata: %s"), (e))
|
||||||
@ -244,20 +263,18 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:return: SymmetricKey representation of the key
|
:return: SymmetricKey representation of the key
|
||||||
:throws Exception: if key retrieval fails
|
:throws Exception: if key retrieval fails
|
||||||
"""
|
"""
|
||||||
connection = self._create_connection(ctxt)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
secret_ref = self._create_secret_ref(key_id, connection)
|
secret_ref = self._create_secret_ref(key_id, barbican_client)
|
||||||
secret_data = self._get_secret_data(ctxt, secret_ref,
|
secret = self._get_secret(ctxt, secret_ref)
|
||||||
|
secret_data = self._get_secret_data(secret,
|
||||||
payload_content_type)
|
payload_content_type)
|
||||||
if payload_content_type == 'application/octet-stream':
|
if payload_content_type == 'application/octet-stream':
|
||||||
# convert decoded string to list of unsigned ints for each byte
|
# convert decoded string to list of unsigned ints for each byte
|
||||||
secret = array.array('B',
|
key_data = array.array('B',
|
||||||
base64.b64decode(secret_data)).tolist()
|
base64.b64decode(secret_data)).tolist()
|
||||||
else:
|
else:
|
||||||
secret = secret_data
|
key_data = secret_data
|
||||||
meta = self._get_secret_metadata(ctxt, secret_ref)
|
key = keymgr_key.SymmetricKey(secret.algorithm, key_data)
|
||||||
key = keymgr_key.SymmetricKey(meta.algorithm, secret)
|
|
||||||
return key
|
return key
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
@ -271,11 +288,11 @@ class BarbicanKeyManager(key_mgr.KeyManager):
|
|||||||
:param key_id: the UUID of the key to delete
|
:param key_id: the UUID of the key to delete
|
||||||
:throws Exception: if key deletion fails
|
:throws Exception: if key deletion fails
|
||||||
"""
|
"""
|
||||||
connection = self._create_connection(ctxt)
|
barbican_client = self._get_barbican_client(ctxt)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
secret_ref = self._create_secret_ref(key_id, connection)
|
secret_ref = self._create_secret_ref(key_id, barbican_client)
|
||||||
connection.secrets.delete(secret_ref)
|
barbican_client.secrets.delete(secret_ref)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error(_("Error deleting key: %s"), (e))
|
LOG.error(_("Error deleting key: %s"), (e))
|
||||||
|
231
cinder/tests/keymgr/test_barbican.py
Normal file
231
cinder/tests/keymgr/test_barbican.py
Normal file
@ -0,0 +1,231 @@
|
|||||||
|
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Test cases for the barbican key manager.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import array
|
||||||
|
import base64
|
||||||
|
import binascii
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from cinder import exception
|
||||||
|
from cinder.keymgr import barbican
|
||||||
|
from cinder.keymgr import key as keymgr_key
|
||||||
|
from cinder.tests.keymgr import test_key_mgr
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.import_opt('encryption_auth_url', 'cinder.keymgr.key_mgr', group='keymgr')
|
||||||
|
CONF.import_opt('encryption_api_url', 'cinder.keymgr.key_mgr', group='keymgr')
|
||||||
|
|
||||||
|
|
||||||
|
class BarbicanKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
|
||||||
|
|
||||||
|
def _create_key_manager(self):
|
||||||
|
return barbican.BarbicanKeyManager()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(BarbicanKeyManagerTestCase, self).setUp()
|
||||||
|
|
||||||
|
# Create fake auth_token
|
||||||
|
self.ctxt = mock.Mock()
|
||||||
|
self.ctxt.auth_token = "fake_token"
|
||||||
|
|
||||||
|
# Create mock barbican client
|
||||||
|
self._build_mock_barbican()
|
||||||
|
|
||||||
|
# Create a key_id, secret_ref, pre_hex, and hex to use
|
||||||
|
self.key_id = "d152fa13-2b41-42ca-a934-6c21566c0f40"
|
||||||
|
self.secret_ref = self.key_mgr._create_secret_ref(self.key_id,
|
||||||
|
self.mock_barbican)
|
||||||
|
self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY="
|
||||||
|
self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4"
|
||||||
|
"534ae16")
|
||||||
|
self.addCleanup(self._restore)
|
||||||
|
|
||||||
|
def _restore(self):
|
||||||
|
if hasattr(self, 'original_key'):
|
||||||
|
keymgr_key.SymmetricKey = self.original_key
|
||||||
|
if hasattr(self, 'original_base64'):
|
||||||
|
base64.b64encode = self.original_base64
|
||||||
|
|
||||||
|
def _build_mock_barbican(self):
|
||||||
|
self.mock_barbican = mock.MagicMock(name='mock_barbican')
|
||||||
|
|
||||||
|
# Set commonly used methods
|
||||||
|
self.get = self.mock_barbican.secrets.get
|
||||||
|
self.delete = self.mock_barbican.secrets.delete
|
||||||
|
self.store = self.mock_barbican.secrets.store
|
||||||
|
self.create = self.mock_barbican.secrets.create
|
||||||
|
|
||||||
|
self.key_mgr._barbican_client = self.mock_barbican
|
||||||
|
|
||||||
|
def _build_mock_symKey(self):
|
||||||
|
self.mock_symKey = mock.Mock()
|
||||||
|
|
||||||
|
def fake_sym_key(alg, key):
|
||||||
|
self.mock_symKey.get_encoded.return_value = key
|
||||||
|
self.mock_symKey.get_algorithm.return_value = alg
|
||||||
|
return self.mock_symKey
|
||||||
|
self.original_key = keymgr_key.SymmetricKey
|
||||||
|
keymgr_key.SymmetricKey = fake_sym_key
|
||||||
|
|
||||||
|
def _build_mock_base64(self):
|
||||||
|
|
||||||
|
def fake_base64_b64encode(string):
|
||||||
|
return self.pre_hex
|
||||||
|
|
||||||
|
self.original_base64 = base64.b64encode
|
||||||
|
base64.b64encode = fake_base64_b64encode
|
||||||
|
|
||||||
|
def test_copy_key(self):
|
||||||
|
# Create metadata for original secret
|
||||||
|
original_secret_metadata = mock.Mock()
|
||||||
|
original_secret_metadata.algorithm = 'fake_algorithm'
|
||||||
|
original_secret_metadata.bit_length = 'fake_bit_length'
|
||||||
|
original_secret_metadata.name = 'original_name'
|
||||||
|
original_secret_metadata.expiration = 'fake_expiration'
|
||||||
|
original_secret_metadata.mode = 'fake_mode'
|
||||||
|
content_types = {'default': 'fake_type'}
|
||||||
|
original_secret_metadata.content_types = content_types
|
||||||
|
original_secret_data = mock.Mock()
|
||||||
|
original_secret_metadata.payload = original_secret_data
|
||||||
|
self.get.return_value = original_secret_metadata
|
||||||
|
|
||||||
|
# Create the mock key
|
||||||
|
self._build_mock_symKey()
|
||||||
|
|
||||||
|
# Copy the original
|
||||||
|
self.key_mgr.copy_key(self.ctxt, self.key_id)
|
||||||
|
|
||||||
|
# Assert proper methods were called
|
||||||
|
self.get.assert_called_once_with(self.secret_ref)
|
||||||
|
self.create.assert_called_once_with(
|
||||||
|
original_secret_metadata.name,
|
||||||
|
self.mock_symKey.get_encoded(),
|
||||||
|
content_types['default'],
|
||||||
|
'base64',
|
||||||
|
original_secret_metadata.algorithm,
|
||||||
|
original_secret_metadata.bit_length,
|
||||||
|
original_secret_metadata.mode,
|
||||||
|
original_secret_metadata.expiration)
|
||||||
|
self.store.assert_called()
|
||||||
|
|
||||||
|
def test_copy_null_context(self):
|
||||||
|
self.key_mgr._barbican_client = None
|
||||||
|
self.assertRaises(exception.NotAuthorized,
|
||||||
|
self.key_mgr.copy_key, None, self.key_id)
|
||||||
|
|
||||||
|
def test_create_key(self):
|
||||||
|
# Create order_ref_url and assign return value
|
||||||
|
order_ref_url = ("http://localhost:9311/v1/None/orders/"
|
||||||
|
"4fe939b7-72bc-49aa-bd1e-e979589858af")
|
||||||
|
key_order = mock.Mock()
|
||||||
|
self.mock_barbican.orders.create_key.return_value = key_order
|
||||||
|
key_order.submit.return_value = order_ref_url
|
||||||
|
|
||||||
|
# Create order and assign return value
|
||||||
|
order = mock.Mock()
|
||||||
|
order.secret_ref = self.secret_ref
|
||||||
|
self.mock_barbican.orders.get.return_value = order
|
||||||
|
|
||||||
|
# Create the key, get the UUID
|
||||||
|
returned_uuid = self.key_mgr.create_key(self.ctxt)
|
||||||
|
|
||||||
|
self.mock_barbican.orders.get.assert_called_once_with(order_ref_url)
|
||||||
|
self.assertEqual(returned_uuid, self.key_id)
|
||||||
|
|
||||||
|
def test_create_null_context(self):
|
||||||
|
self.key_mgr._barbican_client = None
|
||||||
|
self.assertRaises(exception.NotAuthorized,
|
||||||
|
self.key_mgr.create_key, None)
|
||||||
|
|
||||||
|
def test_delete_null_context(self):
|
||||||
|
self.key_mgr._barbican_client = None
|
||||||
|
self.assertRaises(exception.NotAuthorized,
|
||||||
|
self.key_mgr.delete_key, None, self.key_id)
|
||||||
|
|
||||||
|
def test_delete_key(self):
|
||||||
|
self.key_mgr.delete_key(self.ctxt, self.key_id)
|
||||||
|
self.delete.assert_called_once_with(self.secret_ref)
|
||||||
|
|
||||||
|
def test_delete_unknown_key(self):
|
||||||
|
self.assertRaises(exception.KeyManagerError,
|
||||||
|
self.key_mgr.delete_key, self.ctxt, None)
|
||||||
|
|
||||||
|
def test_get_key(self):
|
||||||
|
self._build_mock_base64()
|
||||||
|
content_type = 'application/octet-stream'
|
||||||
|
|
||||||
|
key = self.key_mgr.get_key(self.ctxt, self.key_id, content_type)
|
||||||
|
|
||||||
|
self.get.assert_called_once_with(self.secret_ref)
|
||||||
|
encoded = array.array('B', binascii.unhexlify(self.hex)).tolist()
|
||||||
|
self.assertEqual(key.get_encoded(), encoded)
|
||||||
|
|
||||||
|
def test_get_null_context(self):
|
||||||
|
self.key_mgr._barbican_client = None
|
||||||
|
self.assertRaises(exception.NotAuthorized,
|
||||||
|
self.key_mgr.get_key, None, self.key_id)
|
||||||
|
|
||||||
|
def test_get_unknown_key(self):
|
||||||
|
self.assertRaises(exception.KeyManagerError,
|
||||||
|
self.key_mgr.get_key, self.ctxt, None)
|
||||||
|
|
||||||
|
def test_store_key_base64(self):
|
||||||
|
# Create Key to store
|
||||||
|
secret_key = array.array('B', [0x01, 0x02, 0xA0, 0xB3]).tolist()
|
||||||
|
_key = keymgr_key.SymmetricKey('AES', secret_key)
|
||||||
|
|
||||||
|
# Define the return values
|
||||||
|
secret = mock.Mock()
|
||||||
|
self.create.return_value = secret
|
||||||
|
secret.store.return_value = self.secret_ref
|
||||||
|
|
||||||
|
# Store the Key
|
||||||
|
returned_uuid = self.key_mgr.store_key(self.ctxt, _key, bit_length=32)
|
||||||
|
|
||||||
|
self.create.assert_called_once_with('Cinder Volume Key',
|
||||||
|
'AQKgsw==',
|
||||||
|
'application/octet-stream',
|
||||||
|
'base64',
|
||||||
|
'AES', 32, 'CBC',
|
||||||
|
None)
|
||||||
|
self.assertEqual(returned_uuid, self.key_id)
|
||||||
|
|
||||||
|
def test_store_key_plaintext(self):
|
||||||
|
# Create the plaintext key
|
||||||
|
secret_key_text = "This is a test text key."
|
||||||
|
_key = keymgr_key.SymmetricKey('AES', secret_key_text)
|
||||||
|
|
||||||
|
# Store the Key
|
||||||
|
self.key_mgr.store_key(self.ctxt, _key,
|
||||||
|
payload_content_type='text/plain',
|
||||||
|
payload_content_encoding=None)
|
||||||
|
self.create.assert_called_once_with('Cinder Volume Key',
|
||||||
|
secret_key_text,
|
||||||
|
'text/plain',
|
||||||
|
None,
|
||||||
|
'AES', 256, 'CBC',
|
||||||
|
None)
|
||||||
|
self.store.assert_called_once()
|
||||||
|
|
||||||
|
def test_store_null_context(self):
|
||||||
|
self.key_mgr._barbican_client = None
|
||||||
|
self.assertRaises(exception.NotAuthorized,
|
||||||
|
self.key_mgr.store_key, None, None)
|
Loading…
Reference in New Issue
Block a user