Merge "Add Dogtag crypto plugin."
This commit is contained in:
206
barbican/crypto/dogtag_crypto.py
Normal file
206
barbican/crypto/dogtag_crypto.py
Normal file
@@ -0,0 +1,206 @@
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import base64
|
||||
import os
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
import pki
|
||||
from pki.client import PKIConnection
|
||||
import pki.cryptoutil as cryptoutil
|
||||
import pki.key as key
|
||||
from pki.kraclient import KRAClient
|
||||
|
||||
from barbican.crypto import plugin
|
||||
from barbican.openstack.common.gettextutils import _
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
dogtag_crypto_plugin_group = cfg.OptGroup(name='dogtag_crypto_plugin',
|
||||
title="Dogtag Crypto Plugin Options")
|
||||
dogtag_crypto_plugin_opts = [
|
||||
cfg.StrOpt('pem_path',
|
||||
default=None,
|
||||
help=_('Path to PEM file for authentication')),
|
||||
cfg.StrOpt('pem_password',
|
||||
default=None,
|
||||
help=_('Password to unlock PEM file')),
|
||||
cfg.StrOpt('drm_host',
|
||||
default="localhost",
|
||||
help=_('Hostname for the DRM')),
|
||||
cfg.StrOpt('drm_port',
|
||||
default="8443",
|
||||
help=_('Port for the DRM')),
|
||||
cfg.StrOpt('nss_db_path',
|
||||
default=None,
|
||||
help=_('Path to the NSS certificate database')),
|
||||
cfg.StrOpt('nss_password',
|
||||
default=None,
|
||||
help=_('Password for NSS certificate database'))
|
||||
]
|
||||
|
||||
CONF.register_group(dogtag_crypto_plugin_group)
|
||||
CONF.register_opts(dogtag_crypto_plugin_opts, group=dogtag_crypto_plugin_group)
|
||||
|
||||
|
||||
class DogtagCryptoPlugin(plugin.CryptoPluginBase):
|
||||
"""Dogtag implementation of the crypto plugin with DRM as the backend"""
|
||||
|
||||
TRANSPORT_NICK = "DRM transport cert"
|
||||
|
||||
def __init__(self, conf=CONF):
|
||||
"""Constructor - create the keyclient"""
|
||||
pem_path = conf.dogtag_crypto_plugin.pem_path
|
||||
if pem_path is None:
|
||||
raise ValueError(_("pem_path is required"))
|
||||
|
||||
pem_password = conf.dogtag_crypto_plugin.pem_password
|
||||
if pem_password is None:
|
||||
raise ValueError(_("pem_password is required"))
|
||||
|
||||
crypto = None
|
||||
create_nss_db = False
|
||||
|
||||
nss_db_path = conf.dogtag_crypto_plugin.nss_db_path
|
||||
if nss_db_path is not None:
|
||||
nss_password = conf.dogtag_crypto_plugin.nss_password
|
||||
if nss_password is None:
|
||||
raise ValueError(_("nss_password is required"))
|
||||
|
||||
if not os.path.exists(nss_db_path):
|
||||
create_nss_db = True
|
||||
cryptoutil.NSSCryptoUtil.setup_database(
|
||||
nss_db_path, nss_password, over_write=True)
|
||||
|
||||
crypto = cryptoutil.NSSCryptoUtil(nss_db_path, nss_password)
|
||||
|
||||
# set up connection
|
||||
connection = PKIConnection('https',
|
||||
conf.dogtag_crypto_plugin.drm_host,
|
||||
conf.dogtag_crypto_plugin.drm_port,
|
||||
'kra')
|
||||
connection.set_authentication_cert(pem_path)
|
||||
|
||||
# what happened to the password?
|
||||
# until we figure out how to pass the password to requests, we'll
|
||||
# just use -nodes to create the admin cert pem file. Any required
|
||||
# code will end up being in the DRM python client
|
||||
|
||||
#create kraclient
|
||||
kraclient = KRAClient(connection, crypto)
|
||||
self.keyclient = kraclient.keys
|
||||
self.systemcert_client = kraclient.system_certs
|
||||
|
||||
if crypto is not None:
|
||||
if create_nss_db:
|
||||
# Get transport cert and insert in the certdb
|
||||
transport_cert = self.systemcert_client.get_transport_cert()
|
||||
tcert = transport_cert[
|
||||
len(pki.CERT_HEADER):
|
||||
len(transport_cert) - len(pki.CERT_FOOTER)]
|
||||
crypto.import_cert(DogtagCryptoPlugin.TRANSPORT_NICK,
|
||||
base64.decodestring(tcert), "u,u,u")
|
||||
|
||||
crypto.initialize()
|
||||
self.keyclient.set_transport_cert(
|
||||
DogtagCryptoPlugin.TRANSPORT_NICK)
|
||||
|
||||
def encrypt(self, encrypt_dto, kek_meta_dto, keystone_id):
|
||||
"""
|
||||
Store a secret in the DRM
|
||||
|
||||
This will likely require another parameter which includes the wrapped
|
||||
session key to be passed. Until that is added, we will call
|
||||
archive_key() which relies on the DRM python client to create the
|
||||
session keys.
|
||||
|
||||
We may also be able to be more specific in terms of the data_type
|
||||
if we know that the data being stored is a symmetric key. Until
|
||||
then, we need to assume that the secret is pass_phrase_type.
|
||||
"""
|
||||
data_type = key.KeyClient.PASS_PHRASE_TYPE
|
||||
client_key_id = uuid.uuid4().hex
|
||||
response = self.keyclient.archive_key(client_key_id,
|
||||
data_type,
|
||||
encrypt_dto.unencrypted,
|
||||
key_algorithm=None,
|
||||
key_size=None)
|
||||
return response.get_key_id(), None
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
keystone_id):
|
||||
"""
|
||||
Retrieve a secret from the DRM
|
||||
|
||||
The encrypted parameter simply contains the plain text key_id by which
|
||||
the secret is known to the DRM. The remaining parameters are not
|
||||
used.
|
||||
|
||||
Note: There are two ways to retrieve secrets from the DRM.
|
||||
|
||||
The first, which is implemented here, will call retrieve_key without
|
||||
a wrapping key. This relies on the DRM client to generate a wrapping
|
||||
key (and wrap it with the DRM transport cert), and is completely
|
||||
transparent to the Barbican server. What is returned to the caller
|
||||
is the unencrypted secret.
|
||||
|
||||
The second way is to provide a wrapping key that ideally would be
|
||||
generated on the barbican client. That way only the client will be
|
||||
able to unwrap the secret. This is not yet implemented because
|
||||
decrypt() and the barbican API still need to be changed to pass the
|
||||
wrapping key.
|
||||
"""
|
||||
key_id = decrypt_dto.encrypted
|
||||
key = self.keyclient.retrieve_key(key_id)
|
||||
return key.data
|
||||
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
"""
|
||||
This function is not used by this plugin
|
||||
"""
|
||||
return kek_meta_dto
|
||||
|
||||
def generate(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
"""
|
||||
Generate a symmetric key
|
||||
|
||||
This calls generate_symmetric_key() on the DRM passing in the
|
||||
algorithm, bit_length and id (used as the client_key_id) from
|
||||
the secret. The remaining parameters are not used.
|
||||
|
||||
Returns a keyId which will be stored in an EncryptedDatum
|
||||
table for later retrieval.
|
||||
"""
|
||||
|
||||
usages = [key.SymKeyGenerationRequest.DECRYPT_USAGE,
|
||||
key.SymKeyGenerationRequest.ENCRYPT_USAGE]
|
||||
|
||||
client_key_id = uuid.uuid4().hex
|
||||
response = self.keyclient.generate_symmetric_key(
|
||||
client_key_id, generate_dto.algorithm.upper(),
|
||||
generate_dto.bit_length, usages)
|
||||
return response.get_key_id(), None
|
||||
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
"""
|
||||
Specifies what operations the plugin supports
|
||||
"""
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
164
barbican/tests/crypto/test_dogtag_crypto.py
Normal file
164
barbican/tests/crypto/test_dogtag_crypto.py
Normal file
@@ -0,0 +1,164 @@
|
||||
# Copyright (c) 2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import os
|
||||
import tempfile
|
||||
import testtools
|
||||
|
||||
try:
|
||||
from barbican.crypto import plugin as plugin_import
|
||||
from barbican.crypto.dogtag_crypto import DogtagCryptoPlugin
|
||||
from barbican.model import models
|
||||
imports_ok = True
|
||||
except:
|
||||
# dogtag imports probably not available
|
||||
imports_ok = False
|
||||
|
||||
|
||||
class WhenTestingDogtagCryptoPlugin(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingDogtagCryptoPlugin, self).setUp()
|
||||
if not imports_ok:
|
||||
return
|
||||
|
||||
self.keyclient_mock = mock.MagicMock(name="KeyClient mock")
|
||||
self.patcher = mock.patch('pki.cryptoutil.NSSCryptoUtil')
|
||||
self.patcher.start()
|
||||
|
||||
# create nss db for test only
|
||||
self.nss_dir = tempfile.mkdtemp()
|
||||
|
||||
self.cfg_mock = mock.MagicMock(name='config mock')
|
||||
self.cfg_mock.dogtag_crypto_plugin = mock.MagicMock(
|
||||
nss_db_path=self.nss_dir)
|
||||
self.plugin = DogtagCryptoPlugin(self.cfg_mock)
|
||||
self.plugin.keyclient = self.keyclient_mock
|
||||
|
||||
def tearDown(self):
|
||||
super(WhenTestingDogtagCryptoPlugin, self).tearDown()
|
||||
if not imports_ok:
|
||||
return
|
||||
self.patcher.stop()
|
||||
os.rmdir(self.nss_dir)
|
||||
|
||||
def test_generate(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
secret = models.Secret()
|
||||
secret.bit_length = 128
|
||||
secret.algorithm = "AES"
|
||||
generate_dto = plugin_import.GenerateDTO(
|
||||
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
_encrypted, _kek_ext = self.plugin.generate(
|
||||
generate_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock()
|
||||
)
|
||||
|
||||
self.keyclient_mock.generate_symmetric_key.assert_called_once_with(
|
||||
mock.ANY,
|
||||
secret.algorithm.upper(),
|
||||
secret.bit_length,
|
||||
mock.ANY)
|
||||
|
||||
def test_raises_error_with_no_pem_path(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
m = mock.MagicMock()
|
||||
m.dogtag_crypto_plugin = mock.MagicMock(pem_path=None)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
DogtagCryptoPlugin,
|
||||
m,
|
||||
)
|
||||
|
||||
def test_raises_error_with_no_pem_password(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
m = mock.MagicMock()
|
||||
m.dogtag_crypto_plugin = mock.MagicMock(pem_password=None)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
DogtagCryptoPlugin,
|
||||
m,
|
||||
)
|
||||
|
||||
def test_raises_error_with_no_nss_password(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
m = mock.MagicMock()
|
||||
m.dogtag_crypto_plugin = mock.MagicMock(nss_password=None)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
DogtagCryptoPlugin,
|
||||
m,
|
||||
)
|
||||
|
||||
def test_encrypt(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
payload = 'encrypt me!!'
|
||||
encrypt_dto = plugin_import.EncryptDTO(payload)
|
||||
_cyphertext, _kek_meta_extended = self.plugin.encrypt(encrypt_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
self.keyclient_mock.archive_key.assert_called_once_with(
|
||||
mock.ANY,
|
||||
"passPhrase",
|
||||
payload,
|
||||
key_algorithm=None,
|
||||
key_size=None)
|
||||
|
||||
def test_decrypt(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
key_id = 'key1'
|
||||
decrypt_dto = plugin_import.DecryptDTO(key_id)
|
||||
self.plugin.decrypt(decrypt_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
self.keyclient_mock.retrieve_key.assert_called_once_with(key_id)
|
||||
|
||||
def test_supports_encrypt_decrypt(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
self.assertTrue(
|
||||
self.plugin.supports(
|
||||
plugin_import.PluginSupportTypes.ENCRYPT_DECRYPT
|
||||
)
|
||||
)
|
||||
|
||||
def test_supports_symmetric_key_generation(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
self.assertTrue(
|
||||
self.plugin.supports(
|
||||
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
|
||||
)
|
||||
)
|
||||
|
||||
def test_does_not_support_unknown_type(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
self.assertFalse(
|
||||
self.plugin.supports("SOMETHING_RANDOM")
|
||||
)
|
||||
@@ -150,3 +150,15 @@ version = '1.1'
|
||||
# Server name for RPC service
|
||||
server_name = 'barbican.queue'
|
||||
|
||||
# ================= Crypto plugin ===================
|
||||
#[crypto]
|
||||
#namespace = barbican.crypto.plugin
|
||||
#enabled_crypto_plugins = dogtag_crypto
|
||||
|
||||
[dogtag_crypto_plugin]
|
||||
pem_path = '/etc/barbican/drm_admin_cert.pem'
|
||||
pem_password = 'password123'
|
||||
drm_host = localhost
|
||||
drm_port = 8443
|
||||
nss_db_path = '/etc/barbican/alias'
|
||||
nss_password = 'password123'
|
||||
|
||||
@@ -29,6 +29,7 @@ scripts =
|
||||
barbican.crypto.plugin =
|
||||
p11_crypto = barbican.crypto.p11_crypto:P11CryptoPlugin
|
||||
simple_crypto = barbican.crypto.plugin:SimpleCryptoPlugin
|
||||
dogtag_crypto = barbican.crypto.dogtag_crypto:DogtagCryptoPlugin
|
||||
barbican.test.crypto.plugin =
|
||||
test_crypto = barbican.tests.crypto.test_plugin:TestCryptoPlugin
|
||||
|
||||
|
||||
Reference in New Issue
Block a user