Merge "Fixes Octavia not working with Barbican"
This commit is contained in:
commit
3098cf18fb
@ -18,9 +18,10 @@ Common classes for Barbican certificate handling
|
||||
"""
|
||||
|
||||
import abc
|
||||
import six
|
||||
|
||||
from barbicanclient import client as barbican_client
|
||||
import six
|
||||
from oslo_utils import encodeutils
|
||||
|
||||
from octavia.certificates.common import cert
|
||||
from octavia.common.tls_utils import cert_parser
|
||||
@ -39,21 +40,25 @@ class BarbicanCert(cert.Cert):
|
||||
|
||||
def get_certificate(self):
|
||||
if self._cert_container.certificate:
|
||||
return self._cert_container.certificate.payload
|
||||
return encodeutils.to_utf8(
|
||||
self._cert_container.certificate.payload)
|
||||
|
||||
def get_intermediates(self):
|
||||
if self._cert_container.intermediates:
|
||||
intermediates = self._cert_container.intermediates.payload
|
||||
intermediates = encodeutils.to_utf8(
|
||||
self._cert_container.intermediates.payload)
|
||||
return [imd for imd in cert_parser.get_intermediates_pems(
|
||||
intermediates)]
|
||||
|
||||
def get_private_key(self):
|
||||
if self._cert_container.private_key:
|
||||
return self._cert_container.private_key.payload
|
||||
return encodeutils.to_utf8(
|
||||
self._cert_container.private_key.payload)
|
||||
|
||||
def get_private_key_passphrase(self):
|
||||
if self._cert_container.private_key_passphrase:
|
||||
return self._cert_container.private_key_passphrase.payload
|
||||
return encodeutils.to_utf8(
|
||||
self._cert_container.private_key_passphrase.payload)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
from barbicanclient import client as barbican_client
|
||||
import mock
|
||||
import six
|
||||
|
||||
import octavia.certificates.common.barbican as barbican_common
|
||||
import octavia.tests.unit.base as base
|
||||
@ -22,13 +23,7 @@ import octavia.tests.unit.common.sample_configs.sample_certs as sample
|
||||
|
||||
class TestBarbicanCert(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Certificate data
|
||||
self.certificate = sample.X509_CERT
|
||||
self.intermediates = sample.X509_IMDS_LIST
|
||||
self.private_key = sample.X509_CERT_KEY_ENCRYPTED
|
||||
self.private_key_passphrase = sample.X509_CERT_KEY_PASSPHRASE
|
||||
|
||||
def _prepare(self):
|
||||
self.certificate_secret = barbican_client.secrets.Secret(
|
||||
api=mock.MagicMock(),
|
||||
payload=self.certificate
|
||||
@ -46,9 +41,14 @@ class TestBarbicanCert(base.TestCase):
|
||||
payload=self.private_key_passphrase
|
||||
)
|
||||
|
||||
super(TestBarbicanCert, self).setUp()
|
||||
|
||||
def test_barbican_cert(self):
|
||||
# Certificate data
|
||||
self.certificate = six.binary_type(sample.X509_CERT)
|
||||
self.intermediates = sample.X509_IMDS_LIST
|
||||
self.private_key = six.binary_type(sample.X509_CERT_KEY_ENCRYPTED)
|
||||
self.private_key_passphrase = sample.X509_CERT_KEY_PASSPHRASE
|
||||
self._prepare()
|
||||
|
||||
container = barbican_client.containers.CertificateContainer(
|
||||
api=mock.MagicMock(),
|
||||
certificate=self.certificate_secret,
|
||||
@ -62,8 +62,39 @@ class TestBarbicanCert(base.TestCase):
|
||||
)
|
||||
|
||||
# Validate the cert functions
|
||||
self.assertEqual(cert.get_certificate(), self.certificate)
|
||||
self.assertEqual(cert.get_intermediates(), self.intermediates)
|
||||
self.assertEqual(cert.get_private_key(), self.private_key)
|
||||
self.assertEqual(cert.get_certificate(), sample.X509_CERT)
|
||||
self.assertEqual(cert.get_intermediates(), sample.X509_IMDS_LIST)
|
||||
self.assertEqual(cert.get_private_key(),
|
||||
sample.X509_CERT_KEY_ENCRYPTED)
|
||||
self.assertEqual(cert.get_private_key_passphrase(),
|
||||
self.private_key_passphrase)
|
||||
six.b(sample.X509_CERT_KEY_PASSPHRASE))
|
||||
|
||||
def test_barbican_cert_text(self):
|
||||
# Certificate data
|
||||
self.certificate = six.text_type(sample.X509_CERT)
|
||||
self.intermediates = six.text_type(sample.X509_IMDS_LIST)
|
||||
self.private_key = six.text_type(sample.X509_CERT_KEY_ENCRYPTED)
|
||||
self.private_key_passphrase = six.text_type(
|
||||
sample.X509_CERT_KEY_PASSPHRASE)
|
||||
self._prepare()
|
||||
|
||||
container = barbican_client.containers.CertificateContainer(
|
||||
api=mock.MagicMock(),
|
||||
certificate=self.certificate_secret,
|
||||
intermediates=self.intermediates_secret,
|
||||
private_key=self.private_key_secret,
|
||||
private_key_passphrase=self.private_key_passphrase_secret
|
||||
)
|
||||
# Create a cert
|
||||
cert = barbican_common.BarbicanCert(
|
||||
cert_container=container
|
||||
)
|
||||
|
||||
# Validate the cert functions
|
||||
self.assertEqual(cert.get_certificate(),
|
||||
six.b(six.text_type(sample.X509_CERT)))
|
||||
self.assertEqual(cert.get_intermediates(), sample.X509_IMDS_LIST)
|
||||
self.assertEqual(cert.get_private_key(), six.b(six.text_type(
|
||||
sample.X509_CERT_KEY_ENCRYPTED)))
|
||||
self.assertEqual(cert.get_private_key_passphrase(),
|
||||
six.b(sample.X509_CERT_KEY_PASSPHRASE))
|
||||
|
@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
import uuid
|
||||
|
||||
from barbicanclient import containers
|
||||
@ -40,11 +41,22 @@ class TestBarbicanManager(base.TestCase):
|
||||
)
|
||||
|
||||
self.name = 'My Fancy Cert'
|
||||
self.private_key = mock.Mock(spec=secrets.Secret)
|
||||
self.certificate = mock.Mock(spec=secrets.Secret)
|
||||
self.intermediates = mock.Mock(spec=secrets.Secret)
|
||||
self.intermediates.payload = sample.X509_IMDS
|
||||
self.private_key_passphrase = mock.Mock(spec=secrets.Secret)
|
||||
self.certificate = secrets.Secret(
|
||||
api=mock.MagicMock(),
|
||||
payload=sample.X509_CERT
|
||||
)
|
||||
self.intermediates = secrets.Secret(
|
||||
api=mock.MagicMock(),
|
||||
payload=sample.X509_IMDS
|
||||
)
|
||||
self.private_key = secrets.Secret(
|
||||
api=mock.MagicMock(),
|
||||
payload=sample.X509_CERT_KEY_ENCRYPTED
|
||||
)
|
||||
self.private_key_passphrase = secrets.Secret(
|
||||
api=mock.MagicMock(),
|
||||
payload=sample.X509_CERT_KEY_PASSPHRASE
|
||||
)
|
||||
|
||||
container = mock.Mock(spec=containers.CertificateContainer)
|
||||
container.container_ref = self.container_ref
|
||||
@ -184,7 +196,7 @@ class TestBarbicanManager(base.TestCase):
|
||||
self.assertEqual(data.get_intermediates(),
|
||||
sample.X509_IMDS_LIST)
|
||||
self.assertEqual(data.get_private_key_passphrase(),
|
||||
self.private_key_passphrase.payload)
|
||||
six.b(self.private_key_passphrase.payload))
|
||||
|
||||
def test_get_cert_no_registration(self):
|
||||
self.bc.containers.get.return_value = self.container
|
||||
@ -209,7 +221,7 @@ class TestBarbicanManager(base.TestCase):
|
||||
self.assertEqual(data.get_intermediates(),
|
||||
sample.X509_IMDS_LIST)
|
||||
self.assertEqual(data.get_private_key_passphrase(),
|
||||
self.private_key_passphrase.payload)
|
||||
six.b(self.private_key_passphrase.payload))
|
||||
|
||||
def test_delete_cert(self):
|
||||
# Attempt to deregister as a consumer
|
||||
|
Loading…
Reference in New Issue
Block a user