Fix py3x gates and functional

WIP - This patch attempts to fix the py3x gates.
Please add to it as you find issues.

Closes-Bug: #1659064

Co-Authored-By: Adam Harwell <flux.adam@gmail.com>
Depends-On: If6b6f19130c965436a637a03a4cf72203e0786b0

Change-Id: If642f7ddcb886b4e9fd04a12397f26c72b3485a4
This commit is contained in:
Michael Johnson 2017-01-24 14:58:29 -08:00 committed by Adam Harwell
parent 67866ea193
commit 119e223750
17 changed files with 99 additions and 120 deletions

View File

@ -184,7 +184,7 @@ class HaproxyAmphoraLoadBalancerDriver(
for cert in certs:
pem = cert_parser.build_pem(cert)
md5 = hashlib.md5(six.b(pem)).hexdigest() # nosec
md5 = hashlib.md5(pem).hexdigest() # nosec
name = '{cn}.pem'.format(cn=cert.primary_cn)
self._apply(self._upload_cert, listener, pem, md5, name)

View File

@ -47,7 +47,7 @@ class LocalCertGenerator(cert_gen.CertGenerator):
if not ca_cert:
LOG.info(_LI("Using CA Certificate from config."))
try:
ca_cert = open(CONF.certificates.ca_certificate).read()
ca_cert = open(CONF.certificates.ca_certificate, 'rb').read()
except IOError:
raise exceptions.CertificateGenerationException(
msg="Failed to load CA Certificate {0}."
@ -56,7 +56,7 @@ class LocalCertGenerator(cert_gen.CertGenerator):
if not ca_key:
LOG.info(_LI("Using CA Private Key from config."))
try:
ca_key = open(CONF.certificates.ca_private_key).read()
ca_key = open(CONF.certificates.ca_private_key, 'rb').read()
except IOError:
raise exceptions.CertificateGenerationException(
msg="Failed to load CA Private Key {0}."
@ -105,13 +105,14 @@ class LocalCertGenerator(cert_gen.CertGenerator):
)
if not ca_cert:
with open(CONF.certificates.ca_certificate, 'r') as f:
with open(CONF.certificates.ca_certificate, 'rb') as f:
ca_cert = f.read()
if not ca_key:
with open(CONF.certificates.ca_private_key, 'r') as f:
with open(CONF.certificates.ca_private_key, 'rb') as f:
ca_key = f.read()
if not ca_key_pass:
ca_key_pass = CONF.certificates.ca_private_key_passphrase
ca_key_pass = ca_key_pass.encode('utf-8')
try:
lo_cert = x509.load_pem_x509_certificate(

View File

@ -29,10 +29,10 @@ import octavia.common.exceptions as exceptions
from octavia.i18n import _LE
X509_BEG = '-----BEGIN CERTIFICATE-----'
X509_END = '-----END CERTIFICATE-----'
PKCS7_BEG = '-----BEGIN PKCS7-----'
PKCS7_END = '-----END PKCS7-----'
X509_BEG = b'-----BEGIN CERTIFICATE-----'
X509_END = b'-----END CERTIFICATE-----'
PKCS7_BEG = b'-----BEGIN PKCS7-----'
PKCS7_END = b'-----END PKCS7-----'
LOG = logging.getLogger(__name__)
@ -73,14 +73,12 @@ def _read_private_key(private_key_pem, passphrase=None):
:returns: a RSAPrivatekey object
"""
if passphrase:
if six.PY2:
passphrase = passphrase.encode("utf-8")
elif six.PY3:
passphrase = six.b(passphrase)
passphrase = passphrase.encode("utf-8")
if type(private_key_pem) == six.text_type:
private_key_pem = private_key_pem.encode('utf-8')
try:
pkey = private_key_pem.encode('ascii')
return serialization.load_pem_private_key(pkey, passphrase,
return serialization.load_pem_private_key(private_key_pem, passphrase,
backends.default_backend())
except Exception:
LOG.exception(_LE("Passphrase required."))
@ -97,8 +95,7 @@ def prepare_private_key(private_key, passphrase=None):
return pk.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()).decode(
'unicode_escape').strip()
encryption_algorithm=serialization.NoEncryption()).strip()
def get_intermediates_pems(intermediates=None):
@ -109,7 +106,7 @@ def get_intermediates_pems(intermediates=None):
X509 pem block surrounded by BEGIN CERTIFICATE,
END CERTIFICATE block tags
"""
if X509_BEG in str(intermediates):
if X509_BEG in intermediates:
for x509Pem in _split_x509s(intermediates):
yield _prepare_x509_cert(_get_x509_from_pem_bytes(x509Pem))
else:
@ -123,8 +120,7 @@ def _prepare_x509_cert(cert=None):
:param intermediates: X509Certificate object
:returns: A PEM-encoded X509 certificate
"""
return cert.public_bytes(encoding=serialization.Encoding.PEM).decode(
'unicode_escape').strip()
return cert.public_bytes(encoding=serialization.Encoding.PEM).strip()
def _split_x509s(xstr):
@ -137,16 +133,18 @@ def _split_x509s(xstr):
"""
curr_pem_block = []
inside_x509 = False
if type(xstr) == six.binary_type:
xstr = xstr.decode('utf-8')
for line in xstr.replace("\r", "").split("\n"):
if inside_x509:
curr_pem_block.append(line)
if line == X509_END:
yield "\n".join(curr_pem_block)
if line == X509_END.decode('utf-8'):
yield six.b("\n".join(curr_pem_block))
curr_pem_block = []
inside_x509 = False
continue
else:
if line == X509_BEG:
if line == X509_BEG.decode('utf-8'):
curr_pem_block.append(line)
inside_x509 = True
@ -158,9 +156,9 @@ def _parse_pkcs7_bundle(pkcs7):
:returns: A list of individual DER-encoded certificates
"""
# Look for PEM encoding
if PKCS7_BEG in str(pkcs7):
if PKCS7_BEG in pkcs7:
try:
for substrate in _read_pem_blocks(pkcs7, (PKCS7_BEG, PKCS7_END)):
for substrate in _read_pem_blocks(pkcs7):
for cert in _get_certs_from_pkcs7_substrate(substrate):
yield cert
except Exception:
@ -173,7 +171,7 @@ def _parse_pkcs7_bundle(pkcs7):
yield cert
def _read_pem_blocks(data, *markers):
def _read_pem_blocks(data):
"""Parse a series of PEM-encoded blocks
This method is based on pyasn1-modules.pem.readPemBlocksFromFile, but
@ -186,15 +184,12 @@ def _read_pem_blocks(data, *markers):
"""
stSpam, stHam, stDump = 0, 1, 2
startMarkers = dict(map(lambda x: (x[1], x[0]),
enumerate(map(lambda x: x[0], markers))))
stopMarkers = dict(map(lambda x: (x[1], x[0]),
enumerate(map(lambda x: x[1], markers))))
startMarkers = {PKCS7_BEG.decode('utf-8'): 0}
stopMarkers = {PKCS7_END.decode('utf-8'): 0}
idx = -1
state = stSpam
if six.PY3:
data = str(data)
if type(data) == six.binary_type:
data = data.decode('utf-8')
for certLine in data.replace('\r', '').split('\n'):
if not certLine:
continue
@ -211,12 +206,7 @@ def _read_pem_blocks(data, *markers):
else:
certLines.append(certLine)
if state == stDump:
if six.PY2:
yield ''.join([
base64.b64decode(x) for x in certLines])
elif six.PY3:
yield ''.encode().join([
base64.b64decode(x) for x in certLines])
yield b''.join([base64.b64decode(x) for x in certLines])
state = stSpam
@ -260,8 +250,6 @@ def get_host_names(certificate):
the SubjectAltNames of the certificate.
"""
try:
certificate = certificate.encode('ascii')
cert = x509.load_pem_x509_certificate(certificate,
backends.default_backend())
cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0]
@ -292,9 +280,7 @@ def get_cert_expiration(certificate_pem):
:returns: Expiration date of certificate_pem
"""
try:
certificate = certificate_pem.encode('ascii')
cert = x509.load_pem_x509_certificate(certificate,
cert = x509.load_pem_x509_certificate(certificate_pem,
backends.default_backend())
return cert.not_valid_after
except Exception:
@ -308,10 +294,10 @@ def _get_x509_from_pem_bytes(certificate_pem):
:param certificate_pem: Certificate in PEM format
:returns: crypto high-level x509 data from the PEM string
"""
if type(certificate_pem) == six.text_type:
certificate_pem = certificate_pem.encode('utf-8')
try:
certificate = certificate_pem.encode('ascii')
x509cert = x509.load_pem_x509_certificate(certificate,
x509cert = x509.load_pem_x509_certificate(certificate_pem,
backends.default_backend())
except Exception:
LOG.exception(_LE('Unreadable Certificate.'))
@ -345,7 +331,7 @@ def build_pem(tls_container):
pem = [tls_container.certificate, tls_container.private_key]
if tls_container.intermediates:
pem.extend(tls_container.intermediates[:])
return '\n'.join(pem) + '\n'
return b'\n'.join(pem) + b'\n'
def load_certificates_data(cert_mngr, listener):

View File

@ -413,9 +413,6 @@ class Repositories(object):
'object: {obj}'.format(quant=quantity, proj=project_id,
obj=str(_class)))
if not project_id:
raise exceptions.MissingProjectID()
# Lock the project record in the database to block other quota checks
try:
quotas = lock_session.query(models.Quotas).filter_by(
@ -428,7 +425,8 @@ class Repositories(object):
clss=type(_class), proj=project_id))
return
if _class == data_models.LoadBalancer:
if quotas.in_use_load_balancer > 0:
if (quotas.in_use_load_balancer is not None and
quotas.in_use_load_balancer > 0):
quotas.in_use_load_balancer = (
quotas.in_use_load_balancer - quantity)
else:
@ -439,7 +437,8 @@ class Repositories(object):
'quota.').format(clss=type(_class),
proj=project_id))
if _class == data_models.Listener:
if quotas.in_use_listener > 0:
if (quotas.in_use_listener is not None and
quotas.in_use_listener > 0):
quotas.in_use_listener = (
quotas.in_use_listener - quantity)
else:
@ -450,7 +449,8 @@ class Repositories(object):
'quota.').format(clss=type(_class),
proj=project_id))
if _class == data_models.Pool:
if quotas.in_use_pool > 0:
if (quotas.in_use_pool is not None and
quotas.in_use_pool > 0):
quotas.in_use_pool = (
quotas.in_use_pool - quantity)
else:
@ -461,7 +461,8 @@ class Repositories(object):
'quota.').format(clss=type(_class),
proj=project_id))
if _class == data_models.HealthMonitor:
if quotas.in_use_health_monitor > 0:
if (quotas.in_use_health_monitor is not None and
quotas.in_use_health_monitor > 0):
quotas.in_use_health_monitor = (
quotas.in_use_health_monitor - quantity)
else:
@ -472,7 +473,8 @@ class Repositories(object):
'quota.').format(clss=type(_class),
proj=project_id))
if _class == data_models.Member:
if quotas.in_use_member > 0:
if (quotas.in_use_member is not None and
quotas.in_use_member > 0):
quotas.in_use_member = (
quotas.in_use_member - quantity)
else:

View File

@ -297,7 +297,7 @@ class TestServerTestCase(base.TestCase):
def test_info(self, mock_subbprocess, mock_hostname):
mock_hostname.side_effect = ['test-host']
mock_subbprocess.side_effect = [
"""Package: haproxy
b"""Package: haproxy
Status: install ok installed
Priority: optional
Section: net

View File

@ -121,6 +121,7 @@ class TestHealthMonitor(base.BaseAPITest):
def test_create_over_quota(self):
self.check_quota_met_true_mock.start()
self.addCleanup(self.check_quota_met_true_mock.stop)
self.post(self.hm_path,
body={'type': constants.HEALTH_MONITOR_HTTP,
'delay': 1, 'timeout': 1, 'fall_threshold': 1,

View File

@ -209,6 +209,7 @@ class TestListener(base.BaseAPITest):
'protocol_port': 80,
'project_id': self.project_id}
self.check_quota_met_true_mock.start()
self.addCleanup(self.check_quota_met_true_mock.stop)
self.post(self.listeners_path, lb_listener, status=403)
def test_update(self):

View File

@ -65,6 +65,7 @@ class TestLoadBalancer(base.BaseAPITest):
def test_create_over_quota(self):
lb_json = {'name': 'test1', 'vip': {}, 'project_id': self.project_id}
self.check_quota_met_true_mock.start()
self.addCleanup(self.check_quota_met_true_mock.stop)
self.post(self.LBS_PATH, lb_json, status=403)
def test_get_all(self):

View File

@ -210,6 +210,7 @@ class TestMember(base.BaseAPITest):
def test_create_over_quota(self):
self.check_quota_met_true_mock.start()
self.addCleanup(self.check_quota_met_true_mock.stop)
body = {'ip_address': '10.0.0.3', 'protocol_port': 81}
self.post(self.members_path, body, status=403)

View File

@ -225,6 +225,7 @@ class TestPool(base.BaseAPITest):
def test_create_over_quota(self):
self.check_quota_met_true_mock.start()
self.addCleanup(self.check_quota_met_true_mock.stop)
body = {'protocol': constants.PROTOCOL_HTTP,
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'project_id': self.project_id}

View File

@ -16,6 +16,8 @@ from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_db.sqlalchemy import test_base
# needed for tests to function when run independently:
from octavia.common import config # noqa: F401
from octavia.common import constants
from octavia.db import api as db_api
from octavia.db import base_models

View File

@ -1468,14 +1468,6 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
self.session, project_id=project_id).in_use_member)
def test_decrement_quota(self):
project_id = uuidutils.generate_uuid()
# Test check for missing project_id
self.assertRaises(exceptions.MissingProjectID,
self.repos.decrement_quota,
self.session, models.LoadBalancer, None)
# Test decrement on non-existent quota with noauth
project_id = uuidutils.generate_uuid()
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))

View File

@ -258,7 +258,7 @@ class BaseTestCase(manager.NetworkScenarioTest):
private_key=private_key)
with tempfile.NamedTemporaryFile() as key:
key.write(private_key)
key.write(private_key.encode('utf-8'))
key.flush()
self.copy_file_to_host(httpd,
"/dev/shm/httpd",
@ -591,6 +591,8 @@ class BaseTestCase(manager.NetworkScenarioTest):
3. Check that no unexpected members were balanced.
"""
members = members or ['server1_0', 'server1_1']
members = list(map(
lambda x: six.b(x) if type(x) == six.text_type else x, members))
LOG.info(_('Checking all members are balanced...'))
self._wait_for_http_service(self.vip_ip)
LOG.info(_('Connection to {vip} is valid').format(vip=self.vip_ip))
@ -696,19 +698,6 @@ class BaseTestCase(manager.NetworkScenarioTest):
total_counters[server] += ct.counters[server]
return total_counters
def _traffic_validation_after_stopping_server(self):
"""Check that the requests are sent to the only ACTIVE server."""
LOG.info(('Starting traffic_validation_after_stopping_server...'))
counters = self._send_requests(self.vip_ip, ["server1", "server2"])
LOG.info(('Counters is: {0}'.format(counters)))
# Assert that no traffic is sent to server1.
for member, counter in six.iteritems(counters):
if member == 'server1':
self.assertEqual(counter, 0,
'Member %s is not balanced' % member)
def _check_load_balancing_after_deleting_resources(self):
"""Check load balancer after deleting resources
@ -818,7 +807,7 @@ class BaseTestCase(manager.NetworkScenarioTest):
return self.execute(cmd)
def execute(self, cmd, cwd=None):
args = shlex.split(cmd.encode('utf-8'))
args = shlex.split(cmd)
subprocess_args = {'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT,
'cwd': cwd}

View File

@ -40,7 +40,7 @@ class TestListenerBasic(base.BaseTestCase):
listener = self._create_listener(lb_id, default_pool_id=pool['id'])
self._create_members(lb_id, pool['id'], 'server1',
subnet_id=self.subnet['id'])
self._check_members_balanced()
self._check_members_balanced(['server1_0', 'server1_1'])
self._cleanup_pool(pool['id'], lb_id)
self._cleanup_listener(listener['id'], lb_id)
self._check_load_balancing_after_deleting_resources()

View File

@ -109,15 +109,15 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.client.get_cert_md5sum.assert_called_with(
self.amp, self.sl.id, sample_certs.X509_CERT_CN_3 + '.pem')
# this is called three times (last MD5 matches)
fp1 = '\n'.join([sample_certs.X509_CERT,
sample_certs.X509_CERT_KEY,
sample_certs.X509_IMDS]) + '\n'
fp2 = '\n'.join([sample_certs.X509_CERT_2,
sample_certs.X509_CERT_KEY_2,
sample_certs.X509_IMDS]) + '\n'
fp3 = '\n'.join([sample_certs.X509_CERT_3,
sample_certs.X509_CERT_KEY_3,
sample_certs.X509_IMDS]) + '\n'
fp1 = b'\n'.join([sample_certs.X509_CERT,
sample_certs.X509_CERT_KEY,
sample_certs.X509_IMDS]) + b'\n'
fp2 = b'\n'.join([sample_certs.X509_CERT_2,
sample_certs.X509_CERT_KEY_2,
sample_certs.X509_IMDS]) + b'\n'
fp3 = b'\n'.join([sample_certs.X509_CERT_3,
sample_certs.X509_CERT_KEY_3,
sample_certs.X509_IMDS]) + b'\n'
ucp_calls = [
mock.call(self.amp, self.sl.id,
sample_certs.X509_CERT_CN + '.pem', fp1),
@ -259,6 +259,8 @@ class TestAmphoraAPIClientTest(base.TestCase):
'gateway': FAKE_GATEWAY,
'mac_address': FAKE_MAC_ADDRESS,
'vrrp_ip': self.amp.vrrp_ip}
patcher = mock.patch('time.sleep').start()
self.addCleanup(patcher.stop)
def test_base_url(self):
url = self.driver._base_url(FAKE_IP)

View File

@ -20,7 +20,7 @@ import six
X509_CERT_CN = 'www.example.com'
X509_CERT = """-----BEGIN CERTIFICATE-----
X509_CERT = b"""-----BEGIN CERTIFICATE-----
MIIE8TCCAtmgAwIBAgICEAEwDQYJKoZIhvcNAQELBQAwIzEhMB8GA1UEAwwYY2Et
aW50QHNiYWx1a29mZi5pYm0uY29tMB4XDTE2MDkyNzA4MjkzNFoXDTI2MDkyNTA4
MjkzNFowGjEYMBYGA1UEAwwPd3d3LmV4YW1wbGUuY29tMIIBIjANBgkqhkiG9w0B
@ -50,7 +50,7 @@ Vi/XwUwVUqRURyQtuP8QJdPh9KD7uX6xHjqBALdwzCYAFaqelPue7TJ7R/I5+02A
DV8BnY7U3zPtHtPf6i8vdYwgAOJG
-----END CERTIFICATE-----"""
X509_CERT_KEY = """-----BEGIN RSA PRIVATE KEY-----
X509_CERT_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEA34asqEe1MexBKGmBcrco08LYYFfJjpmW8m1yKJsmS2nmHNhJ
y4Fl+3cPDyHYOiVxnsaMIv1Q8ZMRpjYH2LhvzLt2doyMiiJrqA3ScdhZVlGKaURv
ASSj9dmbRBMqdXZBvTZnMH4aSkL4DalU7NiW+jbMb5Gmf+bozE4ZAOES6eXsP5+y
@ -78,7 +78,7 @@ nUBaaqVibLaROn4V1QnlSOA2vjc2jMMDKMfnjawtqBC018tQDVcE75sun7UzyxtS
OWaQy6KhqrKpPy3tS1wt1vAYPWZw/EIo4dDXYBo55REI5mSBZrM=
-----END RSA PRIVATE KEY-----"""
X509_CERT_KEY_ENCRYPTED = """-----BEGIN RSA PRIVATE KEY-----
X509_CERT_KEY_ENCRYPTED = b"""-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-256-CBC,086BA545587FF5F6F4DD9AACC122603A
@ -113,7 +113,7 @@ X509_CERT_KEY_PASSPHRASE = """asdf"""
X509_CERT_CN_2 = 'www2.example.com'
X509_CERT_2 = """-----BEGIN CERTIFICATE-----
X509_CERT_2 = b"""-----BEGIN CERTIFICATE-----
MIIEbjCCAlagAwIBAgICEAIwDQYJKoZIhvcNAQELBQAwIzEhMB8GA1UEAwwYY2Et
aW50QHNiYWx1a29mZi5pYm0uY29tMB4XDTE2MDkyOTIzNDk0MFoXDTI2MDkyNzIz
NDk0MFowGzEZMBcGA1UEAwwQd3d3Mi5leGFtcGxlLmNvbTCBnzANBgkqhkiG9w0B
@ -140,7 +140,7 @@ HULxFHp3QLrnbQEvPIcD0EWppJ1GMqb/Gv8jORzOks56UtOIfavrzGrcvRSKoC4Q
lDApYKCiRvvBSVfgpoiVungh2NWSmNW5bn2uOkPt+vTjcA==
-----END CERTIFICATE-----"""
X509_CERT_KEY_2 = """-----BEGIN RSA PRIVATE KEY-----
X509_CERT_KEY_2 = b"""-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCnyr3JsiVM/4CRrWNosTbaATvzi3etDQoLzc4NJjTbzdFBfoF8
esBVOPVy3K/PQPqHFeJhfCL2Zdce8HPPFJw/EiZDuNu9jI/QK1qBDhAaPvD5dVZj
3htjtflwXk2UYeXQQgcE7YbTBzjnkyDOVqSR+hruT1gkzfuF/CUFXohf1QIDAQAB
@ -156,7 +156,7 @@ yNDu6ayAqhUGOTDVMqkCQG9Vk7xjpe8iLkI4h7PaxaqiSwY+pyY3QoErlumALffM
t3c9Zw9YGbij+605loxv5jREFeSQMYgp2GK7rO7DTbI=
-----END RSA PRIVATE KEY-----"""
X509_CERT_KEY_ENCRYPTED_2 = """-----BEGIN RSA PRIVATE KEY-----
X509_CERT_KEY_ENCRYPTED_2 = b"""-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-256-CBC,3CAEB474D1526248CA20B5E4F84A6BB7
@ -180,7 +180,7 @@ X509_CERT_KEY_PASSPHRASE_2 = """asdf"""
# Wildcard cert for testing
X509_CERT_CN_3 = '*.www3.example.com'
X509_CERT_3 = """-----BEGIN CERTIFICATE-----
X509_CERT_3 = b"""-----BEGIN CERTIFICATE-----
MIIFJTCCAw2gAwIBAgICEAUwDQYJKoZIhvcNAQELBQAwIzEhMB8GA1UEAwwYY2Et
aW50QHNiYWx1a29mZi5pYm0uY29tMB4XDTE2MDkzMDE3MDkyNloXDTI2MDkyODE3
MDkyNlowHTEbMBkGA1UEAwwSKi53d3czLmV4YW1wbGUuY29tMIIBIjANBgkqhkiG
@ -211,7 +211,7 @@ LwW88v99ZsWWIkE6O22+MmJGs4kxPXBFhlDUCC9zPBn2UBK8dXSYL0+F3O7cjWQ7
UUddoYPP4r24JRrqzBEldSDzWeNSORpUkg==
-----END CERTIFICATE-----"""
X509_CERT_KEY_3 = """-----BEGIN RSA PRIVATE KEY-----
X509_CERT_KEY_3 = b"""-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA6v/973etopk2Vz95DUcx8X6hLfJ5m8s+scn7nMZ37fSqAGPF
0veGpqyqxorwh+GYLjlrvZkhVi7IZJAsLU2ztG4+MEoYzbyhgJerFmepBC7xPIJE
jh8FKhtpvxVOMFcXJ1CZT89Ww0rVPnaoE09DS0DRo5s+lW0dD6TaQW0S/6RCZ5Rp
@ -239,7 +239,7 @@ Mg0ePQKBgFn2yh/gKf0InDtR6IlIG9HVI+lMKxyU5iRH/9MQ7GS+sSjiAXdOtGJJ
KX8jFiCL+HcZX+pqAaUuifgwnqd88EX7MPoU6Yjq02To9ZAPA+SA
-----END RSA PRIVATE KEY-----"""
X509_CERT_KEY_ENCRYPTED_3 = """-----BEGIN RSA PRIVATE KEY-----
X509_CERT_KEY_ENCRYPTED_3 = b"""-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-256-CBC,088E3835B2238C332FC7AED391C9CF8D
@ -278,7 +278,7 @@ X509_CERT_KEY_PASSPHRASE_3 = """asdf"""
# intermediate certificate chains into barbican.
X509_IMDS_LIST = [
"""-----BEGIN CERTIFICATE-----
b"""-----BEGIN CERTIFICATE-----
MIIFcjCCA1qgAwIBAgICEAAwDQYJKoZIhvcNAQELBQAwbTELMAkGA1UEBhMCVVMx
EzARBgNVBAgMCldhc2hpbmd0b24xEDAOBgNVBAcMB1NlYXR0bGUxDDAKBgNVBAoM
A0lCTTEpMCcGA1UEAwwgbWFzdGVyLWNhLXRlc3RAc2JhbHVrb2ZmLmlibS5jb20w
@ -310,7 +310,7 @@ XO6erkwabZxCVfGgvIk9hE4x6+Cu+jdOLTpAwq1mcQroAp1+CInHrZeHdnhz0zR8
sJlV015O7iIu22bowsDcF9RfvkdHNULrClWI12sRspXF9VmRjbDyG4eASBiulJQV
bk9D26vP
-----END CERTIFICATE-----""",
"""-----BEGIN CERTIFICATE-----
b"""-----BEGIN CERTIFICATE-----
MIIFwDCCA6igAwIBAgIJAJLWg/Z3x5xpMA0GCSqGSIb3DQEBCwUAMG0xCzAJBgNV
BAYTAlVTMRMwEQYDVQQIDApXYXNoaW5ndG9uMRAwDgYDVQQHDAdTZWF0dGxlMQww
CgYDVQQKDANJQk0xKTAnBgNVBAMMIG1hc3Rlci1jYS10ZXN0QHNiYWx1a29mZi5p
@ -344,9 +344,9 @@ C6WXGJPCEOfOYsxdZMDbD7q9CqgT5P4kI8VfryB5iqaLfDtUwjT8GPoTybFiWHMk
n3yPHeLbGBLg9jphH7MMmsn57Z9fYjJADOOLFKG+W6txAQV3
-----END CERTIFICATE-----"""]
X509_IMDS = '\n'.join(X509_IMDS_LIST)
X509_IMDS = b'\n'.join(X509_IMDS_LIST)
PKCS7_PEM = """This line of spam should be ignored, as should the next line.
PKCS7_PEM = b"""This line of spam should be ignored, as should the next line.
-----BEGIN PKCS7-----
MIILZwYJKoZIhvcNAQcCoIILWDCCC1QCAQExADALBgkqhkiG9w0BBwGgggs6MIIF
@ -487,7 +487,7 @@ PKCS7_DER = b64decode(
# Keys for the above CA certs, logged here to make it simple to sign other
# certs for testing purposes in the future.
INTERMEDIATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
INTERMEDIATE_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
MIIJJwIBAAKCAgEAvComYtbfb1/CAbYjoDlx8Mk6OJY2Dqs/dgasiI+K/2CTR8Xi
eyRzYjNvXruivy3SZXjpA11fptCw3IZ+qTH9f1Sef2aa+3lk6sikmj+c01WyDiNS
LetDEKbEohm3H1Tye068MzSrcgV0RB8AvphLyLhMA0R3fA5YyRuSdgqUt0XvwoMU
@ -539,7 +539,7 @@ qH3e8e1WlIfA7FAqE1Dtae97oV/5wM9qp1rnijwq5jlZX+AqYq7GQ8J5x2ypGhZX
+N7I5RuaLjkJJs3i/EzCDwp8F3ZXZRiILaWSaGZlrZ8jgVtlNhNfVYVFuQ==
-----END RSA PRIVATE KEY-----"""
CA_KEY = """-----BEGIN RSA PRIVATE KEY-----
CA_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
-----BEGIN RSA PRIVATE KEY-----
MIIJKwIBAAKCAgEA3WLWYLlM0aaISfBiNvygl2cXehc5mbaqys9SYWcUleeloWJ2
JY80IEbD1/JcVFgv50ptd3I4rXyhv07wzU7Om2/be1ZGDNJNHsLwK3DUUkkf2Bzt
@ -595,7 +595,7 @@ r187A8Q9L5pB57JnuY9nO7MvrINJVNbLPYjanqrkqvwDjiPkzETVm50mVtFYLWgw
# An expired self-signed cert for testing.
X509_EXPIRED_CN = 'www.example.com'
X509_EXPIRED = """-----BEGIN CERTIFICATE-----
X509_EXPIRED = b"""-----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIJAKByYqy5fAfLMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
BAYTAlVTMQ0wCwYDVQQIDAREZWFkMRAwDgYDVQQHDAdUb3RhbGx5MQwwCgYDVQQK
DANJQk0xGDAWBgNVBAMMD3d3dy5leGFtcGxlLmNvbTAeFw0xNjA5MjQxODAxNTRa
@ -617,7 +617,7 @@ S+aDmoFsO3i/E+x+qm5H0swjU9dLCvdMjo0VUpk5f1aJJ10xpeKTUYOB55haalJI
j+/EXRZyEna+vPrS8mCl0GMvlFm0ZWFdWaWPR7l3J/J4is0=
-----END CERTIFICATE-----"""
X509_EXPIRED_KEY = """-----BEGIN RSA PRIVATE KEY-----
X509_EXPIRED_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEApleCh4ju3MwotSzQWn2oScgtlMfZePlD+PQpOQsPkDdUtE4m
h/WUxvy+gCQXXqOcIL4DhG6mEQ5C0RQBTCVu9i/SutIRYpi2QYqkQCDuYfTn8GKI
tK694hCJ2jq7jrn0UnduBg0T8/TFvy8y7HLgzskfU4XxdF4jTlR8uoavmKyOuL8h
@ -646,7 +646,7 @@ Je8uvLnAPRLL95ZhclaSw2vAxmaiGIsm7WGhjnRQ2Vntgd6fNgY9
-----END RSA PRIVATE KEY-----"""
# Other certificates and keys used in tests.
ALT_EXT_CRT = """-----BEGIN CERTIFICATE-----
ALT_EXT_CRT = b"""-----BEGIN CERTIFICATE-----
MIIGqjCCBZKgAwIBAgIJAIApBg8slSSiMA0GCSqGSIb3DQEBBQUAMIGLMQswCQYD
VQQGEwJVUzEOMAwGA1UECAwFVGV4YXMxFDASBgNVBAcMC1NhbiBBbnRvbmlvMR4w
HAYDVQQKDBVPcGVuU3RhY2sgRXhwZXJpbWVudHMxFjAUBgNVBAsMDU5ldXRyb24g
@ -685,7 +685,7 @@ tL4Drm+OCXJwTrE7ClTMCwcrZnLl4tI+Z+X3DV92WQB8ldST/QFjz1hgs/4zrADA
elu2c/X7MR4ObOjhDfaVGQ8kMhYf5hx69qyNDsGi
-----END CERTIFICATE-----"""
ALT_EXT_CRT_KEY = """
ALT_EXT_CRT_KEY = b"""
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAsvWeZsM9QOmzziLWwzeuEetz4OW7Q3/ApBYpkV6JZS0X+mi3
X1XejTJcOmyDtblGQsxMWRkRydCnIZ2kAaNOPOY1cxnD30TPGyatHeXqFQQhKJ9V
@ -716,7 +716,7 @@ iMwJYgm98P27s4TEMdhlPNVJrj1FrD+4VrgpOsoM20EkZnTvel9s
ENCRYPTED_PKCS8_CRT_KEY_PASSPHRASE = 'test_passphrase'
ENCRYPTED_PKCS8_CRT_KEY = """-----BEGIN ENCRYPTED PRIVATE KEY-----
ENCRYPTED_PKCS8_CRT_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6TAbBgkqhkiG9w0BBQMwDgQIT04zko6pmJICAggABIIEyL/79sqzTQ7BsEjY
ao2Uhh3//mpNJfCDhjSZOmWL7s4+161cEqpxrfxo4bHH8fkZ60VZUQP8CjwwQUhP
4iwpv2bYbQwzlttZwTC6s28wh7FRtgVoVPTwvXJa6fl2zAjLtsjwLZ/556ez9xIJ
@ -746,7 +746,7 @@ WwMJugHFk5NQuse3P4Hh9smQrRrv1dvnpt7s4yKStKolXUaFWcXJvXVaDfR5266Y
p7cuYY1cAyI7gFfl5A==
-----END ENCRYPTED PRIVATE KEY-----"""
UNENCRYPTED_PKCS8_CRT_KEY = """-----BEGIN PRIVATE KEY-----
UNENCRYPTED_PKCS8_CRT_KEY = b"""-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCy9Z5mwz1A6bPO
ItbDN64R63Pg5btDf8CkFimRXollLRf6aLdfVd6NMlw6bIO1uUZCzExZGRHJ0Kch
naQBo0485jVzGcPfRM8bJq0d5eoVBCEon1W7xLn7WGU+oz8TOTC+lgIxTWgRGT1r
@ -777,7 +777,7 @@ P7hWuCk6ygzbQSRmdO96X2w=
EXPECTED_IMD_TEST_SUBJS = ["IMD3", "IMD2", "IMD1"]
TEST_X509_IMDS = """Junk
TEST_X509_IMDS = b"""Junk
-----BEGIN CERTIFICATE-----
MIIBhDCCAS6gAwIBAgIGAUo7hO/eMA0GCSqGSIb3DQEBCwUAMA8xDTALBgNVBAMT
BElNRDIwHhcNMTQxMjExMjI0MjU1WhcNMjUxMTIzMjI0MjU1WjAPMQ0wCwYDVQQD

View File

@ -80,7 +80,7 @@ class TestTLSParseUtils(base.TestCase):
sample_certs.X509_CERT,
private_key=sample_certs.X509_CERT_KEY,
intermediates=(sample_certs.TEST_X509_IMDS +
"\nParser should ignore junk\n")))
b"\nParser should ignore junk\n")))
self.assertRaises(exceptions.MisMatchedKey,
cert_parser.validate_cert,
sample_certs.X509_CERT,
@ -112,7 +112,7 @@ class TestTLSParseUtils(base.TestCase):
self.assertRaises(
exceptions.UnreadableCert,
lambda: list(cert_parser.get_intermediates_pems(
'-----BEGIN PKCS7-----\nbad data\n-----END PKCS7-----')))
b'-----BEGIN PKCS7-----\nbad data\n-----END PKCS7-----')))
def test_get_intermediates_pkcs7_der(self):
self.assertEqual(
@ -124,7 +124,7 @@ class TestTLSParseUtils(base.TestCase):
self.assertRaises(
exceptions.UnreadableCert,
lambda: list(cert_parser.get_intermediates_pems(
'\xfe\xfe\xff\xff')))
b'\xfe\xfe\xff\xff')))
def test_get_x509_from_der_bytes_bad(self):
self.assertRaises(
@ -178,11 +178,11 @@ class TestTLSParseUtils(base.TestCase):
cert_mock).intermediates)
def test_build_pem(self):
expected = 'imacert\nimakey\nimainter\nimainter2\n'
tls_tupe = sample_configs.sample_tls_container_tuple(
certificate='imacert', private_key='imakey',
intermediates=['imainter', 'imainter2'])
self.assertEqual(expected, cert_parser.build_pem(tls_tupe))
expected = b'imacert\nimakey\nimainter\nimainter2\n'
tls_tuple = sample_configs.sample_tls_container_tuple(
certificate=b'imacert', private_key=b'imakey',
intermediates=[b'imainter', b'imainter2'])
self.assertEqual(expected, cert_parser.build_pem(tls_tuple))
def test_get_primary_cn(self):
cert = mock.MagicMock()