diff --git a/octavia_tempest_plugin/common/barbican_client_mgr.py b/octavia_tempest_plugin/common/barbican_client_mgr.py index e93f9030..eba1715a 100644 --- a/octavia_tempest_plugin/common/barbican_client_mgr.py +++ b/octavia_tempest_plugin/common/barbican_client_mgr.py @@ -63,15 +63,15 @@ class BarbicanClientManager(object): # Setup the barbican client self.barbican = client.Client(session=id_session) - def store_secret(self, pkcs12_secret): + def store_secret(self, secret): """Store a secret in barbican. - :param pkcs12_secret: A pkcs12 secret. + :param secret: A pkcs12 secret. :returns: The barbican secret_ref. """ p12_secret = self.barbican.secrets.create() - p12_secret.name = data_utils.rand_name("lb_member_barbican_pkcs12") - p12_secret.payload = pkcs12_secret + p12_secret.name = data_utils.rand_name("lb_member_barbican") + p12_secret.payload = secret secret_ref = p12_secret.store() LOG.debug('Secret {0} has ref {1}'.format(p12_secret.name, secret_ref)) return secret_ref diff --git a/octavia_tempest_plugin/common/cert_utils.py b/octavia_tempest_plugin/common/cert_utils.py index dcdd6f08..bb8cdb3f 100644 --- a/octavia_tempest_plugin/common/cert_utils.py +++ b/octavia_tempest_plugin/common/cert_utils.py @@ -58,6 +58,13 @@ def generate_ca_cert_and_key(): ).add_extension( x509.BasicConstraints(ca=True, path_length=None), critical=True, + ).add_extension( + # KeyUsage(digital_signature, content_commitment, key_encipherment, + # data_encipherment, key_agreement, key_cert_sign, crl_sign, + # encipher_only, decipher_only) + x509.KeyUsage(True, False, False, False, False, + True, True, False, False), + critical=True, ).sign(ca_key, hashes.SHA256(), default_backend()) return ca_cert, ca_key @@ -104,11 +111,66 @@ def generate_server_cert_and_key(ca_cert, ca_key, server_uuid): ).add_extension( x509.BasicConstraints(ca=False, path_length=None), critical=True, + ).add_extension( + # KeyUsage(digital_signature, content_commitment, key_encipherment, + # data_encipherment, key_agreement, key_cert_sign, crl_sign, + # encipher_only, decipher_only) + x509.KeyUsage(True, False, True, False, False, + False, False, False, False), + critical=True, ).sign(ca_key, hashes.SHA256(), default_backend()) return server_cert, server_key +def generate_client_cert_and_key(ca_cert, ca_key, client_uuid): + """Creates a client cert and key for testing. + + :param ca_cert: A cryptography CA certificate (x509) object. + :param ca_key: A cryptography CA key (x509) object. + :param client_uuid: A UUID identifying the client. + :returns: The cryptography server cert and key objects. + """ + + client_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend()) + + subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"), + x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)), + ]) + + client_cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + ca_cert.subject + ).public_key( + client_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=10) + ).add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ).add_extension( + # KeyUsage(digital_signature, content_commitment, key_encipherment, + # data_encipherment, key_agreement, key_cert_sign, crl_sign, + # encipher_only, decipher_only) + x509.KeyUsage(True, True, True, False, False, False, + False, False, False), + critical=True, + ).sign(ca_key, hashes.SHA256(), default_backend()) + + return client_cert, client_key + + def generate_pkcs12_bundle(server_cert, server_key): """Creates a pkcs12 formated bundle. @@ -128,3 +190,28 @@ def generate_pkcs12_bundle(server_cert, server_key): OpenSSL.crypto.PKey.from_cryptography_key(server_key)) pkcs12.set_certificate(OpenSSL.crypto.X509.from_cryptography(server_cert)) return pkcs12.export() + + +def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke): + """Create a certificate revocation list with a revoked certificate. + + :param ca_cert: A cryptography CA certificate (x509) object. + :param ca_key: A cryptography CA key (x509) object. + :param cert_to_revoke: A cryptography CA certificate (x509) object. + :returns: A signed certificate revocation list. + """ + crl_builder = x509.CertificateRevocationListBuilder() + crl_builder = crl_builder.issuer_name(ca_cert.subject) + crl_builder = crl_builder.last_update(datetime.datetime.today()) + crl_builder = crl_builder.next_update(datetime.datetime.today() + + datetime.timedelta(1, 0, 0)) + + revoked_cert = x509.RevokedCertificateBuilder().serial_number( + cert_to_revoke.serial_number + ).revocation_date( + datetime.datetime.today() + ).build(default_backend()) + + crl_builder = crl_builder.add_revoked_certificate(revoked_cert) + return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(), + backend=default_backend()) diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py index d4e1755f..6c65245f 100644 --- a/octavia_tempest_plugin/common/constants.py +++ b/octavia_tempest_plugin/common/constants.py @@ -20,6 +20,12 @@ AVAILABILITY_ZONE_PROFILE_ID = 'availability_zone_profile_id' ADMIN_STATE_UP = 'admin_state_up' BYTES_IN = 'bytes_in' BYTES_OUT = 'bytes_out' +CLIENT_AUTHENTICATION = 'client_authentication' +CLIENT_AUTH_NONE = 'NONE' +CLIENT_AUTH_OPTIONAL = 'OPTIONAL' +CLIENT_AUTH_MANDATORY = 'MANDATORY' +CLIENT_CA_TLS_CONTAINER_REF = 'client_ca_tls_container_ref' +CLIENT_CRL_CONTAINER_REF = 'client_crl_container_ref' CREATED_AT = 'created_at' DESCRIPTION = 'description' FLAVOR_DATA = 'flavor_data' diff --git a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py index de84bd14..1cc17ff4 100644 --- a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py +++ b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py @@ -35,7 +35,10 @@ class ListenerClient(base_client.BaseLBaaSClient): timeout_member_data=Unset, timeout_tcp_inspect=Unset, insert_headers=Unset, default_pool_id=Unset, default_tls_container_ref=Unset, - sni_container_refs=Unset, return_object_only=True): + sni_container_refs=Unset, client_authentication=Unset, + client_ca_tls_container_ref=Unset, + client_crl_container_ref=Unset, + return_object_only=True): """Create a listener. :param protocol: The protocol for the resource. @@ -70,6 +73,17 @@ class ListenerClient(base_client.BaseLBaaSClient): secrets containing PKCS12 format certificate/key bundles for TERMINATED_TLS listeners. + :param client_authentication: The TLS client authentication mode. One + of the options NONE, OPTIONAL or + MANDATORY. + :param client_ca_tls_container_ref: The ref of the key manager service + secret containing a PEM format + client CA certificate bundle for + TERMINATED_HTTPS listeners. + :param client_crl_container_ref: The URI of the key manager service + secret containing a PEM format CA + revocation list file for + TERMINATED_HTTPS listeners. :param return_object_only: If True, the response returns the object inside the root tag. False returns the full response from the API. @@ -190,7 +204,10 @@ class ListenerClient(base_client.BaseLBaaSClient): timeout_member_data=Unset, timeout_tcp_inspect=Unset, insert_headers=Unset, default_pool_id=Unset, default_tls_container_ref=Unset, - sni_container_refs=Unset, return_object_only=True): + sni_container_refs=Unset, client_authentication=Unset, + client_ca_tls_container_ref=Unset, + client_crl_container_ref=Unset, + return_object_only=True): """Update a listener. :param listener_id: The listener ID to update. @@ -223,6 +240,17 @@ class ListenerClient(base_client.BaseLBaaSClient): secrets containing PKCS12 format certificate/key bundles for TERMINATED_TLS listeners. + :param client_authentication: The TLS client authentication mode. One + of the options NONE, OPTIONAL or + MANDATORY. + :param client_ca_tls_container_ref: The ref of the key manager service + secret containing a PEM format + client CA certificate bundle for + TERMINATED_HTTPS listeners. + :param client_crl_container_ref: The URI of the key manager service + secret containing a PEM format CA + revocation list file for + TERMINATED_HTTPS listeners. :param return_object_only: If True, the response returns the object inside the root tag. False returns the full response from the API. diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py index 0fe1d818..5f7ad51e 100644 --- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py +++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py @@ -13,7 +13,9 @@ # under the License. import base64 +import requests import socket +import tempfile from cryptography.hazmat.primitives import serialization from OpenSSL.crypto import X509 @@ -52,6 +54,25 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): raise cls.skipException('TLS with Barbican tests require the ' 'barbican service.') + @classmethod + def _store_secret(cls, barbican_mgr, secret): + new_secret_ref = barbican_mgr.store_secret(secret) + cls.addClassResourceCleanup(barbican_mgr.delete_secret, + new_secret_ref) + + # Set the barbican ACL if the Octavia API version doesn't do it + # automatically. + if not cls.mem_lb_client.is_version_supported( + cls.api_version, '2.1'): + user_list = cls.os_admin.users_v3_client.list_users( + name=CONF.load_balancer.octavia_svc_username) + msg = 'Only one user named "{0}" should exist, {1} found.'.format( + CONF.load_balancer.octavia_svc_username, + len(user_list['users'])) + assert 1 == len(user_list['users']), msg + barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id']) + return new_secret_ref + @classmethod def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name): new_cert, new_key = cert_utils.generate_server_cert_and_key( @@ -72,20 +93,8 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key) LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12)) - new_secret_ref = barbican_mgr.store_secret(pkcs12) - cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref) + new_secret_ref = cls._store_secret(barbican_mgr, pkcs12) - # Set the barbican ACL if the Octavia API version doesn't do it - # automatically. - if not cls.mem_lb_client.is_version_supported( - cls.api_version, '2.1'): - user_list = cls.os_admin.users_v3_client.list_users( - name=CONF.load_balancer.octavia_svc_username) - msg = 'Only one user named "{0}" should exist, {1} found.'.format( - CONF.load_balancer.octavia_svc_username, - len(user_list['users'])) - assert 1 == len(user_list['users']), msg - barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id']) return new_cert, new_key, new_secret_ref @classmethod @@ -108,7 +117,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): # Load the secret into the barbican service under the # os_roles_lb_member tenant - barbican_mgr = barbican_client_mgr.BarbicanClientManager( + cls.barbican_mgr = barbican_client_mgr.BarbicanClientManager( cls.os_roles_lb_member) # Create a server cert and key @@ -117,7 +126,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): LOG.debug('Server (default) UUID: %s' % cls.server_uuid) server_cert, server_key, cls.server_secret_ref = ( - cls._generate_load_certificate(barbican_mgr, cls.ca_cert, + cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert, ca_key, cls.server_uuid)) # Create the SNI1 cert and key @@ -125,7 +134,7 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid) SNI1_cert, SNI1_key, cls.SNI1_secret_ref = ( - cls._generate_load_certificate(barbican_mgr, cls.ca_cert, + cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert, ca_key, cls.SNI1_uuid)) # Create the SNI2 cert and key @@ -133,9 +142,37 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid) SNI2_cert, SNI2_key, cls.SNI2_secret_ref = ( - cls._generate_load_certificate(barbican_mgr, cls.ca_cert, + cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert, ca_key, cls.SNI2_uuid)) + # Create the client authentication CA + cls.client_ca_cert, client_ca_key = ( + cert_utils.generate_ca_cert_and_key()) + + cls.client_ca_cert_ref = cls._store_secret( + cls.barbican_mgr, + cls.client_ca_cert.public_bytes(serialization.Encoding.PEM)) + + # Create client cert and key + cls.client_cn = uuidutils.generate_uuid() + cls.client_cert, cls.client_key = ( + cert_utils.generate_client_cert_and_key( + cls.client_ca_cert, client_ca_key, cls.client_cn)) + + # Create revoked client cert and key + cls.revoked_client_cn = uuidutils.generate_uuid() + cls.revoked_client_cert, cls.revoked_client_key = ( + cert_utils.generate_client_cert_and_key( + cls.client_ca_cert, client_ca_key, cls.revoked_client_cn)) + + # Create certificate revocation list and revoke cert + cls.client_crl = cert_utils.generate_certificate_revocation_list( + cls.client_ca_cert, client_ca_key, cls.revoked_client_cert) + + cls.client_crl_ref = cls._store_secret( + cls.barbican_mgr, + cls.client_crl.public_bytes(serialization.Encoding.PEM)) + # Setup a load balancer for the tests to use lb_name = data_utils.rand_name("lb_member_lb1-tls") lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider, @@ -618,3 +655,384 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute): sock.connect((self.lb_vip_address, 8443)) # Validate the certificate is signed by the ca_cert we created sock.do_handshake() + + @decorators.idempotent_id('af6bb7d2-acbb-4f6e-861f-39a2a3f02331') + def test_tls_client_auth_mandatory(self): + if not self.mem_listener_client.is_version_supported( + self.api_version, '2.8'): + raise self.skipException('TLS client authentication ' + 'is only available on Octavia API ' + 'version 2.8 or newer.') + LISTENER1_TCP_PORT = '443' + listener_name = data_utils.rand_name( + "lb_member_listener1-client-auth-mand") + listener_kwargs = { + const.NAME: listener_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: LISTENER1_TCP_PORT, + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, + const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY, + const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref, + const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref, + } + listener = self.mem_listener_client.create_listener(**listener_kwargs) + self.listener_id = listener[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + # Test that no client certificate fails to connect + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*certificate required.*", + requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT), + timeout=12, verify=False) + + # Test that a revoked client certificate fails to connect + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.revoked_client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.revoked_client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*revoked.*", requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a valid client certificate can connect + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + response = requests.get( + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + self.assertEqual(200, response.status_code) + + @decorators.idempotent_id('42d696bf-e7f5-44f0-9331-4a5e01d69ef3') + def test_tls_client_auth_optional(self): + if not self.mem_listener_client.is_version_supported( + self.api_version, '2.8'): + raise self.skipException('TLS client authentication ' + 'is only available on Octavia API ' + 'version 2.8 or newer.') + LISTENER1_TCP_PORT = '443' + listener_name = data_utils.rand_name( + "lb_member_listener1-client-auth-optional") + listener_kwargs = { + const.NAME: listener_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: LISTENER1_TCP_PORT, + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, + const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_OPTIONAL, + const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref, + const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref, + } + listener = self.mem_listener_client.create_listener(**listener_kwargs) + self.listener_id = listener[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + # Test that no client certificate connects + response = requests.get( + 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT), + timeout=12, verify=False) + self.assertEqual(200, response.status_code) + + # Test that a revoked client certificate fails to connect + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.revoked_client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.revoked_client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*revoked.*", requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a valid client certificate can connect + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + response = requests.get( + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + self.assertEqual(200, response.status_code) + + @decorators.idempotent_id('13271ce6-f9f7-4017-a017-c2fc390b9438') + def test_tls_multi_listener_client_auth(self): + """Test client authentication in a multi-listener LB. + + Validates that certificates and CRLs don't get cross configured + between multiple listeners on the same load balancer. + """ + if not self.mem_listener_client.is_version_supported( + self.api_version, '2.8'): + raise self.skipException('TLS client authentication ' + 'is only available on Octavia API ' + 'version 2.8 or newer.') + # Create the client2 authentication CA + client2_ca_cert, client2_ca_key = ( + cert_utils.generate_ca_cert_and_key()) + + client2_ca_cert_ref = self._store_secret( + self.barbican_mgr, + client2_ca_cert.public_bytes(serialization.Encoding.PEM)) + + # Create client2 cert and key + client2_cn = uuidutils.generate_uuid() + client2_cert, client2_key = ( + cert_utils.generate_client_cert_and_key( + client2_ca_cert, client2_ca_key, client2_cn)) + + # Create revoked client2 cert and key + revoked_client2_cn = uuidutils.generate_uuid() + revoked_client2_cert, revoked_client2_key = ( + cert_utils.generate_client_cert_and_key( + client2_ca_cert, client2_ca_key, revoked_client2_cn)) + + # Create certificate revocation list and revoke cert + client2_crl = cert_utils.generate_certificate_revocation_list( + client2_ca_cert, client2_ca_key, revoked_client2_cert) + + client2_crl_ref = self._store_secret( + self.barbican_mgr, + client2_crl.public_bytes(serialization.Encoding.PEM)) + + LISTENER1_TCP_PORT = '443' + listener_name = data_utils.rand_name( + "lb_member_listener1-multi-list-client-auth") + listener_kwargs = { + const.NAME: listener_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: LISTENER1_TCP_PORT, + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, + const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY, + const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref, + const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref, + } + listener = self.mem_listener_client.create_listener(**listener_kwargs) + self.listener_id = listener[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + LISTENER2_TCP_PORT = '8443' + listener_name = data_utils.rand_name( + "lb_member_listener2-multi-list-client-auth") + listener_kwargs = { + const.NAME: listener_name, + const.PROTOCOL: const.TERMINATED_HTTPS, + const.PROTOCOL_PORT: LISTENER2_TCP_PORT, + const.LOADBALANCER_ID: self.lb_id, + const.DEFAULT_POOL_ID: self.pool_id, + const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, + const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY, + const.CLIENT_CA_TLS_CONTAINER_REF: client2_ca_cert_ref, + const.CLIENT_CRL_CONTAINER_REF: client2_crl_ref, + } + listener2 = self.mem_listener_client.create_listener(**listener_kwargs) + self.listener2_id = listener2[const.ID] + self.addCleanup( + self.mem_listener_client.cleanup_listener, + self.listener2_id, + lb_client=self.mem_lb_client, lb_id=self.lb_id) + + waiters.wait_for_status(self.mem_lb_client.show_loadbalancer, + self.lb_id, const.PROVISIONING_STATUS, + const.ACTIVE, + CONF.load_balancer.build_interval, + CONF.load_balancer.build_timeout) + + # Test that no client certificate fails to connect to listener1 + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*certificate required.*", + requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT), + timeout=12, verify=False) + + # Test that no client certificate fails to connect to listener2 + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*certificate required.*", + requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER2_TCP_PORT), + timeout=12, verify=False) + + # Test that a revoked client certificate fails to connect + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.revoked_client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.revoked_client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*revoked.*", requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a revoked client2 certificate fails to connect + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(revoked_client2_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(revoked_client2_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*revoked.*", requests.get, + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER2_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a valid client certificate can connect to listener1 + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + response = requests.get( + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + self.assertEqual(200, response.status_code) + + # Test that a valid client2 certificate can connect to listener2 + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(client2_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(client2_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + response = requests.get( + 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER2_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + self.assertEqual(200, response.status_code) + + # Test that a valid client1 certificate can not connect to listener2 + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*decrypt error.*", + requests.get, 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER2_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a valid client2 certificate can not connect to listener1 + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(client2_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(client2_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*decrypt error.*", + requests.get, 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a revoked client1 certificate can not connect to listener2 + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(self.revoked_client_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(self.revoked_client_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*decrypt error.*", + requests.get, 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER2_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) + + # Test that a revoked client2 certificate can not connect to listener1 + with tempfile.NamedTemporaryFile(buffering=0) as cert_file: + cert_file.write(revoked_client2_cert.public_bytes( + serialization.Encoding.PEM)) + with tempfile.NamedTemporaryFile(buffering=0) as key_file: + key_file.write(revoked_client2_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption())) + self.assertRaisesRegex( + requests.exceptions.SSLError, ".*decrypt error.*", + requests.get, 'https://{0}:{1}'.format(self.lb_vip_address, + LISTENER1_TCP_PORT), + timeout=12, verify=False, cert=(cert_file.name, + key_file.name)) diff --git a/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml new file mode 100644 index 00000000..3e44be4f --- /dev/null +++ b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Adds scenario tests for listener client authentication.