Add certificate_manager functionality to dogtag plugin

Also included in this patch are changes to change the name of the
DRM to KRA, and changes to reflect the latest Dogtag API.

Implements: blueprint orders-add-cert-workflow-plugin
Change-Id: I34cb0b70f2c5c06d91ef69e486aa5d03f0b945e6
This commit is contained in:
Ade Lee 2014-08-15 16:57:50 -04:00
parent b293d11c56
commit bd6fe53de6
6 changed files with 907 additions and 112 deletions

View File

@ -877,11 +877,11 @@ msgid "Password to unlock PEM file"
msgstr ""
#: barbican/plugin/dogtag.py:42
msgid "Hostname for the DRM"
msgid "Hostname for the KRA"
msgstr ""
#: barbican/plugin/dogtag.py:45
msgid "Port for the DRM"
msgid "Port for the KRA"
msgstr ""
#: barbican/plugin/dogtag.py:47

View File

@ -13,19 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import uuid
from oslo.config import cfg
import pki
import pki.cert
import pki.client
import pki.cryptoutil as cryptoutil
import pki.crypto as cryptoutil
import pki.key as key
import pki.kraclient
import pki.kra
import pki.profile
from requests import exceptions as request_exceptions
from barbican.common import exception
from barbican.openstack.common import gettextutils as u
import barbican.plugin.interface.certificate_manager as cm
import barbican.plugin.interface.secret_store as sstore
CONF = cfg.CONF
@ -35,14 +38,12 @@ dogtag_plugin_group = cfg.OptGroup(name='dogtag_plugin',
dogtag_plugin_opts = [
cfg.StrOpt('pem_path',
help=u._('Path to PEM file for authentication')),
cfg.StrOpt('pem_password',
help=u._('Password to unlock PEM file')),
cfg.StrOpt('drm_host',
cfg.StrOpt('dogtag_host',
default="localhost",
help=u._('Hostname for the DRM')),
cfg.StrOpt('drm_port',
help=u._('Hostname for the Dogtag instance')),
cfg.StrOpt('dogtag_port',
default="8443",
help=u._('Port for the DRM')),
help=u._('Port for the Dogtag instance')),
cfg.StrOpt('nss_db_path',
help=u._('Path to the NSS certificate database')),
cfg.StrOpt('nss_password',
@ -53,14 +54,46 @@ CONF.register_group(dogtag_plugin_group)
CONF.register_opts(dogtag_plugin_opts, group=dogtag_plugin_group)
def setup_nss_db(conf):
crypto = None
create_nss_db = False
nss_db_path = conf.dogtag_plugin.nss_db_path
if nss_db_path is not None:
nss_password = conf.dogtag_plugin.nss_password
if nss_password is None:
raise ValueError(u._("nss_password is required"))
if not os.path.exists(nss_db_path):
create_nss_db = True
cryptoutil.NSSCryptoProvider.setup_database(
nss_db_path, nss_password, over_write=True)
crypto = cryptoutil.NSSCryptoProvider(nss_db_path, nss_password)
return crypto, create_nss_db
def create_connection(conf, subsystem_path):
pem_path = conf.dogtag_plugin.pem_path
if pem_path is None:
raise ValueError(u._("pem_path is required"))
connection = pki.client.PKIConnection(
'https',
conf.dogtag_plugin.dogtag_host,
conf.dogtag_plugin.dogtag_port,
subsystem_path)
connection.set_authentication_cert(pem_path)
return connection
class DogtagPluginAlgorithmException(exception.BarbicanException):
message = u._("Invalid algorithm passed in")
class DogtagPlugin(sstore.SecretStoreBase):
"""Implementation of the secret store plugin with DRM as the backend."""
class DogtagKRAPlugin(sstore.SecretStoreBase):
"""Implementation of the secret store plugin with KRA as the backend."""
TRANSPORT_NICK = "DRM transport cert"
TRANSPORT_NICK = "KRA transport cert"
# metadata constants
KEY_ID = "key_id"
@ -69,64 +102,31 @@ class DogtagPlugin(sstore.SecretStoreBase):
def __init__(self, conf=CONF):
"""Constructor - create the keyclient."""
pem_path = conf.dogtag_plugin.pem_path
if pem_path is None:
raise ValueError(u._("pem_path is required"))
pem_password = conf.dogtag_plugin.pem_password
if pem_password is None:
raise ValueError(u._("pem_password is required"))
crypto = None
create_nss_db = False
nss_db_path = conf.dogtag_plugin.nss_db_path
if nss_db_path is not None:
nss_password = conf.dogtag_plugin.nss_password
if nss_password is None:
raise ValueError(u._("nss_password is required"))
if not os.path.exists(nss_db_path):
create_nss_db = True
cryptoutil.NSSCryptoUtil.setup_database(
nss_db_path, nss_password, over_write=True)
crypto = cryptoutil.NSSCryptoUtil(nss_db_path, nss_password)
# set up connection
connection = pki.client.PKIConnection(
'https',
conf.dogtag_plugin.drm_host,
conf.dogtag_plugin.drm_port,
'kra')
connection.set_authentication_cert(pem_path)
# what happened to the password?
# until we figure out how to pass the password to requests, we'll
# just use -nodes to create the admin cert pem file. Any required
# code will end up being in the DRM python client
crypto, create_nss_db = setup_nss_db(conf)
connection = create_connection(conf, 'kra')
#create kraclient
kraclient = pki.kraclient.KRAClient(connection, crypto)
kraclient = pki.kra.KRAClient(connection, crypto)
self.keyclient = kraclient.keys
self.systemcert_client = kraclient.system_certs
if crypto is not None:
if create_nss_db:
# Get transport cert and insert in the certdb
transport_cert = self.systemcert_client.get_transport_cert()
tcert = transport_cert[
len(pki.CERT_HEADER):
len(transport_cert) - len(pki.CERT_FOOTER)]
crypto.import_cert(DogtagPlugin.TRANSPORT_NICK,
base64.decodestring(tcert), "u,u,u")
self.import_transport_cert(crypto)
crypto.initialize()
self.keyclient.set_transport_cert(
DogtagPlugin.TRANSPORT_NICK)
DogtagKRAPlugin.TRANSPORT_NICK)
def import_transport_cert(self, crypto):
# Get transport cert and insert in the certdb
transport_cert = self.systemcert_client.get_transport_cert()
crypto.import_cert(DogtagKRAPlugin.TRANSPORT_NICK,
transport_cert,
"u,u,u")
def store_secret(self, secret_dto, context):
"""Store a secret in the DRM
"""Store a secret in the KRA
If secret_dto.transport_key is not None, then we expect
secret_dto.secret to include a base64 encoded PKIArchiveOptions
@ -135,7 +135,7 @@ class DogtagPlugin(sstore.SecretStoreBase):
and parameters to specify the symmetric key wrapping.
Otherwise, the data is unencrypted and we use a call to archive_key()
to have the Dogtag DRM client generate the relevant session keys.
to have the Dogtag KRA client generate the relevant session keys.
The secret_dto contains additional information on the type of secret
that is being stored. We will use that shortly. For, now, lets just
@ -163,21 +163,21 @@ class DogtagPlugin(sstore.SecretStoreBase):
key_algorithm=None,
key_size=None)
return {DogtagPlugin.SECRET_TYPE: secret_dto.type,
DogtagPlugin.SECRET_KEYSPEC: secret_dto.key_spec,
DogtagPlugin.KEY_ID: response.get_key_id()}
return {DogtagKRAPlugin.SECRET_TYPE: secret_dto.type,
DogtagKRAPlugin.SECRET_KEYSPEC: secret_dto.key_spec,
DogtagKRAPlugin.KEY_ID: response.get_key_id()}
def get_secret(self, secret_metadata, context):
"""Retrieve a secret from the DRM
"""Retrieve a secret from the KRA
The secret_metadata is simply the dict returned by a store_secret() or
get_secret() call. We will extract the key_id from this dict.
Note: There are two ways to retrieve secrets from the DRM.
Note: There are two ways to retrieve secrets from the KRA.
The first method calls retrieve_key without a wrapping key. This
relies on the DRM client to generate a wrapping key (and wrap it with
the DRM transport cert), and is completely transparent to the
relies on the KRA client to generate a wrapping key (and wrap it with
the KRA transport cert), and is completely transparent to the
Barbican server. What is returned to the caller is the
unencrypted secret.
@ -186,7 +186,7 @@ class DogtagPlugin(sstore.SecretStoreBase):
able to unwrap the secret. This wrapping key is provided in the
secret_metadata by Barbican core.
"""
key_id = secret_metadata[DogtagPlugin.KEY_ID]
key_id = secret_metadata[DogtagKRAPlugin.KEY_ID]
twsk = None
if 'trans_wrapped_session_key' in secret_metadata:
twsk = secret_metadata['trans_wrapped_session_key']
@ -198,16 +198,16 @@ class DogtagPlugin(sstore.SecretStoreBase):
# TODO(alee) remove final field when content_type is removed
# from secret_dto
ret = sstore.SecretDTO(
type=secret_metadata[DogtagPlugin.SECRET_TYPE],
type=secret_metadata[DogtagKRAPlugin.SECRET_TYPE],
secret=recovered_key,
key_spec=secret_metadata[DogtagPlugin.SECRET_KEYSPEC],
key_spec=secret_metadata[DogtagKRAPlugin.SECRET_KEYSPEC],
content_type=None,
transport_key=None)
return ret
def delete_secret(self, secret_metadata):
"""Delete a secret from the DRM
"""Delete a secret from the KRA
There is currently no way to delete a secret in Dogtag.
We will be implementing such a method shortly.
@ -217,7 +217,7 @@ class DogtagPlugin(sstore.SecretStoreBase):
def generate_symmetric_key(self, key_spec, context):
"""Generate a symmetric key
This calls generate_symmetric_key() on the DRM passing in the
This calls generate_symmetric_key() on the KRA passing in the
algorithm, bit_length and id (used as the client_key_id) from
the secret. The remaining parameters are not used.
@ -238,9 +238,9 @@ class DogtagPlugin(sstore.SecretStoreBase):
algorithm,
key_spec.bit_length,
usages)
return {DogtagPlugin.SECRET_KEYSPEC: key_spec,
DogtagPlugin.SECRET_TYPE: sstore.SecretType.SYMMETRIC,
DogtagPlugin.KEY_ID: response.get_key_id()}
return {DogtagKRAPlugin.SECRET_KEYSPEC: key_spec,
DogtagKRAPlugin.SECRET_TYPE: sstore.SecretType.SYMMETRIC,
DogtagKRAPlugin.KEY_ID: response.get_key_id()}
def generate_asymmetric_key(self, key_spec, context):
"""Generate an asymmetric key."""
@ -292,3 +292,238 @@ class DogtagPlugin(sstore.SecretStoreBase):
return None
else:
return None
def _catch_request_exception(ca_related_function):
def _catch_ca_unavailable(self, *args, **kwargs):
try:
return ca_related_function(self, *args, **kwargs)
except request_exceptions.RequestException:
return cm.ResultDTO(
cm.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST)
return _catch_ca_unavailable
class DogtagCAPlugin(cm.CertificatePluginBase):
"""Implementation of the cert plugin with Dogtag CA as the backend."""
# order_metadata fields
PROFILE_ID = "profile_id"
# plugin_metadata fields
REQUEST_ID = "request_id"
def __init__(self, conf=CONF):
"""Constructor - create the cert clients."""
crypto, create_nss_db = setup_nss_db(conf)
connection = create_connection(conf, 'ca')
self.certclient = pki.cert.CertClient(connection)
if crypto is not None:
crypto.initialize()
def _get_request_id(self, order_id, plugin_meta, operation):
request_id = plugin_meta.get(self.REQUEST_ID, None)
if not request_id:
raise cm.CertificateGeneralException(
"{0} not found for {1} for order_id {2}".format(
self.REQUEST_ID, operation, order_id))
return request_id
@_catch_request_exception
def _get_request(self, request_id):
try:
return self.certclient.get_request(request_id)
except pki.RequestNotFoundException:
return None
@_catch_request_exception
def _get_cert(self, cert_id):
try:
return self.certclient.get_cert(cert_id)
except pki.CertNotFoundException:
return None
def check_certificate_status(self, order_id, order_meta, plugin_meta):
"""Check the status of a certificate request.
:param order_id: ID of the order associated with this request
:param order_meta: order_metadata associated with this order
:param plugin_meta: data populated by previous calls for this order,
in particular the request_id
:return: cm.ResultDTO
"""
request_id = self._get_request_id(order_id, plugin_meta, "checking")
request = self._get_request(request_id)
if not request:
raise cm.CertificateGeneralException(
"No request found for request_id {0} for order {1}".format(
request_id, order_id))
request_status = request.request_status
if request_status == pki.cert.CertRequestStatus.REJECTED:
return cm.ResultDTO(
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
status_message=request.error_message)
elif request_status == pki.cert.CertRequestStatus.CANCELED:
return cm.ResultDTO(
cm.CertificateStatus.REQUEST_CANCELED)
elif request_status == pki.cert.CertRequestStatus.PENDING:
return cm.ResultDTO(
cm.CertificateStatus.WAITING_FOR_CA)
elif request_status == pki.cert.CertRequestStatus.COMPLETE:
# get the cert
cert_id = request.cert_id
if not cert_id:
raise cm.CertificateGeneralException(
"Request {0} reports status_complete, but no cert_id "
"has been returned".format(request_id))
cert = self._get_cert(cert_id)
if not cert:
raise cm.CertificateGeneralException(
"Certificate not found for cert_id: {0}".format(cert_id))
return cm.ResultDTO(
cm.CertificateStatus.CERTIFICATE_GENERATED,
certificate=cert.encoded,
intermediates=cert.pkcs7_cert_chain)
else:
raise cm.CertificateGeneralException(
"Invalid request_status returned by CA")
@_catch_request_exception
def issue_certificate_request(self, order_id, order_meta, plugin_meta):
"""Issue a certificate request to Dogtag CA
For now, we assume that we are talking to the Dogtag CA that
is deployed with the KRA back-end, and we are connected as a
CA agent. This means that we can use the agent convenience
method to automatically approve the certificate request.
:param order_id: ID of the order associated with this request
:param order_meta: dict containing all the inputs required for a
particular profile. One of these must be the profile_id.
The exact fields (both optional and mandatory) depend on the
profile, but they will be exposed to the user in a method to
expose syntax. Depending on the profile, only the relevant fields
will be populated in the request. All others will be ignored.
:param plugin_meta: Used to store data for status check.
:return: cm.ResultDTO
"""
profile_id = order_meta.get(self.PROFILE_ID, None)
if not profile_id:
return cm.ResultDTO(
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
status_message="No profile_id specified")
try:
enrollment_results = self.certclient.enroll_cert(
profile_id, order_meta)
# Although it is possible to create multiple certs in an invocation
# of enroll_cert, Barbican cannot handle this case. Assume
# only once cert and request generated for now.
enrollment_result = enrollment_results[0]
request = enrollment_result.request
if not request:
raise cm.CertificateGeneralException(
"No request returned in enrollment_results")
#store the request_id in the plugin metadata
plugin_meta[self.REQUEST_ID] = request.request_id
cert = enrollment_result.cert
if not cert:
request_status = request.request_status
if request_status == pki.cert.CertRequestStatus.REJECTED:
return cm.ResultDTO(
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
status_message=request.error_message)
elif request_status == pki.cert.CertRequestStatus.CANCELED:
return cm.ResultDTO(
cm.CertificateStatus.REQUEST_CANCELED)
elif request_status == pki.cert.CertRequestStatus.PENDING:
return cm.ResultDTO(
cm.CertificateStatus.WAITING_FOR_CA)
elif request_status == pki.cert.CertRequestStatus.COMPLETE:
raise cm.CertificateGeneralException(
"request_id {0} returns COMPLETE but no cert returned"
.format(request.request_id))
else:
raise cm.CertificateGeneralException(
"Invalid request_status {0} for request_id {1}"
.format(request_status, request.request_id))
return cm.ResultDTO(
cm.CertificateStatus.CERTIFICATE_GENERATED,
certificate=cert.encoded,
intermediates=cert.pkcs7_cert_chain)
except pki.BadRequestException as e:
return cm.ResultDTO(
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
status_message=e.message)
except pki.PKIException as e:
raise cm.CertificateGeneralException(
"Exception thrown by enroll_cert: {0}".format(e.message))
def modify_certificate_request(self, order_id, order_meta, plugin_meta):
"""Modify a certificate request.
Once a certificate request is generated, it cannot be modified.
The only alternative is to cancel the request (if it has not already
completed) and attempt a fresh enrolment. That is what will be
attempted here.
:param order_id: ID for this order
:param order_meta: order metadata. It is assumed that the newly
modified request data will be present here.
:param plugin_meta: data stored on behalf of the plugin for further
operations
:return: ResultDTO:
"""
result_dto = self.cancel_certificate_request(
order_id, order_meta, plugin_meta)
if result_dto.status == cm.CertificateStatus.REQUEST_CANCELED:
return self.issue_certificate_request(
order_id, order_meta, plugin_meta)
elif result_dto.status == cm.CertificateStatus.INVALID_OPERATION:
return cm.ResultDTO(
cm.CertificateStatus.INVALID_OPERATION,
status_message="Modify request: unable to cancel: {0}"
.format(result_dto.status_message))
else:
# other status (ca_unavailable, client_data_issue)
# return result from cancel operation
return result_dto
@_catch_request_exception
def cancel_certificate_request(self, order_id, order_meta, plugin_meta):
"""Cancel a certificate request.
:param order_id: ID for the order associated with this request
:param order_meta: order metadata fdr this request
:param plugin_meta: data stored by plugin for further processing.
In particular, the request_id
:return: cm.ResultDTO:
"""
request_id = self._get_request_id(order_id, plugin_meta, "cancelling")
try:
review_response = self.certclient.review_request(request_id)
self.certclient.cancel_request(request_id, review_response)
return cm.ResultDTO(cm.CertificateStatus.REQUEST_CANCELED)
except pki.RequestNotFoundException:
return cm.ResultDTO(
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
status_message="no request found for this order")
except pki.ConflictingOperationException as e:
return cm.ResultDTO(
cm.CertificateStatus.INVALID_OPERATION,
status_message=e.message)
def supports(self, certificate_spec):
return True

View File

@ -66,6 +66,16 @@ class CertificateStatusNotSupported(exception.BarbicanException):
self.status = status
class CertificateGeneralException(exception.BarbicanException):
"""Raised when a system fault has occurred."""
def __init__(self, reason=u._('Unknown')):
super(CertificateGeneralException, self).__init__(
u._('Problem seen during certificate processing - '
'Reason: {0}').format(reason)
)
self.reason = reason
@six.add_metaclass(abc.ABCMeta)
class CertificatePluginBase(object):
"""Base class for certificate plugins.

View File

@ -18,9 +18,15 @@ import os
import tempfile
import testtools
from requests import exceptions as request_exceptions
try:
import barbican.plugin.dogtag as dogtag_import
import barbican.plugin.interface.certificate_manager as cm
import barbican.plugin.interface.secret_store as sstore
import pki
import pki.cert as dogtag_cert
imports_ok = True
except ImportError:
# dogtag imports probably not available
@ -28,12 +34,12 @@ except ImportError:
@testtools.skipIf(not imports_ok, "Dogtag imports not available")
class WhenTestingDogtagPlugin(testtools.TestCase):
class WhenTestingDogtagKRAPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingDogtagPlugin, self).setUp()
super(WhenTestingDogtagKRAPlugin, self).setUp()
self.keyclient_mock = mock.MagicMock(name="KeyClient mock")
self.patcher = mock.patch('pki.cryptoutil.NSSCryptoUtil')
self.patcher = mock.patch('pki.crypto.NSSCryptoProvider')
self.patcher.start()
# create nss db for test only
@ -42,11 +48,11 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
self.cfg_mock = mock.MagicMock(name='config mock')
self.cfg_mock.dogtag_plugin = mock.MagicMock(
nss_db_path=self.nss_dir)
self.plugin = dogtag_import.DogtagPlugin(self.cfg_mock)
self.plugin = dogtag_import.DogtagKRAPlugin(self.cfg_mock)
self.plugin.keyclient = self.keyclient_mock
def tearDown(self):
super(WhenTestingDogtagPlugin, self).tearDown()
super(WhenTestingDogtagKRAPlugin, self).tearDown()
self.patcher.stop()
os.rmdir(self.nss_dir)
@ -73,19 +79,10 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
def test_raises_error_with_no_pem_path(self):
m = mock.MagicMock()
m.dogtag_plugin = mock.MagicMock(pem_path=None)
m.dogtag_plugin = mock.MagicMock(pem_path=None, nss_db_path='/tmp')
self.assertRaises(
ValueError,
dogtag_import.DogtagPlugin,
m,
)
def test_raises_error_with_no_pem_password(self):
m = mock.MagicMock()
m.dogtag_plugin = mock.MagicMock(pem_password=None)
self.assertRaises(
ValueError,
dogtag_import.DogtagPlugin,
dogtag_import.DogtagKRAPlugin,
m,
)
@ -94,7 +91,7 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
m.dogtag_plugin = mock.MagicMock(nss_password=None)
self.assertRaises(
ValueError,
dogtag_import.DogtagPlugin,
dogtag_import.DogtagKRAPlugin,
m,
)
@ -140,10 +137,10 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
key_spec = mock.MagicMock()
context = mock.MagicMock()
secret_metadata = {
dogtag_import.DogtagPlugin.SECRET_TYPE:
dogtag_import.DogtagKRAPlugin.SECRET_TYPE:
sstore.SecretType.SYMMETRIC,
dogtag_import.DogtagPlugin.SECRET_KEYSPEC: key_spec,
dogtag_import.DogtagPlugin.KEY_ID: 'key1'
dogtag_import.DogtagKRAPlugin.SECRET_KEYSPEC: key_spec,
dogtag_import.DogtagKRAPlugin.KEY_ID: 'key1'
}
self.plugin.get_secret(secret_metadata, context)
@ -154,10 +151,10 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
context = mock.MagicMock()
twsk = mock.MagicMock()
secret_metadata = {
dogtag_import.DogtagPlugin.SECRET_TYPE:
dogtag_import.DogtagKRAPlugin.SECRET_TYPE:
sstore.SecretType.SYMMETRIC,
dogtag_import.DogtagPlugin.SECRET_KEYSPEC: key_spec,
dogtag_import.DogtagPlugin.KEY_ID: 'key1',
dogtag_import.DogtagKRAPlugin.SECRET_KEYSPEC: key_spec,
dogtag_import.DogtagKRAPlugin.KEY_ID: 'key1',
'trans_wrapped_session_key': twsk
}
self.plugin.get_secret(secret_metadata, context)
@ -187,3 +184,565 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
self.assertFalse(
self.plugin.generate_supports(key_spec)
)
@testtools.skipIf(not imports_ok, "Dogtag imports not available")
class WhenTestingDogtagCAPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingDogtagCAPlugin, self).setUp()
self.certclient_mock = mock.MagicMock(name="CertClient mock")
self.patcher = mock.patch('pki.crypto.NSSCryptoProvider')
self.patcher.start()
# create nss db for test only
self.nss_dir = tempfile.mkdtemp()
self.cfg_mock = mock.MagicMock(name='config mock')
self.cfg_mock.dogtag_plugin = mock.MagicMock(
nss_db_path=self.nss_dir)
self.plugin = dogtag_import.DogtagCAPlugin(self.cfg_mock)
self.plugin.certclient = self.certclient_mock
self.order_id = mock.MagicMock()
self.profile_id = mock.MagicMock()
#request generated
self.request = mock.MagicMock()
self.request_id_mock = mock.MagicMock()
self.request.request_id = self.request_id_mock
self.request.request_status = dogtag_cert.CertRequestStatus.COMPLETE
self.cert_id_mock = mock.MagicMock()
self.request.cert_id = self.cert_id_mock
#cert generated
self.cert = mock.MagicMock()
self.cert_encoded_mock = mock.MagicMock()
self.cert.encoded = self.cert_encoded_mock
self.cert_pkcs7_mock = mock.MagicMock()
self.cert.pkcs7_cert_chain = self.cert_pkcs7_mock
# for cancel/modify
self.review_response = mock.MagicMock()
# modified request
self.modified_request = mock.MagicMock()
self.modified_request_id_mock = mock.MagicMock()
self.modified_request.request_id = self.modified_request_id_mock
self.modified_request.request_status = \
dogtag_cert.CertRequestStatus.COMPLETE
self.modified_request.cert_id = self.cert_id_mock
def tearDown(self):
super(WhenTestingDogtagCAPlugin, self).tearDown()
self.patcher.stop()
os.rmdir(self.nss_dir)
def test_issue_certificate_request(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.request, self.cert)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CERTIFICATE_GENERATED,
"result_dto status incorrect")
self.assertEqual(result_dto.certificate,
self.cert_encoded_mock)
self.assertEqual(result_dto.intermediates,
self.cert_pkcs7_mock)
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.request_id_mock
)
def test_issue_return_data_error_with_no_profile_id(self):
order_meta = {}
plugin_meta = {}
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
"result_dto status incorrect")
self.assertEqual(result_dto.status_message,
"No profile_id specified")
def test_issue_return_data_error_with_request_rejected(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.request.request_status = dogtag_cert.CertRequestStatus.REJECTED
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.request, None)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
"result_dto status incorrect")
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.request_id_mock
)
def test_issue_return_canceled_with_request_canceled(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.request.request_status = dogtag_cert.CertRequestStatus.CANCELED
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.request, None)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.REQUEST_CANCELED,
"result_dto status incorrect")
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.request_id_mock
)
def test_issue_return_waiting_with_request_pending(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.request.request_status = dogtag_cert.CertRequestStatus.PENDING
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.request, None)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.WAITING_FOR_CA,
"result_dto status incorrect")
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.request_id_mock
)
def test_issue_raises_error_request_complete_no_cert(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.request, None)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.issue_certificate_request,
self.order_id,
order_meta,
plugin_meta
)
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.request_id_mock
)
def test_issue_raises_error_request_unknown_status(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.request.request_status = "unknown_status"
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.request, None)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.issue_certificate_request,
self.order_id,
order_meta,
plugin_meta
)
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.request_id_mock
)
def test_issue_return_client_error_bad_request_exception(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.certclient_mock.enroll_cert.side_effect = \
pki.BadRequestException("bad request")
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
"result_dto status incorrect")
def test_issue_raises_error_pki_exception(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.certclient_mock.enroll_cert.side_effect = \
pki.PKIException("generic enrollment error")
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.issue_certificate_request,
self.order_id,
order_meta,
plugin_meta
)
def test_issue_return_ca_unavailable(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID: self.profile_id}
plugin_meta = {}
self.certclient_mock.enroll_cert.side_effect = \
request_exceptions.RequestException()
result_dto = self.plugin.issue_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST,
"result_dto status incorrect")
def test_cancel_request(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.cancel_request.return_value = None
self.certclient_mock.review_request.return_value = self.review_response
result_dto = self.plugin.cancel_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.cancel_request.assert_called_once_with(
self.request_id_mock,
self.review_response)
self.assertEqual(result_dto.status,
cm.CertificateStatus.REQUEST_CANCELED,
"result_dto_status incorrect")
def test_cancel_no_request_found(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.review_request.side_effect = \
pki.RequestNotFoundException("request_not_found")
result_dto = self.plugin.cancel_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.review_request.assert_called_once_with(
self.request_id_mock)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
"result_dto_status incorrect")
def test_cancel_conflicting_operation(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.review_request.return_value = self.review_response
self.certclient_mock.cancel_request.side_effect = \
pki.ConflictingOperationException("conflicting_operation")
result_dto = self.plugin.cancel_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.cancel_request.assert_called_once_with(
self.request_id_mock,
self.review_response)
self.assertEqual(result_dto.status,
cm.CertificateStatus.INVALID_OPERATION,
"result_dto_status incorrect")
def test_cancel_ca_unavailable(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.review_request.side_effect = \
request_exceptions.RequestException("request_exception")
result_dto = self.plugin.cancel_certificate_request(
self.order_id, order_meta, plugin_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST,
"result_dto_status incorrect")
def test_cancel_raise_error_no_request_id(self):
order_meta = mock.ANY
plugin_meta = {}
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.cancel_certificate_request,
self.order_id,
order_meta,
plugin_meta
)
def test_check_status(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.get_request.return_value = self.request
self.certclient_mock.get_cert.return_value = self.cert
result_dto = self.plugin.check_certificate_status(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.get_request.assert_called_once_with(
self.request_id_mock)
self.certclient_mock.get_cert.assert_called_once_with(
self.cert_id_mock)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CERTIFICATE_GENERATED,
"result_dto_status incorrect")
self.assertEqual(result_dto.certificate,
self.cert_encoded_mock)
def test_check_status_raise_error_no_request_id(self):
order_meta = mock.ANY
plugin_meta = {}
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.check_certificate_status,
self.order_id,
order_meta,
plugin_meta
)
def test_check_status_rejected(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.request.request_status = dogtag_cert.CertRequestStatus.REJECTED
self.certclient_mock.get_request.return_value = self.request
result_dto = self.plugin.check_certificate_status(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.get_request.assert_called_once_with(
self.request_id_mock)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
"result_dto_status incorrect")
self.assertEqual(result_dto.certificate,
None)
def test_check_status_canceled(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.request.request_status = dogtag_cert.CertRequestStatus.CANCELED
self.certclient_mock.get_request.return_value = self.request
result_dto = self.plugin.check_certificate_status(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.get_request.assert_called_once_with(
self.request_id_mock)
self.assertEqual(result_dto.status,
cm.CertificateStatus.REQUEST_CANCELED,
"result_dto_status incorrect")
self.assertEqual(result_dto.certificate,
None)
def test_check_status_pending(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.request.request_status = dogtag_cert.CertRequestStatus.PENDING
self.certclient_mock.get_request.return_value = self.request
result_dto = self.plugin.check_certificate_status(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.get_request.assert_called_once_with(
self.request_id_mock)
self.assertEqual(result_dto.status,
cm.CertificateStatus.WAITING_FOR_CA,
"result_dto_status incorrect")
self.assertEqual(result_dto.certificate,
None)
def test_check_status_raises_error_complete_no_cert(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.get_request.return_value = self.request
self.certclient_mock.get_cert.return_value = None
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.check_certificate_status,
self.order_id,
order_meta,
plugin_meta
)
def test_modify_request(self):
order_meta = {dogtag_import.DogtagCAPlugin.PROFILE_ID:
self.profile_id}
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.cancel_request.return_value = None
self.certclient_mock.review_request.return_value = self.review_response
enrollment_result = dogtag_cert.CertEnrollmentResult(
self.modified_request, self.cert)
enrollment_results = [enrollment_result]
self.certclient_mock.enroll_cert.return_value = enrollment_results
result_dto = self.plugin.modify_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.cancel_request.assert_called_once_with(
self.request_id_mock,
self.review_response)
self.certclient_mock.enroll_cert.assert_called_once_with(
self.profile_id,
order_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CERTIFICATE_GENERATED,
"result_dto_status incorrect")
self.assertEqual(result_dto.certificate,
self.cert_encoded_mock)
self.assertEqual(result_dto.intermediates,
self.cert_pkcs7_mock)
self.assertEqual(
plugin_meta.get(dogtag_import.DogtagCAPlugin.REQUEST_ID),
self.modified_request_id_mock
)
def test_modify_no_request_found(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.review_request.side_effect = \
pki.RequestNotFoundException("request_not_found")
result_dto = self.plugin.modify_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.review_request.assert_called_once_with(
self.request_id_mock)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
"result_dto_status incorrect")
def test_modify_conflicting_operation(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.review_request.return_value = self.review_response
self.certclient_mock.cancel_request.side_effect = \
pki.ConflictingOperationException("conflicting_operation")
result_dto = self.plugin.modify_certificate_request(
self.order_id, order_meta, plugin_meta)
self.certclient_mock.cancel_request.assert_called_once_with(
self.request_id_mock,
self.review_response)
self.assertEqual(result_dto.status,
cm.CertificateStatus.INVALID_OPERATION,
"result_dto_status incorrect")
def test_modify_ca_unavailable(self):
order_meta = mock.ANY
plugin_meta = {dogtag_import.DogtagCAPlugin.REQUEST_ID:
self.request_id_mock}
self.certclient_mock.review_request.side_effect = \
request_exceptions.RequestException("request_exception")
result_dto = self.plugin.modify_certificate_request(
self.order_id, order_meta, plugin_meta)
self.assertEqual(result_dto.status,
cm.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST,
"result_dto_status incorrect")
def test_modify_raise_error_no_request_id(self):
order_meta = mock.ANY
plugin_meta = {}
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.modify_certificate_request,
self.order_id,
order_meta,
plugin_meta
)

View File

@ -155,25 +155,16 @@ server_name = 'barbican.queue'
[crypto]
namespace = barbican.crypto.plugin
enabled_crypto_plugins = simple_crypto
#enabled_crypto_plugins = dogtag_crypto
[simple_crypto_plugin]
# the kek should be a 32-byte value which is base64 encoded
kek = 'YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY='
[dogtag_crypto_plugin]
pem_path = '/etc/barbican/drm_admin_cert.pem'
pem_password = 'password123'
drm_host = localhost
drm_port = 8443
nss_db_path = '/etc/barbican/alias'
nss_password = 'password123'
[dogtag_plugin]
pem_path = '/etc/barbican/drm_admin_cert.pem'
pem_path = '/etc/barbican/kra_admin_cert.pem'
pem_password = 'password123'
drm_host = localhost
drm_port = 8443
dogtag_host = localhost
dogtag_port = 8443
nss_db_path = '/etc/barbican/alias'
nss_password = 'password123'

View File

@ -29,7 +29,7 @@ scripts =
[entry_points]
barbican.secretstore.plugin =
store_crypto = barbican.plugin.store_crypto:StoreCryptoAdapterPlugin
dogtag_crypto = barbican.plugin.dogtag:DogtagPlugin
dogtag_crypto = barbican.plugin.dogtag:DogtagKRAPlugin
barbican.crypto.plugin =
p11_crypto = barbican.plugin.crypto.p11_crypto:P11CryptoPlugin
simple_crypto = barbican.plugin.crypto.simple_crypto:SimpleCryptoPlugin