Merge "Add asymmetric key generation in dogtag plugin"

This commit is contained in:
Jenkins 2014-09-24 17:19:57 +00:00 committed by Gerrit Code Review
commit b6e29f5386
3 changed files with 347 additions and 40 deletions

View File

@ -315,7 +315,6 @@ class TypeOrderValidator(ValidatorBase):
self._assert_validity(asymmetric_meta is not None,
schema_name,
u._("'meta' attributes is required"), "meta")
self._raise_feature_not_implemented('asymmetric', schema_name)
def _validate_certificate_meta(self, certificate_meta, schema_name):
"""Validation specific to meta for certificate type order."""

View File

@ -16,7 +16,10 @@
import os
import uuid
from Crypto.PublicKey import RSA
from Crypto.Util import asn1
from oslo.config import cfg
import pki
import pki.cert
import pki.client
@ -27,11 +30,13 @@ import pki.profile
from requests import exceptions as request_exceptions
from barbican.common import exception
from barbican.common import utils
from barbican.openstack.common import gettextutils as u
import barbican.plugin.interface.certificate_manager as cm
import barbican.plugin.interface.secret_store as sstore
CONF = cfg.CONF
LOG = utils.getLogger(__name__)
dogtag_plugin_group = cfg.OptGroup(name='dogtag_plugin',
title="Dogtag Plugin Options")
@ -90,18 +95,41 @@ class DogtagPluginAlgorithmException(exception.BarbicanException):
message = u._("Invalid algorithm passed in")
class DogtagPluginNotSupportedException(exception.NotSupported):
message = u._("Operation not supported by Dogtag Plugin")
def __init__(self, msg=None):
if not msg:
message = self.message
else:
message = msg
super(DogtagPluginNotSupportedException, self).__init__(message)
class DogtagKRAPlugin(sstore.SecretStoreBase):
"""Implementation of the secret store plugin with KRA as the backend."""
TRANSPORT_NICK = "KRA transport cert"
# metadata constants
ALG = "alg"
BIT_LENGTH = "bit_length"
KEY_ID = "key_id"
SECRET_TYPE = "secret_type"
SECRET_KEYSPEC = "secret_keyspec"
SECRET_MODE = "secret_mode"
PASSPHRASE_KEY_ID = "passphrase_key_id"
CONVERT_TO_PEM = "convert_to_pem"
# string constants
DSA_PRIVATE_KEY_HEADER = '-----BEGIN DSA PRIVATE KEY-----'
DSA_PRIVATE_KEY_FOOTER = '-----END DSA PRIVATE KEY-----'
DSA_PUBLIC_KEY_HEADER = '-----BEGIN DSA PUBLIC KEY-----'
DSA_PUBLIC_KEY_FOOTER = '-----END DSA PUBLIC KEY-----'
def __init__(self, conf=CONF):
"""Constructor - create the keyclient."""
LOG.debug("starting DogtagKRAPlugin init")
crypto, create_nss_db = setup_nss_db(conf)
connection = create_connection(conf, 'kra')
@ -118,6 +146,8 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
self.keyclient.set_transport_cert(
DogtagKRAPlugin.TRANSPORT_NICK)
LOG.debug("completed DogtagKRAPlugin init")
def import_transport_cert(self, crypto):
# Get transport cert and insert in the certdb
transport_cert = self.systemcert_client.get_transport_cert()
@ -163,9 +193,10 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
key_algorithm=None,
key_size=None)
return {DogtagKRAPlugin.SECRET_TYPE: secret_dto.type,
DogtagKRAPlugin.SECRET_KEYSPEC: secret_dto.key_spec,
DogtagKRAPlugin.KEY_ID: response.get_key_id()}
meta_dict = {DogtagKRAPlugin.KEY_ID: response.get_key_id()}
self._store_secret_attributes(meta_dict, secret_dto)
return meta_dict
def get_secret(self, secret_metadata):
"""Retrieve a secret from the KRA
@ -185,22 +216,111 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
on the barbican client. That way only the client will be
able to unwrap the secret. This wrapping key is provided in the
secret_metadata by Barbican core.
Format/Type of the secret returned in the SecretDTO object.
-----------------------------------------------------------
The type of the secret returned is always dependent on the way it is
stored using the store_secret method.
In case of strings - like passphrase/PEM strings, the return will be a
string.
In case of binary data - the return will be the actual binary data.
In case of retrieving an asymmetric key that is generated using the
dogtag plugin, then the binary representation of, the asymmetric key in
PEM format, is returned
"""
key_id = secret_metadata[DogtagKRAPlugin.KEY_ID]
twsk = None
if 'trans_wrapped_session_key' in secret_metadata:
twsk = secret_metadata['trans_wrapped_session_key']
secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None)
# TODO(alee-3) send transport key as well when dogtag client API
# changes in case the transport key has changed.
recovered_key = self.keyclient.retrieve_key(key_id, twsk)
key_spec = sstore.KeySpec(
alg=secret_metadata.get(DogtagKRAPlugin.ALG, None),
bit_length=secret_metadata.get(DogtagKRAPlugin.BIT_LENGTH, None),
mode=secret_metadata.get(DogtagKRAPlugin.SECRET_MODE, None),
passphrase=None
)
passphrase = self._get_passphrase_for_a_private_key(
secret_metadata, key_spec)
recovered_key = None
twsk = DogtagKRAPlugin._get_trans_wrapped_session_key(secret_metadata)
if DogtagKRAPlugin.CONVERT_TO_PEM in secret_metadata:
# Case for returning the asymmetric keys generated in KRA.
# Asymmetric keys generated in KRA are not generated in PEM format.
# This marker DogtagKRAPlugin.CONVERT_TO_PEM is set in the
# secret_metadata for asymmetric keys generated in KRA to
# help convert the returned private/public keys to PEM format and
# eventually return the binary data of the keys in PEM format.
if secret_type == sstore.SecretType.PUBLIC:
# Public key should be retrieved using the get_key_info method
# as it is treated as an attribute of the asymmetric key pair
# stored in the KRA database.
if key_spec.alg is None:
raise sstore.SecretAlgorithmNotSupportedException('None')
key_info = self.keyclient.get_key_info(key_id)
if key_spec.alg.upper() == key.KeyClient.RSA_ALGORITHM:
recovered_key = (RSA.importKey(key_info.public_key)
.publickey()
.exportKey('PEM')).encode('utf-8')
elif key_spec.alg.upper() == key.KeyClient.DSA_ALGORITHM:
pub_seq = asn1.DerSequence()
pub_seq[:] = key_info.public_key
recovered_key = (
("%s\n%s%s" %
(DogtagKRAPlugin.DSA_PUBLIC_KEY_HEADER,
pub_seq.encode().encode("base64"),
DogtagKRAPlugin.DSA_PUBLIC_KEY_FOOTER)
).encode('utf-8')
)
else:
raise sstore.SecretAlgorithmNotSupportedException(
key_spec.alg.upper()
)
elif secret_type == sstore.SecretType.PRIVATE:
key_data = self.keyclient.retrieve_key(key_id)
if key_spec.alg.upper() == key.KeyClient.RSA_ALGORITHM:
recovered_key = (
(RSA.importKey(key_data.data)
.exportKey('PEM', passphrase))
.encode('utf-8')
)
elif key_spec.alg.upper() == key.KeyClient.DSA_ALGORITHM:
pub_seq = asn1.DerSequence()
pub_seq[:] = key_data.data
recovered_key = (
("%s\n%s%s" %
(DogtagKRAPlugin.DSA_PRIVATE_KEY_HEADER,
pub_seq.encode().encode("base64"),
DogtagKRAPlugin.DSA_PRIVATE_KEY_FOOTER)
).encode('utf-8')
)
else:
raise sstore.SecretAlgorithmNotSupportedException(
key_spec.alg.upper()
)
else:
# TODO(alee-3) send transport key as well when dogtag client API
# changes in case the transport key has changed.
key_data = self.keyclient.retrieve_key(key_id, twsk)
if twsk:
# The data returned is a byte array.
recovered_key = key_data.encrypted_data
else:
recovered_key = key_data.data
# TODO(alee) remove final field when content_type is removed
# from secret_dto
ret = sstore.SecretDTO(
type=secret_metadata[DogtagKRAPlugin.SECRET_TYPE],
type=secret_type,
secret=recovered_key,
key_spec=secret_metadata[DogtagKRAPlugin.SECRET_KEYSPEC],
key_spec=key_spec,
content_type=None,
transport_key=None)
@ -232,20 +352,84 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
if algorithm is None:
raise DogtagPluginAlgorithmException
passphrase = key_spec.passphrase
if passphrase:
raise DogtagPluginNotSupportedException(
"Passphrase encryption is not supported for symmetric"
" key generating algorithms.")
response = self.keyclient.generate_symmetric_key(
client_key_id,
algorithm,
key_spec.bit_length,
usages)
return {DogtagKRAPlugin.SECRET_KEYSPEC: key_spec,
return {DogtagKRAPlugin.ALG: key_spec.alg,
DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
DogtagKRAPlugin.SECRET_MODE: key_spec.mode,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.SYMMETRIC,
DogtagKRAPlugin.KEY_ID: response.get_key_id()}
def generate_asymmetric_key(self, key_spec):
"""Generate an asymmetric key."""
raise NotImplementedError(
"Feature not yet implemented by dogtag plugin")
usages = [key.AsymKeyGenerationRequest.DECRYPT_USAGE,
key.AsymKeyGenerationRequest.ENCRYPT_USAGE]
client_key_id = uuid.uuid4().hex
algorithm = self._map_algorithm(key_spec.alg.lower())
passphrase = key_spec.passphrase
if algorithm is None:
raise DogtagPluginAlgorithmException
passphrase_key_id = None
passphrase_metadata = None
if passphrase:
if algorithm == key.KeyClient.DSA_ALGORITHM:
raise DogtagPluginNotSupportedException("Passphrase encryption"
" is not supported for"
" DSA algorithm")
stored_passphrase_info = self.keyclient.archive_key(
uuid.uuid4().hex,
self.keyclient.PASS_PHRASE_TYPE,
passphrase)
passphrase_key_id = stored_passphrase_info.get_key_id()
passphrase_metadata = {
DogtagKRAPlugin.KEY_ID: passphrase_key_id
}
response = self.keyclient.generate_asymmetric_key(
client_key_id,
algorithm,
key_spec.bit_length,
usages)
public_key_metadata = {
DogtagKRAPlugin.ALG: key_spec.alg,
DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.PUBLIC,
DogtagKRAPlugin.KEY_ID: response.get_key_id(),
DogtagKRAPlugin.CONVERT_TO_PEM: "true"
}
private_key_metadata = {
DogtagKRAPlugin.ALG: key_spec.alg,
DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.PRIVATE,
DogtagKRAPlugin.KEY_ID: response.get_key_id(),
DogtagKRAPlugin.CONVERT_TO_PEM: "true"
}
if passphrase_key_id:
private_key_metadata[DogtagKRAPlugin.PASSPHRASE_KEY_ID] = (
passphrase_key_id
)
return sstore.AsymmetricKeyMetadataDTO(private_key_metadata,
public_key_metadata,
passphrase_metadata)
def generate_supports(self, key_spec):
"""Key generation supported?
@ -279,21 +463,74 @@ class DogtagKRAPlugin(sstore.SecretStoreBase):
return key.KeyClient.DES_ALGORITHM
elif algorithm == sstore.KeyAlgorithm.DESEDE:
return key.KeyClient.DES3_ALGORITHM
elif algorithm == sstore.KeyAlgorithm.DIFFIE_HELLMAN:
# may be supported, needs to be tested
return None
elif algorithm == sstore.KeyAlgorithm.DSA:
return key.KeyClient.DSA_ALGORITHM
elif algorithm == sstore.KeyAlgorithm.RSA:
return key.KeyClient.RSA_ALGORITHM
elif algorithm == sstore.KeyAlgorithm.DIFFIE_HELLMAN:
# may be supported, needs to be tested
return None
elif algorithm == sstore.KeyAlgorithm.EC:
# asymmetric keys not yet supported
return None
elif algorithm == sstore.KeyAlgorithm.RSA:
# asymmetric keys not yet supported
return None
else:
return None
@staticmethod
def _store_secret_attributes(meta_dict, secret_dto):
# store the following attributes for retrieval
key_spec = secret_dto.key_spec
if key_spec.alg is not None:
meta_dict[DogtagKRAPlugin.ALG] = key_spec.alg
if key_spec.bit_length is not None:
meta_dict[DogtagKRAPlugin.BIT_LENGTH] = key_spec.bit_length
if key_spec.mode is not None:
meta_dict[DogtagKRAPlugin.SECRET_MODE] = key_spec.mode
if secret_dto.type is not None:
meta_dict[DogtagKRAPlugin.SECRET_TYPE] = secret_dto, type
def _get_passphrase_for_a_private_key(self, secret_metadata, key_spec):
"""Retrieve the passphrase for the private key which is stored
in the KRA.
"""
secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None)
if secret_type is None:
return None
if key_spec.alg is None:
return None
passphrase = None
if DogtagKRAPlugin.PASSPHRASE_KEY_ID in secret_metadata:
if key_spec.alg.upper() == key.KeyClient.RSA_ALGORITHM:
passphrase = self.keyclient.retrieve_key(
secret_metadata.get(DogtagKRAPlugin.PASSPHRASE_KEY_ID)
).data
else:
if key_spec.alg.upper() == key.KeyClient.DSA_ALGORITHM:
raise sstore.SecretGeneralException(
"DSA keys should not have a passphrase in the"
" database, for being used during retrieval."
)
raise sstore.SecretGeneralException(
"Secrets of type " + secret_type +
" should not have a passphrase in the database, "
"for being used during retrieval."
)
return passphrase
@staticmethod
def _get_trans_wrapped_session_key(secret_metadata):
twsk = secret_metadata.get('trans_wrapped_session_key', None)
secret_type = secret_metadata.get(DogtagKRAPlugin.SECRET_TYPE, None)
if secret_type in [sstore.SecretType.PUBLIC,
sstore.SecretType.PRIVATE]:
if twsk:
raise DogtagPluginNotSupportedException(
"Encryption using session key is not supported when "
"retrieving a " + secret_type + " key.")
return twsk
def _catch_request_exception(ca_related_function):
def _catch_ca_unavailable(self, *args, **kwargs):
@ -302,6 +539,7 @@ def _catch_request_exception(ca_related_function):
except request_exceptions.RequestException:
return cm.ResultDTO(
cm.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST)
return _catch_ca_unavailable

View File

@ -16,6 +16,7 @@
import os
import tempfile
from Crypto.PublicKey import RSA
import mock
from requests import exceptions as request_exceptions
import testtools
@ -29,6 +30,7 @@ try:
import pki
import pki.cert as dogtag_cert
import pki.key as dogtag_key
imports_ok = True
except ImportError:
# dogtag imports probably not available
@ -58,10 +60,9 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
self.patcher.stop()
os.rmdir(self.nss_dir)
def test_generate(self):
def test_generate_symmetric_key(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.AES, 128)
context = mock.MagicMock()
self.plugin.generate_symmetric_key(key_spec, context)
self.plugin.generate_symmetric_key(key_spec)
self.keyclient_mock.generate_symmetric_key.assert_called_once_with(
mock.ANY,
@ -69,14 +70,22 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
128,
mock.ANY)
def test_generate_asymmetric_key(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.RSA, 2048)
self.plugin.generate_asymmetric_key(key_spec)
self.keyclient_mock.generate_asymmetric_key.assert_called_once_with(
mock.ANY,
sstore.KeyAlgorithm.RSA.upper(),
2048,
mock.ANY)
def test_generate_non_supported_algorithm(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.EC, 192)
context = mock.MagicMock()
self.assertRaises(
dogtag_import.DogtagPluginAlgorithmException,
self.plugin.generate_symmetric_key,
key_spec,
context
key_spec
)
def test_raises_error_with_no_pem_path(self):
@ -101,14 +110,13 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
payload = 'encrypt me!!'
key_spec = mock.MagicMock()
content_type = mock.MagicMock()
context = mock.MagicMock()
transport_key = None
secret_dto = sstore.SecretDTO(sstore.SecretType.SYMMETRIC,
payload,
key_spec,
content_type,
transport_key)
self.plugin.store_secret(secret_dto, context)
self.plugin.store_secret(secret_dto)
self.keyclient_mock.archive_key.assert_called_once_with(
mock.ANY,
"passPhrase",
@ -120,14 +128,13 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
payload = 'data wrapped in PKIArchiveOptions object'
key_spec = mock.MagicMock()
content_type = mock.MagicMock()
context = mock.MagicMock()
transport_key = mock.MagicMock()
secret_dto = sstore.SecretDTO(sstore.SecretType.SYMMETRIC,
payload,
key_spec,
content_type,
transport_key)
self.plugin.store_secret(secret_dto, context)
self.plugin.store_secret(secret_dto)
self.keyclient_mock.archive_pki_options.assert_called_once_with(
mock.ANY,
"passPhrase",
@ -136,39 +143,102 @@ class WhenTestingDogtagKRAPlugin(utils.BaseTestCase):
key_size=None)
def test_get_secret(self):
key_spec = mock.MagicMock()
context = mock.MagicMock()
secret_metadata = {
dogtag_import.DogtagKRAPlugin.SECRET_TYPE:
sstore.SecretType.SYMMETRIC,
dogtag_import.DogtagKRAPlugin.SECRET_KEYSPEC: key_spec,
dogtag_import.DogtagKRAPlugin.ALG: sstore.KeyAlgorithm.AES,
dogtag_import.DogtagKRAPlugin.BIT_LENGTH: 256,
dogtag_import.DogtagKRAPlugin.KEY_ID: 'key1'
}
self.plugin.get_secret(secret_metadata, context)
self.plugin.get_secret(secret_metadata)
self.keyclient_mock.retrieve_key.assert_called_once_with('key1', None)
def test_get_secret_with_twsk(self):
key_spec = mock.MagicMock()
context = mock.MagicMock()
twsk = mock.MagicMock()
secret_metadata = {
dogtag_import.DogtagKRAPlugin.SECRET_TYPE:
sstore.SecretType.SYMMETRIC,
dogtag_import.DogtagKRAPlugin.SECRET_KEYSPEC: key_spec,
dogtag_import.DogtagKRAPlugin.ALG: sstore.KeyAlgorithm.AES,
dogtag_import.DogtagKRAPlugin.BIT_LENGTH: 256,
dogtag_import.DogtagKRAPlugin.KEY_ID: 'key1',
'trans_wrapped_session_key': twsk
}
self.plugin.get_secret(secret_metadata, context)
self.plugin.get_secret(secret_metadata)
self.keyclient_mock.retrieve_key.assert_called_once_with('key1', twsk)
def test_get_private_key(self):
test_key = RSA.generate(2048)
key_data = dogtag_key.KeyData()
key_data.data = test_key.exportKey('DER')
self.keyclient_mock.retrieve_key.return_value = key_data
secret_metadata = {
dogtag_import.DogtagKRAPlugin.SECRET_TYPE:
sstore.SecretType.PRIVATE,
dogtag_import.DogtagKRAPlugin.ALG: sstore.KeyAlgorithm.RSA,
dogtag_import.DogtagKRAPlugin.BIT_LENGTH: 2048,
dogtag_import.DogtagKRAPlugin.KEY_ID: 'key1',
dogtag_import.DogtagKRAPlugin.CONVERT_TO_PEM: 'true'
}
result = self.plugin.get_secret(secret_metadata)
assert result.secret == test_key.exportKey('PEM').encode('utf-8')
def test_get_public_key(self):
test_public_key = RSA.generate(2048).publickey()
key_info = dogtag_key.KeyInfo()
key_info.public_key = test_public_key.exportKey('DER')
self.keyclient_mock.get_key_info.return_value = key_info
secret_metadata = {
dogtag_import.DogtagKRAPlugin.SECRET_TYPE:
sstore.SecretType.PUBLIC,
dogtag_import.DogtagKRAPlugin.ALG: sstore.KeyAlgorithm.RSA,
dogtag_import.DogtagKRAPlugin.BIT_LENGTH: 2048,
dogtag_import.DogtagKRAPlugin.KEY_ID: 'key1',
dogtag_import.DogtagKRAPlugin.CONVERT_TO_PEM: 'true'
}
result = self.plugin.get_secret(secret_metadata)
assert result.secret == (test_public_key.exportKey('PEM')
.encode('utf-8'))
def test_store_passphrase_for_using_in_private_key_retrieval(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.RSA, 2048,
passphrase="password123")
# Mock the response for passphrase archival
request_response = dogtag_key.KeyRequestResponse()
request_info = dogtag_key.KeyRequestInfo()
request_info.key_url = "https://example_url/1"
request_response.request_info = request_info
self.keyclient_mock.archive_key.return_value = request_response
asym_key_DTO = self.plugin.generate_asymmetric_key(key_spec)
assert asym_key_DTO.private_key_meta[
dogtag_import.DogtagKRAPlugin.PASSPHRASE_KEY_ID
] == '1'
self.keyclient_mock.generate_asymmetric_key.assert_called_once_with(
mock.ANY,
sstore.KeyAlgorithm.RSA.upper(),
2048,
mock.ANY)
def test_supports_symmetric_aes_key_generation(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.AES, 256)
self.assertTrue(
self.plugin.generate_supports(key_spec)
)
def test_supports_asymmetric_rsa_key_generation(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.RSA, 2048)
self.assertTrue(
self.plugin.generate_supports(key_spec)
)
def test_supports_asymmetric_ec_key_generation(self):
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.EC, 156)
self.assertFalse(