Fixes Octavia not working with Barbican

Adds conversion of the Barbicna payload (see changes at
https://docs.openstack.org/developer/python-barbicanclient/usage.html)
by using oslo's encodeutils

Change-Id: Ibc9fdc8b1bb19b07e70581c6aaa25c5e45bdb1ba
Closes-Bug: #1681595
This commit is contained in:
German Eichberger 2017-04-21 16:56:35 -04:00
parent c41a2fc427
commit 38a3d4f318
3 changed files with 73 additions and 25 deletions

View File

@ -18,9 +18,10 @@ Common classes for Barbican certificate handling
"""
import abc
import six
from barbicanclient import client as barbican_client
import six
from oslo_utils import encodeutils
from octavia.certificates.common import cert
from octavia.common.tls_utils import cert_parser
@ -39,21 +40,25 @@ class BarbicanCert(cert.Cert):
def get_certificate(self):
if self._cert_container.certificate:
return self._cert_container.certificate.payload
return encodeutils.to_utf8(
self._cert_container.certificate.payload)
def get_intermediates(self):
if self._cert_container.intermediates:
intermediates = self._cert_container.intermediates.payload
intermediates = encodeutils.to_utf8(
self._cert_container.intermediates.payload)
return [imd for imd in cert_parser.get_intermediates_pems(
intermediates)]
def get_private_key(self):
if self._cert_container.private_key:
return self._cert_container.private_key.payload
return encodeutils.to_utf8(
self._cert_container.private_key.payload)
def get_private_key_passphrase(self):
if self._cert_container.private_key_passphrase:
return self._cert_container.private_key_passphrase.payload
return encodeutils.to_utf8(
self._cert_container.private_key_passphrase.payload)
@six.add_metaclass(abc.ABCMeta)

View File

@ -14,6 +14,7 @@
from barbicanclient import client as barbican_client
import mock
import six
import octavia.certificates.common.barbican as barbican_common
import octavia.tests.unit.base as base
@ -22,13 +23,7 @@ import octavia.tests.unit.common.sample_configs.sample_certs as sample
class TestBarbicanCert(base.TestCase):
def setUp(self):
# Certificate data
self.certificate = sample.X509_CERT
self.intermediates = sample.X509_IMDS_LIST
self.private_key = sample.X509_CERT_KEY_ENCRYPTED
self.private_key_passphrase = sample.X509_CERT_KEY_PASSPHRASE
def _prepare(self):
self.certificate_secret = barbican_client.secrets.Secret(
api=mock.MagicMock(),
payload=self.certificate
@ -46,9 +41,14 @@ class TestBarbicanCert(base.TestCase):
payload=self.private_key_passphrase
)
super(TestBarbicanCert, self).setUp()
def test_barbican_cert(self):
# Certificate data
self.certificate = six.binary_type(sample.X509_CERT)
self.intermediates = sample.X509_IMDS_LIST
self.private_key = six.binary_type(sample.X509_CERT_KEY_ENCRYPTED)
self.private_key_passphrase = sample.X509_CERT_KEY_PASSPHRASE
self._prepare()
container = barbican_client.containers.CertificateContainer(
api=mock.MagicMock(),
certificate=self.certificate_secret,
@ -62,8 +62,39 @@ class TestBarbicanCert(base.TestCase):
)
# Validate the cert functions
self.assertEqual(cert.get_certificate(), self.certificate)
self.assertEqual(cert.get_intermediates(), self.intermediates)
self.assertEqual(cert.get_private_key(), self.private_key)
self.assertEqual(cert.get_certificate(), sample.X509_CERT)
self.assertEqual(cert.get_intermediates(), sample.X509_IMDS_LIST)
self.assertEqual(cert.get_private_key(),
sample.X509_CERT_KEY_ENCRYPTED)
self.assertEqual(cert.get_private_key_passphrase(),
self.private_key_passphrase)
six.b(sample.X509_CERT_KEY_PASSPHRASE))
def test_barbican_cert_text(self):
# Certificate data
self.certificate = six.text_type(sample.X509_CERT)
self.intermediates = six.text_type(sample.X509_IMDS_LIST)
self.private_key = six.text_type(sample.X509_CERT_KEY_ENCRYPTED)
self.private_key_passphrase = six.text_type(
sample.X509_CERT_KEY_PASSPHRASE)
self._prepare()
container = barbican_client.containers.CertificateContainer(
api=mock.MagicMock(),
certificate=self.certificate_secret,
intermediates=self.intermediates_secret,
private_key=self.private_key_secret,
private_key_passphrase=self.private_key_passphrase_secret
)
# Create a cert
cert = barbican_common.BarbicanCert(
cert_container=container
)
# Validate the cert functions
self.assertEqual(cert.get_certificate(),
six.b(six.text_type(sample.X509_CERT)))
self.assertEqual(cert.get_intermediates(), sample.X509_IMDS_LIST)
self.assertEqual(cert.get_private_key(), six.b(six.text_type(
sample.X509_CERT_KEY_ENCRYPTED)))
self.assertEqual(cert.get_private_key_passphrase(),
six.b(sample.X509_CERT_KEY_PASSPHRASE))

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import uuid
from barbicanclient import containers
@ -40,11 +41,22 @@ class TestBarbicanManager(base.TestCase):
)
self.name = 'My Fancy Cert'
self.private_key = mock.Mock(spec=secrets.Secret)
self.certificate = mock.Mock(spec=secrets.Secret)
self.intermediates = mock.Mock(spec=secrets.Secret)
self.intermediates.payload = sample.X509_IMDS
self.private_key_passphrase = mock.Mock(spec=secrets.Secret)
self.certificate = secrets.Secret(
api=mock.MagicMock(),
payload=sample.X509_CERT
)
self.intermediates = secrets.Secret(
api=mock.MagicMock(),
payload=sample.X509_IMDS
)
self.private_key = secrets.Secret(
api=mock.MagicMock(),
payload=sample.X509_CERT_KEY_ENCRYPTED
)
self.private_key_passphrase = secrets.Secret(
api=mock.MagicMock(),
payload=sample.X509_CERT_KEY_PASSPHRASE
)
container = mock.Mock(spec=containers.CertificateContainer)
container.container_ref = self.container_ref
@ -184,7 +196,7 @@ class TestBarbicanManager(base.TestCase):
self.assertEqual(data.get_intermediates(),
sample.X509_IMDS_LIST)
self.assertEqual(data.get_private_key_passphrase(),
self.private_key_passphrase.payload)
six.b(self.private_key_passphrase.payload))
def test_get_cert_no_registration(self):
self.bc.containers.get.return_value = self.container
@ -209,7 +221,7 @@ class TestBarbicanManager(base.TestCase):
self.assertEqual(data.get_intermediates(),
sample.X509_IMDS_LIST)
self.assertEqual(data.get_private_key_passphrase(),
self.private_key_passphrase.payload)
six.b(self.private_key_passphrase.payload))
def test_delete_cert(self):
# Attempt to deregister as a consumer