Use subCA when specified to sign CSRs

Snakeoil was always using the root CA to sign CSRs.  With this
commit it will use the CA whose reference was passed.

Change-Id: I6afee4414aa4d2878f619f0f3650361a7b1d6ea8
Closes-Bug: 1499874
Partially-implements: blueprint add-cas
This commit is contained in:
Dave McCowan 2015-09-25 16:23:04 -04:00
parent 9a8cbf7897
commit 5c67eabd7c
3 changed files with 73 additions and 2 deletions

View File

@ -359,7 +359,7 @@ class SnakeoilCACertificatePlugin(cert_manager.CertificatePluginBase):
status_message=u._("No request_data specified"))
csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, encoded_csr)
ca_id = plugin_meta.get('plugin_ca_id')
ca_id = barbican_meta_dto.plugin_ca_id
if ca_id:
ca = self.cas.get(ca_id)
if ca is None:

View File

@ -225,6 +225,7 @@ class SnakeoilCAPluginTestCase(BaseTestCase):
req_enc = base64.b64encode(req_enc)
order_meta = {'request_data': req_enc}
plugin_meta = {'plugin_ca_id': self.plugin.get_default_ca_name()}
self.barbican_meta_dto.plugin_ca_id = self.plugin.get_default_ca_name()
resp = self.plugin.issue_certificate_request(self.order_id,
order_meta,
plugin_meta,
@ -239,6 +240,7 @@ class SnakeoilCAPluginTestCase(BaseTestCase):
req_enc = base64.b64encode(req_enc)
order_meta = {'request_data': req_enc}
plugin_meta = {'plugin_ca_id': "invalid_ca_id"}
self.barbican_meta_dto.plugin_ca_id = "invalid_ca_id"
self.assertRaises(
cm.CertificateGeneralException,
self.plugin.issue_certificate_request,
@ -366,6 +368,26 @@ class SnakeoilCAPluginTestCase(BaseTestCase):
# TODO(alee) Verify that ca cert is signed by parent CA
def test_issue_certificate_request_with_subca_id(self):
subca_dict = self._create_subca()
req = certificate_utils.get_valid_csr_object()
req_enc = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
req_enc = base64.b64encode(req_enc)
order_meta = {'request_data': req_enc}
plugin_meta = {'plugin_ca_id': subca_dict.get(cm.PLUGIN_CA_ID)}
self.barbican_meta_dto.plugin_ca_id = subca_dict.get(cm.PLUGIN_CA_ID)
resp = self.plugin.issue_certificate_request(self.order_id,
order_meta,
plugin_meta,
self.barbican_meta_dto)
new_cert = crypto.load_certificate(
crypto.FILETYPE_PEM, resp.certificate.decode('base64'))
signing_cert = crypto.load_certificate(
crypto.FILETYPE_PEM, subca_dict['ca_signing_certificate'])
self.assertEqual(signing_cert.get_subject(), new_cert.get_issuer())
def test_delete_ca(self):
subca_dict = self._create_subca()
ca_id = subca_dict.get(cm.PLUGIN_CA_ID)

View File

@ -16,6 +16,7 @@
import base64
import copy
import re
import time
from OpenSSL import crypto
@ -24,7 +25,9 @@ from barbican.plugin.interface import certificate_manager as cert_interface
from barbican.tests import certificate_utils as certutil
from functionaltests.api import base
from functionaltests.api.v1.behaviors import ca_behaviors
from functionaltests.api.v1.behaviors import container_behaviors
from functionaltests.api.v1.behaviors import order_behaviors
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import ca_models
from functionaltests.api.v1.models import order_models
from functionaltests.common import config
@ -92,7 +95,9 @@ class CATestCommon(base.TestCase):
super(CATestCommon, self).setUp()
self.order_behaviors = order_behaviors.OrderBehaviors(self.client)
self.ca_behaviors = ca_behaviors.CABehaviors(self.client)
self.container_behaviors = container_behaviors.ContainerBehaviors(
self.client)
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data)
def tearDown(self):
@ -114,6 +119,15 @@ class CATestCommon(base.TestCase):
self.assertEqual(expected_return, create_resp.status_code)
if expected_return == 202:
self.assertIsNotNone(order_ref)
return order_ref
def wait_for_order(self, order_resp, order_ref):
# Make sure we have an active order
time_count = 1
while order_resp.model.status != "ACTIVE" and time_count <= 4:
time.sleep(1)
time_count += 1
order_resp = self.behaviors.get_order(order_ref)
def get_root_ca_ref(self, ca_plugin_name, ca_plugin_id):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
@ -297,6 +311,41 @@ class CertificateAuthoritiesTestCase(CATestCommon):
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_and_send_cert_order_and_verify_cert(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
order_ref = self.send_test_order(ca_ref)
order_resp = self.order_behaviors.get_order(order_ref=order_ref)
self.assertEqual(200, order_resp.status_code)
self.wait_for_order(order_resp=order_resp, order_ref=order_ref)
container_resp = self.container_behaviors.get_container(
order_resp.model.container_ref)
self.assertEqual(container_resp.status_code, 200)
secret_dict = {}
for secret in container_resp.model.secret_refs:
self.assertIsNotNone(secret.secret_ref)
secret_resp = self.secret_behaviors.get_secret(
secret.secret_ref, "application/octet-stream")
self.assertIsNotNone(secret_resp)
secret_dict[secret.name] = secret_resp.content
certificate = secret_dict['certificate']
new_cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate)
signing_cert = self.get_signing_cert(ca_ref)
issuer = new_cert.get_issuer()
expected_issuer = signing_cert.get_subject()
self.assertEqual(expected_issuer, issuer)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref)
self.assertEqual(204, resp.status_code)
class ListingCAsTestCase(CATestCommon):
"""Tests for listing CAs.