castellan/castellan/tests/functional/key_manager/test_key_manager.py

245 lines
9.5 KiB
Python

# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test cases for a key manager.
These test cases should pass against any key manager.
"""
from castellan.common import exception
from castellan.common.objects import opaque_data
from castellan.common.objects import passphrase
from castellan.common.objects import private_key
from castellan.common.objects import public_key
from castellan.common.objects import symmetric_key
from castellan.common.objects import x_509
from castellan.tests import utils
def _get_test_symmetric_key():
key_bytes = bytes(utils.get_symmetric_key())
bit_length = 128
key = symmetric_key.SymmetricKey('AES', bit_length, key_bytes)
return key
def _get_test_public_key():
key_bytes = bytes(utils.get_public_key_der())
bit_length = 2048
key = public_key.PublicKey('RSA', bit_length, key_bytes)
return key
def _get_test_private_key():
key_bytes = bytes(utils.get_private_key_der())
bit_length = 2048
key = private_key.PrivateKey('RSA', bit_length, key_bytes)
return key
def _get_test_certificate():
data = bytes(utils.get_certificate_der())
cert = x_509.X509(data)
return cert
def _get_test_opaque_data():
data = bytes(b'opaque data')
opaque_object = opaque_data.OpaqueData(data)
return opaque_object
def _get_test_passphrase():
data = bytes(b'passphrase')
passphrase_object = passphrase.Passphrase(data)
return passphrase_object
@utils.parameterized_test_case
class KeyManagerTestCase(object):
def _create_key_manager(self):
raise NotImplementedError()
def setUp(self):
super(KeyManagerTestCase, self).setUp()
self.key_mgr = self._create_key_manager()
self.ctxt = None
def _get_valid_object_uuid(self, managed_object):
object_uuid = self.key_mgr.store(self.ctxt, managed_object)
self.assertIsNotNone(object_uuid)
return object_uuid
def test_create_key(self):
key_uuid = self.key_mgr.create_key(self.ctxt,
algorithm='AES',
length=256)
self.addCleanup(self.key_mgr.delete, self.ctxt, key_uuid)
self.assertIsNotNone(key_uuid)
def test_create_key_pair(self):
private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair(
self.ctxt,
algorithm='RSA',
length=2048)
self.addCleanup(self.key_mgr.delete, self.ctxt, private_key_uuid)
self.addCleanup(self.key_mgr.delete, self.ctxt, public_key_uuid)
self.assertIsNotNone(private_key_uuid)
self.assertIsNotNone(public_key_uuid)
self.assertNotEqual(private_key_uuid, public_key_uuid)
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_delete(self, managed_object):
object_uuid = self._get_valid_object_uuid(managed_object)
self.key_mgr.delete(self.ctxt, object_uuid)
try:
self.key_mgr.get(self.ctxt, object_uuid)
except exception.ManagedObjectNotFoundError:
pass
else:
self.fail('No exception when deleting non-existent key')
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_get(self, managed_object):
uuid = self._get_valid_object_uuid(managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
self.assertEqual(managed_object.get_encoded(),
retrieved_object.get_encoded())
self.assertFalse(managed_object.is_metadata_only())
self.assertFalse(retrieved_object.is_metadata_only())
self.assertIsNotNone(retrieved_object.id)
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_get_metadata(self, managed_object):
uuid = self._get_valid_object_uuid(managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
retrieved_object = self.key_mgr.get(self.ctxt,
uuid,
metadata_only=True)
self.assertFalse(managed_object.is_metadata_only())
self.assertTrue(retrieved_object.is_metadata_only())
self.assertIsNotNone(retrieved_object.id)
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_store(self, managed_object):
uuid = self.key_mgr.store(self.ctxt, managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
retrieved_object = self.key_mgr.get(self.ctxt, uuid)
self.assertEqual(managed_object.get_encoded(),
retrieved_object.get_encoded())
self.assertIsNotNone(retrieved_object.id)
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_list(self, managed_object):
uuid = self.key_mgr.store(self.ctxt, managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
# the list command may return more objects than the one we just
# created if older objects were not cleaned up, so we will simply
# check if the object we created is in the list
retrieved_objects = self.key_mgr.list(self.ctxt)
self.assertTrue(managed_object in retrieved_objects)
for retrieved_object in retrieved_objects:
self.assertFalse(retrieved_object.is_metadata_only())
self.assertIsNotNone(retrieved_object.id)
@utils.parameterized_dataset({
'symmetric_key': [_get_test_symmetric_key()],
'public_key': [_get_test_public_key()],
'private_key': [_get_test_private_key()],
'certificate': [_get_test_certificate()],
'passphrase': [_get_test_passphrase()],
'opaque_data': [_get_test_opaque_data()],
})
def test_list_metadata_only(self, managed_object):
uuid = self.key_mgr.store(self.ctxt, managed_object)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid)
expected_obj = self.key_mgr.get(self.ctxt, uuid, metadata_only=True)
# the list command may return more objects than the one we just
# created if older objects were not cleaned up, so we will simply
# check if the object we created is in the list
retrieved_objects = self.key_mgr.list(self.ctxt, metadata_only=True)
self.assertTrue(expected_obj in retrieved_objects)
for retrieved_object in retrieved_objects:
self.assertTrue(retrieved_object.is_metadata_only())
self.assertIsNotNone(retrieved_object.id)
@utils.parameterized_dataset({
'query_by_object_type': {
'object_1': _get_test_symmetric_key(),
'object_2': _get_test_public_key(),
'query_dict': dict(object_type=symmetric_key.SymmetricKey)
},
})
def test_list_with_filter(self, object_1, object_2, query_dict):
uuid1 = self.key_mgr.store(self.ctxt, object_1)
uuid2 = self.key_mgr.store(self.ctxt, object_2)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid1)
self.addCleanup(self.key_mgr.delete, self.ctxt, uuid2)
# the list command may return more objects than the one we just
# created if older objects were not cleaned up, so we will simply
# check that the returned objects have the expected type
retrieved_objects = self.key_mgr.list(self.ctxt, **query_dict)
for retrieved_object in retrieved_objects:
self.assertEqual(type(object_1), type(retrieved_object))
self.assertIsNotNone(retrieved_object.id)
self.assertTrue(object_1 in retrieved_objects)