Add Dogtag crypto plugin.

This patch adds an initial version of the Dogtag plugin
that implements the decrypt(), encrypt() and generate()
functions.  In this version, we will create and initialize
an NSS database on the server to do crypto operations
within the Dogtag DRM python client.  This allows us to
decrypt() and encrypt() secrets without passing in wrapping
keys.

When the barbican-client and the Barbican server API have
been changed to pass through the wrapping keys, this plugin
will need to enhanced to make the relevant DRM client calls.

This plugin has been tested by running the barbican server
within a virtual environment.  There are several steps
needed to get things configured, which are detailed in the
blueprint.

Added unit tests.  These unit tests will be skipped if the
dogtag dependencies are not present to import.

Change-Id: Ib53bf3df5a65af7ac602ec7f7895e9c723c36c06
Implements: blueprint dogtag-plugin
This commit is contained in:
Ade Lee 2014-04-03 14:22:40 -04:00
parent e4e7ce18c9
commit 0465b85087
4 changed files with 383 additions and 0 deletions

View File

@ -0,0 +1,206 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import uuid
from oslo.config import cfg
import pki
from pki.client import PKIConnection
import pki.cryptoutil as cryptoutil
import pki.key as key
from pki.kraclient import KRAClient
from barbican.crypto import plugin
from barbican.openstack.common.gettextutils import _
CONF = cfg.CONF
dogtag_crypto_plugin_group = cfg.OptGroup(name='dogtag_crypto_plugin',
title="Dogtag Crypto Plugin Options")
dogtag_crypto_plugin_opts = [
cfg.StrOpt('pem_path',
default=None,
help=_('Path to PEM file for authentication')),
cfg.StrOpt('pem_password',
default=None,
help=_('Password to unlock PEM file')),
cfg.StrOpt('drm_host',
default="localhost",
help=_('Hostname for the DRM')),
cfg.StrOpt('drm_port',
default="8443",
help=_('Port for the DRM')),
cfg.StrOpt('nss_db_path',
default=None,
help=_('Path to the NSS certificate database')),
cfg.StrOpt('nss_password',
default=None,
help=_('Password for NSS certificate database'))
]
CONF.register_group(dogtag_crypto_plugin_group)
CONF.register_opts(dogtag_crypto_plugin_opts, group=dogtag_crypto_plugin_group)
class DogtagCryptoPlugin(plugin.CryptoPluginBase):
"""Dogtag implementation of the crypto plugin with DRM as the backend"""
TRANSPORT_NICK = "DRM transport cert"
def __init__(self, conf=CONF):
"""Constructor - create the keyclient"""
pem_path = conf.dogtag_crypto_plugin.pem_path
if pem_path is None:
raise ValueError(_("pem_path is required"))
pem_password = conf.dogtag_crypto_plugin.pem_password
if pem_password is None:
raise ValueError(_("pem_password is required"))
crypto = None
create_nss_db = False
nss_db_path = conf.dogtag_crypto_plugin.nss_db_path
if nss_db_path is not None:
nss_password = conf.dogtag_crypto_plugin.nss_password
if nss_password is None:
raise ValueError(_("nss_password is required"))
if not os.path.exists(nss_db_path):
create_nss_db = True
cryptoutil.NSSCryptoUtil.setup_database(
nss_db_path, nss_password, over_write=True)
crypto = cryptoutil.NSSCryptoUtil(nss_db_path, nss_password)
# set up connection
connection = PKIConnection('https',
conf.dogtag_crypto_plugin.drm_host,
conf.dogtag_crypto_plugin.drm_port,
'kra')
connection.set_authentication_cert(pem_path)
# what happened to the password?
# until we figure out how to pass the password to requests, we'll
# just use -nodes to create the admin cert pem file. Any required
# code will end up being in the DRM python client
#create kraclient
kraclient = KRAClient(connection, crypto)
self.keyclient = kraclient.keys
self.systemcert_client = kraclient.system_certs
if crypto is not None:
if create_nss_db:
# Get transport cert and insert in the certdb
transport_cert = self.systemcert_client.get_transport_cert()
tcert = transport_cert[
len(pki.CERT_HEADER):
len(transport_cert) - len(pki.CERT_FOOTER)]
crypto.import_cert(DogtagCryptoPlugin.TRANSPORT_NICK,
base64.decodestring(tcert), "u,u,u")
crypto.initialize()
self.keyclient.set_transport_cert(
DogtagCryptoPlugin.TRANSPORT_NICK)
def encrypt(self, encrypt_dto, kek_meta_dto, keystone_id):
"""
Store a secret in the DRM
This will likely require another parameter which includes the wrapped
session key to be passed. Until that is added, we will call
archive_key() which relies on the DRM python client to create the
session keys.
We may also be able to be more specific in terms of the data_type
if we know that the data being stored is a symmetric key. Until
then, we need to assume that the secret is pass_phrase_type.
"""
data_type = key.KeyClient.PASS_PHRASE_TYPE
client_key_id = uuid.uuid4().hex
response = self.keyclient.archive_key(client_key_id,
data_type,
encrypt_dto.unencrypted,
key_algorithm=None,
key_size=None)
return response.get_key_id(), None
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
keystone_id):
"""
Retrieve a secret from the DRM
The encrypted parameter simply contains the plain text key_id by which
the secret is known to the DRM. The remaining parameters are not
used.
Note: There are two ways to retrieve secrets from the DRM.
The first, which is implemented here, will call retrieve_key without
a wrapping key. This relies on the DRM client to generate a wrapping
key (and wrap it with the DRM transport cert), and is completely
transparent to the Barbican server. What is returned to the caller
is the unencrypted secret.
The second way is to provide a wrapping key that ideally would be
generated on the barbican client. That way only the client will be
able to unwrap the secret. This is not yet implemented because
decrypt() and the barbican API still need to be changed to pass the
wrapping key.
"""
key_id = decrypt_dto.encrypted
key = self.keyclient.retrieve_key(key_id)
return key.data
def bind_kek_metadata(self, kek_meta_dto):
"""
This function is not used by this plugin
"""
return kek_meta_dto
def generate(self, generate_dto, kek_meta_dto, keystone_id):
"""
Generate a symmetric key
This calls generate_symmetric_key() on the DRM passing in the
algorithm, bit_length and id (used as the client_key_id) from
the secret. The remaining parameters are not used.
Returns a keyId which will be stored in an EncryptedDatum
table for later retrieval.
"""
usages = [key.SymKeyGenerationRequest.DECRYPT_USAGE,
key.SymKeyGenerationRequest.ENCRYPT_USAGE]
client_key_id = uuid.uuid4().hex
response = self.keyclient.generate_symmetric_key(
client_key_id, generate_dto.algorithm.upper(),
generate_dto.bit_length, usages)
return response.get_key_id(), None
def supports(self, type_enum, algorithm=None, mode=None):
"""
Specifies what operations the plugin supports
"""
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
return True
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
return True
else:
return False

View File

@ -0,0 +1,164 @@
# Copyright (c) 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import os
import tempfile
import testtools
try:
from barbican.crypto import plugin as plugin_import
from barbican.crypto.dogtag_crypto import DogtagCryptoPlugin
from barbican.model import models
imports_ok = True
except:
# dogtag imports probably not available
imports_ok = False
class WhenTestingDogtagCryptoPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingDogtagCryptoPlugin, self).setUp()
if not imports_ok:
return
self.keyclient_mock = mock.MagicMock(name="KeyClient mock")
self.patcher = mock.patch('pki.cryptoutil.NSSCryptoUtil')
self.patcher.start()
# create nss db for test only
self.nss_dir = tempfile.mkdtemp()
self.cfg_mock = mock.MagicMock(name='config mock')
self.cfg_mock.dogtag_crypto_plugin = mock.MagicMock(
nss_db_path=self.nss_dir)
self.plugin = DogtagCryptoPlugin(self.cfg_mock)
self.plugin.keyclient = self.keyclient_mock
def tearDown(self):
super(WhenTestingDogtagCryptoPlugin, self).tearDown()
if not imports_ok:
return
self.patcher.stop()
os.rmdir(self.nss_dir)
def test_generate(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
secret = models.Secret()
secret.bit_length = 128
secret.algorithm = "AES"
generate_dto = plugin_import.GenerateDTO(
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
secret.algorithm,
secret.bit_length,
None)
_encrypted, _kek_ext = self.plugin.generate(
generate_dto,
mock.MagicMock(),
mock.MagicMock()
)
self.keyclient_mock.generate_symmetric_key.assert_called_once_with(
mock.ANY,
secret.algorithm.upper(),
secret.bit_length,
mock.ANY)
def test_raises_error_with_no_pem_path(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
m = mock.MagicMock()
m.dogtag_crypto_plugin = mock.MagicMock(pem_path=None)
self.assertRaises(
ValueError,
DogtagCryptoPlugin,
m,
)
def test_raises_error_with_no_pem_password(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
m = mock.MagicMock()
m.dogtag_crypto_plugin = mock.MagicMock(pem_password=None)
self.assertRaises(
ValueError,
DogtagCryptoPlugin,
m,
)
def test_raises_error_with_no_nss_password(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
m = mock.MagicMock()
m.dogtag_crypto_plugin = mock.MagicMock(nss_password=None)
self.assertRaises(
ValueError,
DogtagCryptoPlugin,
m,
)
def test_encrypt(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
payload = 'encrypt me!!'
encrypt_dto = plugin_import.EncryptDTO(payload)
_cyphertext, _kek_meta_extended = self.plugin.encrypt(encrypt_dto,
mock.MagicMock(),
mock.MagicMock())
self.keyclient_mock.archive_key.assert_called_once_with(
mock.ANY,
"passPhrase",
payload,
key_algorithm=None,
key_size=None)
def test_decrypt(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
key_id = 'key1'
decrypt_dto = plugin_import.DecryptDTO(key_id)
self.plugin.decrypt(decrypt_dto,
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock())
self.keyclient_mock.retrieve_key.assert_called_once_with(key_id)
def test_supports_encrypt_decrypt(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
self.assertTrue(
self.plugin.supports(
plugin_import.PluginSupportTypes.ENCRYPT_DECRYPT
)
)
def test_supports_symmetric_key_generation(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
self.assertTrue(
self.plugin.supports(
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
)
)
def test_does_not_support_unknown_type(self):
if not imports_ok:
self.skipTest("Dogtag imports not available")
self.assertFalse(
self.plugin.supports("SOMETHING_RANDOM")
)

View File

@ -150,3 +150,15 @@ version = '1.1'
# Server name for RPC service
server_name = 'barbican.queue'
# ================= Crypto plugin ===================
#[crypto]
#namespace = barbican.crypto.plugin
#enabled_crypto_plugins = dogtag_crypto
[dogtag_crypto_plugin]
pem_path = '/etc/barbican/drm_admin_cert.pem'
pem_password = 'password123'
drm_host = localhost
drm_port = 8443
nss_db_path = '/etc/barbican/alias'
nss_password = 'password123'

View File

@ -29,6 +29,7 @@ scripts =
barbican.crypto.plugin =
p11_crypto = barbican.crypto.p11_crypto:P11CryptoPlugin
simple_crypto = barbican.crypto.plugin:SimpleCryptoPlugin
dogtag_crypto = barbican.crypto.dogtag_crypto:DogtagCryptoPlugin
barbican.test.crypto.plugin =
test_crypto = barbican.tests.crypto.test_plugin:TestCryptoPlugin