Merge "Add listener client authentication scenario tests"
This commit is contained in:
commit
7cd22d2598
@ -63,15 +63,15 @@ class BarbicanClientManager(object):
|
||||
# Setup the barbican client
|
||||
self.barbican = client.Client(session=id_session)
|
||||
|
||||
def store_secret(self, pkcs12_secret):
|
||||
def store_secret(self, secret):
|
||||
"""Store a secret in barbican.
|
||||
|
||||
:param pkcs12_secret: A pkcs12 secret.
|
||||
:param secret: A pkcs12 secret.
|
||||
:returns: The barbican secret_ref.
|
||||
"""
|
||||
p12_secret = self.barbican.secrets.create()
|
||||
p12_secret.name = data_utils.rand_name("lb_member_barbican_pkcs12")
|
||||
p12_secret.payload = pkcs12_secret
|
||||
p12_secret.name = data_utils.rand_name("lb_member_barbican")
|
||||
p12_secret.payload = secret
|
||||
secret_ref = p12_secret.store()
|
||||
LOG.debug('Secret {0} has ref {1}'.format(p12_secret.name, secret_ref))
|
||||
return secret_ref
|
||||
|
@ -58,6 +58,13 @@ def generate_ca_cert_and_key():
|
||||
).add_extension(
|
||||
x509.BasicConstraints(ca=True, path_length=None),
|
||||
critical=True,
|
||||
).add_extension(
|
||||
# KeyUsage(digital_signature, content_commitment, key_encipherment,
|
||||
# data_encipherment, key_agreement, key_cert_sign, crl_sign,
|
||||
# encipher_only, decipher_only)
|
||||
x509.KeyUsage(True, False, False, False, False,
|
||||
True, True, False, False),
|
||||
critical=True,
|
||||
).sign(ca_key, hashes.SHA256(), default_backend())
|
||||
|
||||
return ca_cert, ca_key
|
||||
@ -104,11 +111,66 @@ def generate_server_cert_and_key(ca_cert, ca_key, server_uuid):
|
||||
).add_extension(
|
||||
x509.BasicConstraints(ca=False, path_length=None),
|
||||
critical=True,
|
||||
).add_extension(
|
||||
# KeyUsage(digital_signature, content_commitment, key_encipherment,
|
||||
# data_encipherment, key_agreement, key_cert_sign, crl_sign,
|
||||
# encipher_only, decipher_only)
|
||||
x509.KeyUsage(True, False, True, False, False,
|
||||
False, False, False, False),
|
||||
critical=True,
|
||||
).sign(ca_key, hashes.SHA256(), default_backend())
|
||||
|
||||
return server_cert, server_key
|
||||
|
||||
|
||||
def generate_client_cert_and_key(ca_cert, ca_key, client_uuid):
|
||||
"""Creates a client cert and key for testing.
|
||||
|
||||
:param ca_cert: A cryptography CA certificate (x509) object.
|
||||
:param ca_key: A cryptography CA key (x509) object.
|
||||
:param client_uuid: A UUID identifying the client.
|
||||
:returns: The cryptography server cert and key objects.
|
||||
"""
|
||||
|
||||
client_key = rsa.generate_private_key(
|
||||
public_exponent=65537, key_size=2048, backend=default_backend())
|
||||
|
||||
subject = x509.Name([
|
||||
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
|
||||
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"),
|
||||
x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"),
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)),
|
||||
])
|
||||
|
||||
client_cert = x509.CertificateBuilder().subject_name(
|
||||
subject
|
||||
).issuer_name(
|
||||
ca_cert.subject
|
||||
).public_key(
|
||||
client_key.public_key()
|
||||
).serial_number(
|
||||
x509.random_serial_number()
|
||||
).not_valid_before(
|
||||
datetime.datetime.utcnow()
|
||||
).not_valid_after(
|
||||
datetime.datetime.utcnow() + datetime.timedelta(days=10)
|
||||
).add_extension(
|
||||
x509.BasicConstraints(ca=False, path_length=None),
|
||||
critical=True,
|
||||
).add_extension(
|
||||
# KeyUsage(digital_signature, content_commitment, key_encipherment,
|
||||
# data_encipherment, key_agreement, key_cert_sign, crl_sign,
|
||||
# encipher_only, decipher_only)
|
||||
x509.KeyUsage(True, True, True, False, False, False,
|
||||
False, False, False),
|
||||
critical=True,
|
||||
).sign(ca_key, hashes.SHA256(), default_backend())
|
||||
|
||||
return client_cert, client_key
|
||||
|
||||
|
||||
def generate_pkcs12_bundle(server_cert, server_key):
|
||||
"""Creates a pkcs12 formated bundle.
|
||||
|
||||
@ -128,3 +190,28 @@ def generate_pkcs12_bundle(server_cert, server_key):
|
||||
OpenSSL.crypto.PKey.from_cryptography_key(server_key))
|
||||
pkcs12.set_certificate(OpenSSL.crypto.X509.from_cryptography(server_cert))
|
||||
return pkcs12.export()
|
||||
|
||||
|
||||
def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke):
|
||||
"""Create a certificate revocation list with a revoked certificate.
|
||||
|
||||
:param ca_cert: A cryptography CA certificate (x509) object.
|
||||
:param ca_key: A cryptography CA key (x509) object.
|
||||
:param cert_to_revoke: A cryptography CA certificate (x509) object.
|
||||
:returns: A signed certificate revocation list.
|
||||
"""
|
||||
crl_builder = x509.CertificateRevocationListBuilder()
|
||||
crl_builder = crl_builder.issuer_name(ca_cert.subject)
|
||||
crl_builder = crl_builder.last_update(datetime.datetime.today())
|
||||
crl_builder = crl_builder.next_update(datetime.datetime.today() +
|
||||
datetime.timedelta(1, 0, 0))
|
||||
|
||||
revoked_cert = x509.RevokedCertificateBuilder().serial_number(
|
||||
cert_to_revoke.serial_number
|
||||
).revocation_date(
|
||||
datetime.datetime.today()
|
||||
).build(default_backend())
|
||||
|
||||
crl_builder = crl_builder.add_revoked_certificate(revoked_cert)
|
||||
return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(),
|
||||
backend=default_backend())
|
||||
|
@ -20,6 +20,12 @@ AVAILABILITY_ZONE_PROFILE_ID = 'availability_zone_profile_id'
|
||||
ADMIN_STATE_UP = 'admin_state_up'
|
||||
BYTES_IN = 'bytes_in'
|
||||
BYTES_OUT = 'bytes_out'
|
||||
CLIENT_AUTHENTICATION = 'client_authentication'
|
||||
CLIENT_AUTH_NONE = 'NONE'
|
||||
CLIENT_AUTH_OPTIONAL = 'OPTIONAL'
|
||||
CLIENT_AUTH_MANDATORY = 'MANDATORY'
|
||||
CLIENT_CA_TLS_CONTAINER_REF = 'client_ca_tls_container_ref'
|
||||
CLIENT_CRL_CONTAINER_REF = 'client_crl_container_ref'
|
||||
CREATED_AT = 'created_at'
|
||||
DESCRIPTION = 'description'
|
||||
FLAVOR_DATA = 'flavor_data'
|
||||
|
@ -35,7 +35,10 @@ class ListenerClient(base_client.BaseLBaaSClient):
|
||||
timeout_member_data=Unset, timeout_tcp_inspect=Unset,
|
||||
insert_headers=Unset, default_pool_id=Unset,
|
||||
default_tls_container_ref=Unset,
|
||||
sni_container_refs=Unset, return_object_only=True):
|
||||
sni_container_refs=Unset, client_authentication=Unset,
|
||||
client_ca_tls_container_ref=Unset,
|
||||
client_crl_container_ref=Unset,
|
||||
return_object_only=True):
|
||||
"""Create a listener.
|
||||
|
||||
:param protocol: The protocol for the resource.
|
||||
@ -70,6 +73,17 @@ class ListenerClient(base_client.BaseLBaaSClient):
|
||||
secrets containing PKCS12 format
|
||||
certificate/key bundles for TERMINATED_TLS
|
||||
listeners.
|
||||
:param client_authentication: The TLS client authentication mode. One
|
||||
of the options NONE, OPTIONAL or
|
||||
MANDATORY.
|
||||
:param client_ca_tls_container_ref: The ref of the key manager service
|
||||
secret containing a PEM format
|
||||
client CA certificate bundle for
|
||||
TERMINATED_HTTPS listeners.
|
||||
:param client_crl_container_ref: The URI of the key manager service
|
||||
secret containing a PEM format CA
|
||||
revocation list file for
|
||||
TERMINATED_HTTPS listeners.
|
||||
:param return_object_only: If True, the response returns the object
|
||||
inside the root tag. False returns the full
|
||||
response from the API.
|
||||
@ -190,7 +204,10 @@ class ListenerClient(base_client.BaseLBaaSClient):
|
||||
timeout_member_data=Unset, timeout_tcp_inspect=Unset,
|
||||
insert_headers=Unset, default_pool_id=Unset,
|
||||
default_tls_container_ref=Unset,
|
||||
sni_container_refs=Unset, return_object_only=True):
|
||||
sni_container_refs=Unset, client_authentication=Unset,
|
||||
client_ca_tls_container_ref=Unset,
|
||||
client_crl_container_ref=Unset,
|
||||
return_object_only=True):
|
||||
"""Update a listener.
|
||||
|
||||
:param listener_id: The listener ID to update.
|
||||
@ -223,6 +240,17 @@ class ListenerClient(base_client.BaseLBaaSClient):
|
||||
secrets containing PKCS12 format
|
||||
certificate/key bundles for TERMINATED_TLS
|
||||
listeners.
|
||||
:param client_authentication: The TLS client authentication mode. One
|
||||
of the options NONE, OPTIONAL or
|
||||
MANDATORY.
|
||||
:param client_ca_tls_container_ref: The ref of the key manager service
|
||||
secret containing a PEM format
|
||||
client CA certificate bundle for
|
||||
TERMINATED_HTTPS listeners.
|
||||
:param client_crl_container_ref: The URI of the key manager service
|
||||
secret containing a PEM format CA
|
||||
revocation list file for
|
||||
TERMINATED_HTTPS listeners.
|
||||
:param return_object_only: If True, the response returns the object
|
||||
inside the root tag. False returns the full
|
||||
response from the API.
|
||||
|
@ -13,7 +13,9 @@
|
||||
# under the License.
|
||||
|
||||
import base64
|
||||
import requests
|
||||
import socket
|
||||
import tempfile
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from OpenSSL.crypto import X509
|
||||
@ -52,6 +54,25 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
raise cls.skipException('TLS with Barbican tests require the '
|
||||
'barbican service.')
|
||||
|
||||
@classmethod
|
||||
def _store_secret(cls, barbican_mgr, secret):
|
||||
new_secret_ref = barbican_mgr.store_secret(secret)
|
||||
cls.addClassResourceCleanup(barbican_mgr.delete_secret,
|
||||
new_secret_ref)
|
||||
|
||||
# Set the barbican ACL if the Octavia API version doesn't do it
|
||||
# automatically.
|
||||
if not cls.mem_lb_client.is_version_supported(
|
||||
cls.api_version, '2.1'):
|
||||
user_list = cls.os_admin.users_v3_client.list_users(
|
||||
name=CONF.load_balancer.octavia_svc_username)
|
||||
msg = 'Only one user named "{0}" should exist, {1} found.'.format(
|
||||
CONF.load_balancer.octavia_svc_username,
|
||||
len(user_list['users']))
|
||||
assert 1 == len(user_list['users']), msg
|
||||
barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
|
||||
return new_secret_ref
|
||||
|
||||
@classmethod
|
||||
def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
|
||||
new_cert, new_key = cert_utils.generate_server_cert_and_key(
|
||||
@ -72,20 +93,8 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
|
||||
LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
|
||||
|
||||
new_secret_ref = barbican_mgr.store_secret(pkcs12)
|
||||
cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
|
||||
new_secret_ref = cls._store_secret(barbican_mgr, pkcs12)
|
||||
|
||||
# Set the barbican ACL if the Octavia API version doesn't do it
|
||||
# automatically.
|
||||
if not cls.mem_lb_client.is_version_supported(
|
||||
cls.api_version, '2.1'):
|
||||
user_list = cls.os_admin.users_v3_client.list_users(
|
||||
name=CONF.load_balancer.octavia_svc_username)
|
||||
msg = 'Only one user named "{0}" should exist, {1} found.'.format(
|
||||
CONF.load_balancer.octavia_svc_username,
|
||||
len(user_list['users']))
|
||||
assert 1 == len(user_list['users']), msg
|
||||
barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
|
||||
return new_cert, new_key, new_secret_ref
|
||||
|
||||
@classmethod
|
||||
@ -108,7 +117,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
|
||||
# Load the secret into the barbican service under the
|
||||
# os_roles_lb_member tenant
|
||||
barbican_mgr = barbican_client_mgr.BarbicanClientManager(
|
||||
cls.barbican_mgr = barbican_client_mgr.BarbicanClientManager(
|
||||
cls.os_roles_lb_member)
|
||||
|
||||
# Create a server cert and key
|
||||
@ -117,7 +126,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
|
||||
|
||||
server_cert, server_key, cls.server_secret_ref = (
|
||||
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
|
||||
cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
|
||||
ca_key, cls.server_uuid))
|
||||
|
||||
# Create the SNI1 cert and key
|
||||
@ -125,7 +134,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
|
||||
|
||||
SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
|
||||
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
|
||||
cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
|
||||
ca_key, cls.SNI1_uuid))
|
||||
|
||||
# Create the SNI2 cert and key
|
||||
@ -133,9 +142,37 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
|
||||
|
||||
SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
|
||||
cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
|
||||
cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
|
||||
ca_key, cls.SNI2_uuid))
|
||||
|
||||
# Create the client authentication CA
|
||||
cls.client_ca_cert, client_ca_key = (
|
||||
cert_utils.generate_ca_cert_and_key())
|
||||
|
||||
cls.client_ca_cert_ref = cls._store_secret(
|
||||
cls.barbican_mgr,
|
||||
cls.client_ca_cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
# Create client cert and key
|
||||
cls.client_cn = uuidutils.generate_uuid()
|
||||
cls.client_cert, cls.client_key = (
|
||||
cert_utils.generate_client_cert_and_key(
|
||||
cls.client_ca_cert, client_ca_key, cls.client_cn))
|
||||
|
||||
# Create revoked client cert and key
|
||||
cls.revoked_client_cn = uuidutils.generate_uuid()
|
||||
cls.revoked_client_cert, cls.revoked_client_key = (
|
||||
cert_utils.generate_client_cert_and_key(
|
||||
cls.client_ca_cert, client_ca_key, cls.revoked_client_cn))
|
||||
|
||||
# Create certificate revocation list and revoke cert
|
||||
cls.client_crl = cert_utils.generate_certificate_revocation_list(
|
||||
cls.client_ca_cert, client_ca_key, cls.revoked_client_cert)
|
||||
|
||||
cls.client_crl_ref = cls._store_secret(
|
||||
cls.barbican_mgr,
|
||||
cls.client_crl.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
# Setup a load balancer for the tests to use
|
||||
lb_name = data_utils.rand_name("lb_member_lb1-tls")
|
||||
lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider,
|
||||
@ -618,3 +655,384 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
|
||||
sock.connect((self.lb_vip_address, 8443))
|
||||
# Validate the certificate is signed by the ca_cert we created
|
||||
sock.do_handshake()
|
||||
|
||||
@decorators.idempotent_id('af6bb7d2-acbb-4f6e-861f-39a2a3f02331')
|
||||
def test_tls_client_auth_mandatory(self):
|
||||
if not self.mem_listener_client.is_version_supported(
|
||||
self.api_version, '2.8'):
|
||||
raise self.skipException('TLS client authentication '
|
||||
'is only available on Octavia API '
|
||||
'version 2.8 or newer.')
|
||||
LISTENER1_TCP_PORT = '443'
|
||||
listener_name = data_utils.rand_name(
|
||||
"lb_member_listener1-client-auth-mand")
|
||||
listener_kwargs = {
|
||||
const.NAME: listener_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
|
||||
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
|
||||
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
|
||||
}
|
||||
listener = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener_id = listener[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
# Test that no client certificate fails to connect
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*certificate required.*",
|
||||
requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False)
|
||||
|
||||
# Test that a revoked client certificate fails to connect
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.revoked_client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.revoked_client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*revoked.*", requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a valid client certificate can connect
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
response = requests.get(
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
@decorators.idempotent_id('42d696bf-e7f5-44f0-9331-4a5e01d69ef3')
|
||||
def test_tls_client_auth_optional(self):
|
||||
if not self.mem_listener_client.is_version_supported(
|
||||
self.api_version, '2.8'):
|
||||
raise self.skipException('TLS client authentication '
|
||||
'is only available on Octavia API '
|
||||
'version 2.8 or newer.')
|
||||
LISTENER1_TCP_PORT = '443'
|
||||
listener_name = data_utils.rand_name(
|
||||
"lb_member_listener1-client-auth-optional")
|
||||
listener_kwargs = {
|
||||
const.NAME: listener_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_OPTIONAL,
|
||||
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
|
||||
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
|
||||
}
|
||||
listener = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener_id = listener[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
# Test that no client certificate connects
|
||||
response = requests.get(
|
||||
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False)
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
# Test that a revoked client certificate fails to connect
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.revoked_client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.revoked_client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*revoked.*", requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a valid client certificate can connect
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
response = requests.get(
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
@decorators.idempotent_id('13271ce6-f9f7-4017-a017-c2fc390b9438')
|
||||
def test_tls_multi_listener_client_auth(self):
|
||||
"""Test client authentication in a multi-listener LB.
|
||||
|
||||
Validates that certificates and CRLs don't get cross configured
|
||||
between multiple listeners on the same load balancer.
|
||||
"""
|
||||
if not self.mem_listener_client.is_version_supported(
|
||||
self.api_version, '2.8'):
|
||||
raise self.skipException('TLS client authentication '
|
||||
'is only available on Octavia API '
|
||||
'version 2.8 or newer.')
|
||||
# Create the client2 authentication CA
|
||||
client2_ca_cert, client2_ca_key = (
|
||||
cert_utils.generate_ca_cert_and_key())
|
||||
|
||||
client2_ca_cert_ref = self._store_secret(
|
||||
self.barbican_mgr,
|
||||
client2_ca_cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
# Create client2 cert and key
|
||||
client2_cn = uuidutils.generate_uuid()
|
||||
client2_cert, client2_key = (
|
||||
cert_utils.generate_client_cert_and_key(
|
||||
client2_ca_cert, client2_ca_key, client2_cn))
|
||||
|
||||
# Create revoked client2 cert and key
|
||||
revoked_client2_cn = uuidutils.generate_uuid()
|
||||
revoked_client2_cert, revoked_client2_key = (
|
||||
cert_utils.generate_client_cert_and_key(
|
||||
client2_ca_cert, client2_ca_key, revoked_client2_cn))
|
||||
|
||||
# Create certificate revocation list and revoke cert
|
||||
client2_crl = cert_utils.generate_certificate_revocation_list(
|
||||
client2_ca_cert, client2_ca_key, revoked_client2_cert)
|
||||
|
||||
client2_crl_ref = self._store_secret(
|
||||
self.barbican_mgr,
|
||||
client2_crl.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
LISTENER1_TCP_PORT = '443'
|
||||
listener_name = data_utils.rand_name(
|
||||
"lb_member_listener1-multi-list-client-auth")
|
||||
listener_kwargs = {
|
||||
const.NAME: listener_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
|
||||
const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
|
||||
const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
|
||||
}
|
||||
listener = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener_id = listener[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
LISTENER2_TCP_PORT = '8443'
|
||||
listener_name = data_utils.rand_name(
|
||||
"lb_member_listener2-multi-list-client-auth")
|
||||
listener_kwargs = {
|
||||
const.NAME: listener_name,
|
||||
const.PROTOCOL: const.TERMINATED_HTTPS,
|
||||
const.PROTOCOL_PORT: LISTENER2_TCP_PORT,
|
||||
const.LOADBALANCER_ID: self.lb_id,
|
||||
const.DEFAULT_POOL_ID: self.pool_id,
|
||||
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
|
||||
const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
|
||||
const.CLIENT_CA_TLS_CONTAINER_REF: client2_ca_cert_ref,
|
||||
const.CLIENT_CRL_CONTAINER_REF: client2_crl_ref,
|
||||
}
|
||||
listener2 = self.mem_listener_client.create_listener(**listener_kwargs)
|
||||
self.listener2_id = listener2[const.ID]
|
||||
self.addCleanup(
|
||||
self.mem_listener_client.cleanup_listener,
|
||||
self.listener2_id,
|
||||
lb_client=self.mem_lb_client, lb_id=self.lb_id)
|
||||
|
||||
waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
|
||||
self.lb_id, const.PROVISIONING_STATUS,
|
||||
const.ACTIVE,
|
||||
CONF.load_balancer.build_interval,
|
||||
CONF.load_balancer.build_timeout)
|
||||
|
||||
# Test that no client certificate fails to connect to listener1
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*certificate required.*",
|
||||
requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False)
|
||||
|
||||
# Test that no client certificate fails to connect to listener2
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*certificate required.*",
|
||||
requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address, LISTENER2_TCP_PORT),
|
||||
timeout=12, verify=False)
|
||||
|
||||
# Test that a revoked client certificate fails to connect
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.revoked_client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.revoked_client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*revoked.*", requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a revoked client2 certificate fails to connect
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(revoked_client2_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(revoked_client2_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*revoked.*", requests.get,
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER2_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a valid client certificate can connect to listener1
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
response = requests.get(
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
# Test that a valid client2 certificate can connect to listener2
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(client2_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(client2_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
response = requests.get(
|
||||
'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER2_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
self.assertEqual(200, response.status_code)
|
||||
|
||||
# Test that a valid client1 certificate can not connect to listener2
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*decrypt error.*",
|
||||
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER2_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a valid client2 certificate can not connect to listener1
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(client2_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(client2_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*decrypt error.*",
|
||||
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a revoked client1 certificate can not connect to listener2
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(self.revoked_client_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(self.revoked_client_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*decrypt error.*",
|
||||
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER2_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
||||
# Test that a revoked client2 certificate can not connect to listener1
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
|
||||
cert_file.write(revoked_client2_cert.public_bytes(
|
||||
serialization.Encoding.PEM))
|
||||
with tempfile.NamedTemporaryFile(buffering=0) as key_file:
|
||||
key_file.write(revoked_client2_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
serialization.NoEncryption()))
|
||||
self.assertRaisesRegex(
|
||||
requests.exceptions.SSLError, ".*decrypt error.*",
|
||||
requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
|
||||
LISTENER1_TCP_PORT),
|
||||
timeout=12, verify=False, cert=(cert_file.name,
|
||||
key_file.name))
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Adds scenario tests for listener client authentication.
|
Loading…
Reference in New Issue
Block a user