Hooking TLS together

Updated components to support TLS

Change-Id: Ia75e1d85709d37fc8cc6a7bba2f59684ce9b76ba
This commit is contained in:
ptoohill1 2015-05-26 03:51:49 -05:00 committed by Brandon Logan
parent adfc6977c6
commit 5ad396cbb2
13 changed files with 164 additions and 75 deletions

View File

@ -36,7 +36,7 @@ ACTIVE_PENDING_STATUSES = constants.SUPPORTED_PROVISIONING_STATUSES + (
constants.DEGRADED,)
BASE_PATH = '/var/lib/octavia'
BASE_CRT_DIR = '/listeners'
BASE_CRT_DIR = BASE_PATH + '/certs'
HAPROXY_TEMPLATE = os.path.abspath(
os.path.join(os.path.dirname(__file__),
@ -72,8 +72,6 @@ class JinjaTemplater(object):
self.base_crt_dir = base_crt_dir if base_crt_dir else BASE_CRT_DIR
self.haproxy_template = (haproxy_template if haproxy_template
else HAPROXY_TEMPLATE)
self.cert_store_path = '{0}{1}'.format(self.base_amp_path,
self.base_crt_dir)
self.log_http = log_http
self.log_server = log_server
self.timeout_client = timeout_client
@ -163,11 +161,12 @@ class JinjaTemplater(object):
if listener.connection_limit and listener.connection_limit > -1:
ret_value['connection_limit'] = listener.connection_limit
if listener.tls_certificate_id:
ret_value['default_tls_path'] = '%s/%s/%s.pem' % (
self.cert_store_path, listener.id, tls_cert.primary_cn)
ret_value['default_tls_path'] = '%s.pem' % (
os.path.join(self.base_crt_dir,
listener.id,
tls_cert.primary_cn))
if listener.sni_containers:
ret_value['crt_dir'] = '%s/%s' % (
self.cert_store_path, listener.id)
ret_value['crt_dir'] = os.path.join(self.base_crt_dir, listener.id)
if listener.default_pool:
ret_value['default_pool'] = self._transform_pool(
listener.default_pool)

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import socket
import tempfile
import time
@ -238,27 +239,31 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
"""
tls_cert = None
sni_certs = []
cert_dir = '{0}/{1}/certificates'.format(self.amp_config.base_path,
listener.id)
data = []
if listener.tls_certificate_id:
tls_cert = self._map_cert_tls_container(
self.barbican_client.get_cert(listener.tls_certificate_id))
data.append(self._build_pem(tls_cert))
self.barbican_client.get_cert(listener.tls_certificate_id,
check_only=True))
data.append(cert_parser.build_pem(tls_cert))
if listener.sni_containers:
for sni_cont in listener.sni_containers:
bbq_container = self._map_cert_tls_container(
self.barbican_client.get_cert(sni_cont.tls_container.id))
self.barbican_client.get_cert(sni_cont.tls_container.id,
check_only=True))
sni_certs.append(bbq_container)
data.append(self._build_pem(bbq_container))
data.append(cert_parser.build_pem(bbq_container))
if data:
cert_dir = os.path.join(self.amp_config.base_cert_dir, listener.id)
listener_cert = '{0}/{1}.pem'.format(cert_dir, tls_cert.primary_cn)
self._exec_on_amphorae(
listener.load_balancer.amphorae, [
'chmod 600 {0}/*.pem'.format(cert_dir)],
make_dir=cert_dir, data=data, upload_dir=cert_dir)
make_dir=cert_dir,
data=data, upload_dir=listener_cert)
return {'tls_cert': tls_cert, 'sni_certs': sni_certs}
@ -273,17 +278,6 @@ class HaproxyManager(driver_base.AmphoraLoadBalancerDriver):
certificate=cert.get_certificate(),
intermediates=cert.get_intermediates())
def _build_pem(self, tls_cert):
"""Concatenate TLS Certificate fields to create a PEM
encoded certificate file
"""
# TODO(ptoohill): Maybe this should be part of utils or manager?
pem = tls_cert.intermediates[:]
pem.extend([tls_cert.certificate, tls_cert.private_key])
return "\n".join(pem)
def _exec_on_amphorae(self, amphorae, commands, make_dir=None, data=None,
upload_dir=None):
data = data or []

View File

@ -35,7 +35,7 @@ class ListenerResponse(base.BaseType):
protocol = wtypes.wsattr(wtypes.text)
protocol_port = wtypes.wsattr(wtypes.IntegerType())
connection_limit = wtypes.wsattr(wtypes.IntegerType())
tls_certificate_id = wtypes.wsattr(wtypes.UuidType())
tls_certificate_id = wtypes.wsattr(wtypes.StringType(max_length=255))
class ListenerPOST(base.BaseType):
@ -47,7 +47,7 @@ class ListenerPOST(base.BaseType):
protocol = wtypes.wsattr(wtypes.StringType(), mandatory=True)
protocol_port = wtypes.wsattr(wtypes.IntegerType(), mandatory=True)
connection_limit = wtypes.wsattr(wtypes.IntegerType())
tls_certificate_id = wtypes.wsattr(wtypes.UuidType())
tls_certificate_id = wtypes.wsattr(wtypes.StringType(max_length=255))
tls_termination = wtypes.wsattr(TLSTermination)
@ -59,5 +59,5 @@ class ListenerPUT(base.BaseType):
protocol = wtypes.wsattr(wtypes.StringType())
protocol_port = wtypes.wsattr(wtypes.IntegerType())
connection_limit = wtypes.wsattr(wtypes.IntegerType())
tls_certificate_id = wtypes.wsattr(wtypes.UuidType())
tls_certificate_id = wtypes.wsattr(wtypes.StringType(max_length=255))
tls_termination = wtypes.wsattr(TLSTermination)

View File

@ -43,15 +43,19 @@ class BarbicanCert(cert.Cert):
self._cert_container = cert_container
def get_certificate(self):
if self._cert_container.certificate:
return self._cert_container.certificate.payload
def get_intermediates(self):
if self._cert_container.intermediates:
return self._cert_container.intermediates.payload
def get_private_key(self):
if self._cert_container.private_key:
return self._cert_container.private_key.payload
def get_private_key_passphrase(self):
if self._cert_container.private_key_passphrase:
return self._cert_container.private_key_passphrase.payload

View File

@ -35,7 +35,7 @@ class BarbicanCertManager(cert_mgr.CertManager):
@staticmethod
def store_cert(certificate, private_key, intermediates=None,
private_key_passphrase=None, expiration=None,
name='Octavia TLS Cert', **kwargs):
name='Octavia TLS Cert'):
"""Stores a certificate in the certificate manager.
:param certificate: PEM encoded TLS certificate
@ -113,14 +113,14 @@ class BarbicanCertManager(cert_mgr.CertManager):
).format(str(e)))
@staticmethod
def get_cert(cert_ref, service_name='Octavia', resource_ref=None,
check_only=False, **kwargs):
def get_cert(cert_ref, resource_ref=None, check_only=False,
service_name='Octavia'):
"""Retrieves the specified cert and registers as a consumer.
:param cert_ref: the UUID of the cert to retrieve
:param service_name: Friendly name for the consuming service
:param resource_ref: Full HATEOAS reference to the consuming resource
:param check_only: Read Certificate data without registering
:param service_name: Friendly name for the consuming service
:return: octavia.certificates.common.Cert representation of the
certificate data
@ -150,13 +150,12 @@ class BarbicanCertManager(cert_mgr.CertManager):
).format(cert_ref, str(e)))
@staticmethod
def delete_cert(cert_ref, service_name='Octavia', resource_ref=None,
**kwargs):
def delete_cert(cert_ref, resource_ref=None, service_name='Octavia'):
"""Deregister as a consumer for the specified cert.
:param cert_ref: the UUID of the cert to retrieve
:param service_name: Friendly name for the consuming service
:param resource_ref: Full HATEOAS reference to the consuming resource
:param service_name: Friendly name for the consuming service
:raises Exception: if deregistration fails
"""

View File

@ -30,7 +30,8 @@ class CertManager(object):
@abc.abstractmethod
def store_cert(self, certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs):
private_key_passphrase=None, expiration=None,
name=None):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert and returns its UUID that
@ -41,7 +42,8 @@ class CertManager(object):
pass
@abc.abstractmethod
def get_cert(self, cert_ref, check_only=False, **kwargs):
def get_cert(self, cert_ref, resource_ref=None, check_only=False,
service_name=None):
"""Retrieves the specified cert.
If check_only is True, don't perform any sort of registration.
@ -51,7 +53,7 @@ class CertManager(object):
pass
@abc.abstractmethod
def delete_cert(self, cert_ref, **kwargs):
def delete_cert(self, cert_ref, resource_ref, service_name=None):
"""Deletes the specified cert.
If the specified cert does not exist, a CertificateStorageException

View File

@ -35,7 +35,8 @@ PROTOCOL_TCP = 'TCP'
PROTOCOL_HTTP = 'HTTP'
PROTOCOL_HTTPS = 'HTTPS'
PROTOCOL_TERMINATED_HTTPS = 'TERMINATED_HTTPS'
SUPPORTED_PROTOCOLS = (PROTOCOL_TCP, PROTOCOL_HTTPS, PROTOCOL_HTTP)
SUPPORTED_PROTOCOLS = (PROTOCOL_TCP, PROTOCOL_HTTPS, PROTOCOL_HTTP,
PROTOCOL_TERMINATED_HTTPS)
# Note: The database Amphora table has a foreign key constraint against
# the provisioning_status table

View File

@ -17,6 +17,7 @@ from cryptography.hazmat import backends
from cryptography import x509
from OpenSSL import crypto
from OpenSSL import SSL
from oslo_log import log as logging
import six
import octavia.common.exceptions as exceptions
@ -25,6 +26,8 @@ import octavia.common.exceptions as exceptions
X509_BEG = "-----BEGIN CERTIFICATE-----"
X509_END = "-----END CERTIFICATE-----"
LOG = logging.getLogger(__name__)
def validate_cert(certificate, private_key=None,
private_key_passphrase=None, intermediates=None):
@ -76,7 +79,7 @@ def _split_x509s(xstr):
"""
curr_pem_block = []
inside_x509 = False
for line in xstr.replace(six.b("\r"), six.b("")).split(six.b("\n")):
for line in xstr.replace("\r", "").split("\n"):
if inside_x509:
curr_pem_block.append(line)
if line == X509_END:
@ -101,19 +104,27 @@ def get_host_names(certificate):
the SubjectAltNames of the certificate.
"""
try:
certificate = certificate.encode('ascii')
cert = x509.load_pem_x509_certificate(certificate,
backends.default_backend())
cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0]
host_names = {
'cn': cn.value.lower(),
'dns_names': []
}
try:
ext = cert.extensions.get_extension_for_oid(
x509.OID_SUBJECT_ALTERNATIVE_NAME
)
cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0]
host_names['dns_names'] = ext.value.get_values_for_type(
x509.DNSName)
except x509.ExtensionNotFound:
LOG.debug("{0} extension not found".format(
x509.OID_SUBJECT_ALTERNATIVE_NAME))
host_names = {
'cn': cn.value.lower(),
'dns_names': ext.value.get_values_for_type(x509.DNSName)
}
return host_names
except Exception:
except Exception as e:
LOG.exception(e)
raise exceptions.UnreadableCert
@ -129,3 +140,18 @@ def _get_x509_from_pem_bytes(certificate_pem):
except Exception:
raise exceptions.UnreadableCert
return x509
def build_pem(tls_container):
"""Concatenate TLS container fields to create a PEM
encoded certificate file
:param tls_container: Object container TLS certificates
:returns: Pem encoded certificate file
"""
pem = []
if tls_container.intermediates:
pem = tls_container.intermediates[:]
pem.extend([tls_container.certificate, tls_container.private_key])
return "\n".join(pem)

View File

@ -0,0 +1,48 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Adding TERMINATED_HTTPS support and TLS ref ID char length increase
Revision ID: 2351ea316465
Revises: 48660b6643f0
Create Date: 2015-05-22 11:57:04.703910
"""
# revision identifiers, used by Alembic.
revision = '2351ea316465'
down_revision = '357d17a6d5ac'
from alembic import op
import sqlalchemy as sa
from sqlalchemy import sql
new_protocol = 'TERMINATED_HTTPS'
def upgrade():
insert_table = sql.table(
u'protocol',
sql.column(u'name', sa.String),
sql.column(u'description', sa.String)
)
op.bulk_insert(
insert_table,
[
{'name': new_protocol}
]
)
op.alter_column(u'listener', u'tls_certificate_id',
existing_type=sa.String(255), nullable=True)

View File

@ -25,13 +25,16 @@ BUILTINS = '__builtin__'
if six.PY3:
BUILTINS = 'builtins'
BASE_AMP_PATH = '/var/lib/octavia'
BASE_CRT_PATH = BASE_AMP_PATH + '/certs'
class ListenerTestCase(base.TestCase):
def setUp(self):
super(ListenerTestCase, self).setUp()
self.jinja_cfg = jinja_cfg.JinjaTemplater(
base_amp_path='/var/lib/octavia',
base_crt_dir='/listeners')
base_amp_path=BASE_AMP_PATH,
base_crt_dir=BASE_CRT_PATH)
def test_parse_haproxy_config(self):
# template_tls
@ -51,7 +54,7 @@ class ListenerTestCase(base.TestCase):
self.assertEqual('/var/lib/octavia/sample_listener_id_1.sock',
res['stats_socket'])
self.assertEqual(
'/var/lib/octavia/listeners/sample_listener_id_1/FakeCN.pem',
'/var/lib/octavia/certs/sample_listener_id_1/FakeCN.pem',
res['ssl_crt'])
# render_template_tls_no_sni
@ -68,10 +71,10 @@ class ListenerTestCase(base.TestCase):
with mock.patch('%s.open' % BUILTINS, m, create=True):
res = listener._parse_haproxy_file('123')
self.assertEqual('TERMINATED_HTTPS', res['mode'])
self.assertEqual('/var/lib/octavia/sample_listener_id_1.sock',
self.assertEqual(BASE_AMP_PATH + '/sample_listener_id_1.sock',
res['stats_socket'])
self.assertEqual(
'/var/lib/octavia/listeners/sample_listener_id_1/FakeCN.pem',
BASE_CRT_PATH + '/sample_listener_id_1/FakeCN.pem',
res['ssl_crt'])
# render_template_http
@ -82,7 +85,7 @@ class ListenerTestCase(base.TestCase):
with mock.patch('%s.open' % BUILTINS, m, create=True):
res = listener._parse_haproxy_file('123')
self.assertEqual('HTTP', res['mode'])
self.assertEqual('/var/lib/octavia/sample_listener_id_1.sock',
self.assertEqual(BASE_AMP_PATH + '/sample_listener_id_1.sock',
res['stats_socket'])
self.assertIsNone(res['ssl_crt'])
@ -94,7 +97,7 @@ class ListenerTestCase(base.TestCase):
with mock.patch('%s.open' % BUILTINS, m, create=True):
res = listener._parse_haproxy_file('123')
self.assertEqual('TCP', res['mode'])
self.assertEqual('/var/lib/octavia/sample_listener_id_1.sock',
self.assertEqual(BASE_AMP_PATH + '/sample_listener_id_1.sock',
res['stats_socket'])
self.assertIsNone(res['ssl_crt'])

View File

@ -23,7 +23,7 @@ class TestHaproxyCfg(base.TestCase):
super(TestHaproxyCfg, self).setUp()
self.jinja_cfg = jinja_cfg.JinjaTemplater(
base_amp_path='/var/lib/octavia',
base_crt_dir='/listeners')
base_crt_dir='/var/lib/octavia/certs')
def test_get_template(self):
template = self.jinja_cfg._get_template()
@ -35,9 +35,9 @@ class TestHaproxyCfg(base.TestCase):
" maxconn 98\n"
" option forwardfor\n"
" bind 10.0.0.2:443 "
"ssl crt /var/lib/octavia/listeners/"
"ssl crt /var/lib/octavia/certs/"
"sample_listener_id_1/FakeCN.pem "
"crt /var/lib/octavia/listeners/sample_listener_id_1\n"
"crt /var/lib/octavia/certs/sample_listener_id_1\n"
" mode http\n"
" default_backend sample_pool_id_1\n\n")
be = ("backend sample_pool_id_1\n"
@ -73,7 +73,7 @@ class TestHaproxyCfg(base.TestCase):
" maxconn 98\n"
" option forwardfor\n"
" bind 10.0.0.2:443 "
"ssl crt /var/lib/octavia/listeners/"
"ssl crt /var/lib/octavia/certs/"
"sample_listener_id_1/FakeCN.pem\n"
" mode http\n"
" default_backend sample_pool_id_1\n\n")

View File

@ -21,6 +21,7 @@ from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg
from octavia.amphorae.drivers.haproxy import ssh_driver
from octavia.certificates.manager import barbican
from octavia.common import data_models
from octavia.common import keystone
from octavia.common.tls_utils import cert_parser
from octavia.db import models as models
from octavia.network import data_models as network_models
@ -46,8 +47,11 @@ MOCK_CIDR = '10.0.0.0/24'
class TestSshDriver(base.TestCase):
FAKE_UUID_1 = uuidutils.generate_uuid()
def setUp(self):
@mock.patch('octavia.common.keystone.get_session',
return_value=mock.MagicMock)
def setUp(self, mock_session):
super(TestSshDriver, self).setUp()
mock.MagicMock(keystone.get_session())
self.driver = ssh_driver.HaproxyManager()
self.listener = sample_configs.sample_listener_tuple()
self.vip = sample_configs.sample_vip_tuple()
@ -169,27 +173,28 @@ class TestSshDriver(base.TestCase):
def test_process_tls_certificates(self):
listener = sample_configs.sample_listener_tuple(tls=True, sni=True)
with mock.patch.object(self.driver, '_build_pem') as pem:
with mock.patch.object(cert_parser, 'build_pem') as pem:
with mock.patch.object(self.driver.barbican_client,
'get_cert') as bbq:
with mock.patch.object(cert_parser,
'get_host_names') as cp:
cp.return_value = {'cn': 'fakeCN'}
pem.return_value = 'imapem'
pem.return_value = 'imapem.pem'
self.driver._process_tls_certificates(listener)
# Ensure upload_cert is called three times
calls_bbq = [mock.call(listener.default_tls_container.id),
calls_bbq = [mock.call(listener.default_tls_container.id,
check_only=True),
mock.call().get_certificate(),
mock.call().get_private_key(),
mock.call().get_certificate(),
mock.call().get_intermediates(),
mock.call('cont_id_2'),
mock.call('cont_id_2', check_only=True),
mock.call().get_certificate(),
mock.call().get_private_key(),
mock.call().get_certificate(),
mock.call().get_intermediates(),
mock.call('cont_id_3'),
mock.call('cont_id_3', check_only=True),
mock.call().get_certificate(),
mock.call().get_private_key(),
mock.call().get_certificate(),
@ -240,7 +245,7 @@ class TestSshDriver(base.TestCase):
tls_tupe = sample_configs.sample_tls_container_tuple(
certificate='imacert', private_key='imakey',
intermediates=['imainter', 'imainter2'])
self.assertEqual(expected, self.driver._build_pem(tls_tupe))
self.assertEqual(expected, cert_parser.build_pem(tls_tupe))
@mock.patch.object(ssh_driver.HaproxyManager, '_execute_command')
def test_post_vip_plug_no_down_links(self, exec_command):

View File

@ -18,9 +18,10 @@ import six
import octavia.common.exceptions as exceptions
import octavia.common.tls_utils.cert_parser as cert_parser
from octavia.tests.unit import base
from octavia.tests.unit.common.sample_configs import sample_configs
ALT_EXT_CRT = b"""-----BEGIN CERTIFICATE-----
ALT_EXT_CRT = """-----BEGIN CERTIFICATE-----
MIIGqjCCBZKgAwIBAgIJAIApBg8slSSiMA0GCSqGSIb3DQEBBQUAMIGLMQswCQYD
VQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4w
HAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24g
@ -60,7 +61,7 @@ elu2c/X7MR4ObOjhDfaVGQ8kMhYf5hx69qyNDsGi
-----END CERTIFICATE-----
"""
SOME_OTHER_RSA_KEY = b"""
SOME_OTHER_RSA_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQDDnJL9dAdDpjoq4tksTJmdM0AjIHa7Y2yc8XwU7YkgrOR0m4Po
r7El0NwWf5i/LFudX1cOkfwemMIPwQ+67k0BVu/W3SR+g9ZzVKZtTBJnDoqMZ4RJ
@ -78,7 +79,7 @@ q3NRSsf/nRLY1NtMdwJAVKOdUCwZKGpGyOUZPRbZZAPlojIff2CxJ6E2Pr0RbShD
-----END RSA PRIVATE KEY-----
"""
ALT_EXT_CRT_KEY = b"""
ALT_EXT_CRT_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAsvWeZsM9QOmzziLWwzeuEetz4OW7Q3/ApBYpkV6JZS0X+mi3
X1XejTJcOmyDtblGQsxMWRkRydCnIZ2kAaNOPOY1cxnD30TPGyatHeXqFQQhKJ9V
@ -110,7 +111,7 @@ iMwJYgm98P27s4TEMdhlPNVJrj1FrD+4VrgpOsoM20EkZnTvel9s
ENCRYPTED_PKCS8_CRT_KEY_PASSPHRASE = "test_passphrase"
ENCRYPTED_PKCS8_CRT_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
ENCRYPTED_PKCS8_CRT_KEY = """-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6TAbBgkqhkiG9w0BBQMwDgQIT04zko6pmJICAggABIIEyL/79sqzTQ7BsEjY
ao2Uhh3//mpNJfCDhjSZOmWL7s4+161cEqpxrfxo4bHH8fkZ60VZUQP8CjwwQUhP
4iwpv2bYbQwzlttZwTC6s28wh7FRtgVoVPTwvXJa6fl2zAjLtsjwLZ/556ez9xIJ
@ -141,7 +142,7 @@ p7cuYY1cAyI7gFfl5A==
-----END ENCRYPTED PRIVATE KEY-----
"""
UNENCRYPTED_PKCS8_CRT_KEY = b"""-----BEGIN PRIVATE KEY-----
UNENCRYPTED_PKCS8_CRT_KEY = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCy9Z5mwz1A6bPO
ItbDN64R63Pg5btDf8CkFimRXollLRf6aLdfVd6NMlw6bIO1uUZCzExZGRHJ0Kch
naQBo0485jVzGcPfRM8bJq0d5eoVBCEon1W7xLn7WGU+oz8TOTC+lgIxTWgRGT1r
@ -173,7 +174,7 @@ P7hWuCk6ygzbQSRmdO96X2w=
EXPECTED_IMD_SUBJS = ["IMD3", "IMD2", "IMD1"]
X509_IMDS = b"""Junk
X509_IMDS = """Junk
-----BEGIN CERTIFICATE-----
MIIBhDCCAS6gAwIBAgIGAUo7hO/eMA0GCSqGSIb3DQEBCwUAMA8xDTALBgNVBAMT
BElNRDIwHhcNMTQxMjExMjI0MjU1WhcNMjUxMTIzMjI0MjU1WjAPMQ0wCwYDVQQD
@ -255,3 +256,10 @@ class TestTLSParseUtils(base.TestCase):
for i in six.moves.xrange(0, len(imds)):
self.assertEqual(EXPECTED_IMD_SUBJS[i], imds[i].get_subject().CN)
def test_build_pem(self):
expected = 'imainter\nimainter2\nimacert\nimakey'
tls_tupe = sample_configs.sample_tls_container_tuple(
certificate='imacert', private_key='imakey',
intermediates=['imainter', 'imainter2'])
self.assertEqual(expected, cert_parser.build_pem(tls_tupe))