Test new HSTS feature

Test HTTP Strict Transport Security with Octavia.

Partial-Bug: #2017972
Change-Id: Ie54714015e943fd1cb75ca95f8cf241fbc99268c
This commit is contained in:
Tom Weininger 2023-04-26 12:30:08 +02:00
parent 2ac5e8169f
commit d3eb97624a
6 changed files with 100 additions and 16 deletions

View File

@ -72,6 +72,9 @@ SNI_CONTAINER_REFS = 'sni_container_refs'
DEFAULT_POOL_ID = 'default_pool_id' DEFAULT_POOL_ID = 'default_pool_id'
L7_POLICIES = 'l7_policies' L7_POLICIES = 'l7_policies'
ALPN_PROTOCOLS = 'alpn_protocols' ALPN_PROTOCOLS = 'alpn_protocols'
HSTS_MAX_AGE = 'hsts_max_age'
HSTS_INCLUDE_SUBDOMAINS = 'hsts_include_subdomains'
HSTS_PRELOAD = 'hsts_preload'
LB_ALGORITHM = 'lb_algorithm' LB_ALGORITHM = 'lb_algorithm'
LB_ALGORITHM_ROUND_ROBIN = 'ROUND_ROBIN' LB_ALGORITHM_ROUND_ROBIN = 'ROUND_ROBIN'

View File

@ -41,7 +41,8 @@ class ListenerClient(base_client.BaseLBaaSClient):
sni_container_refs=Unset, client_authentication=Unset, sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset, client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset, allowed_cidrs=Unset, client_crl_container_ref=Unset, allowed_cidrs=Unset,
alpn_protocols=Unset, alpn_protocols=Unset, hsts_max_age=Unset,
hsts_include_subdomains=Unset, hsts_preload=Unset,
return_object_only=True): return_object_only=True):
"""Create a listener. """Create a listener.
@ -92,6 +93,12 @@ class ListenerClient(base_client.BaseLBaaSClient):
:param allowed_cidrs: A list of IPv4 or IPv6 CIDRs. :param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
:param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS :param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
listeners. listeners.
:param hsts_include_subdomains: Defines whether the
`include_subdomains` directive is used for HSTS or not
:param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
and sets the `max_age` directive to given value
:param hsts_preload: Defines whether the `hsts_preload` directive
is used for HSTS or not
:param return_object_only: If True, the response returns the object :param return_object_only: If True, the response returns the object
inside the root tag. False returns the full inside the root tag. False returns the full
response from the API. response from the API.
@ -218,7 +225,8 @@ class ListenerClient(base_client.BaseLBaaSClient):
sni_container_refs=Unset, client_authentication=Unset, sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset, client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset, allowed_cidrs=Unset, client_crl_container_ref=Unset, allowed_cidrs=Unset,
alpn_protocols=Unset, alpn_protocols=Unset, hsts_max_age=Unset,
hsts_include_subdomains=Unset, hsts_preload=Unset,
return_object_only=True): return_object_only=True):
"""Update a listener. """Update a listener.
@ -267,6 +275,12 @@ class ListenerClient(base_client.BaseLBaaSClient):
:param allowed_cidrs: A list of IPv4 or IPv6 CIDRs. :param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
:param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS :param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
listeners. listeners.
:param hsts_include_subdomains: Defines whether the
`include_subdomains` directive is used for HSTS or not
:param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
and sets the `max_age` directive to given value
:param hsts_preload: Defines whether the `hsts_preload` directive
is used for HSTS or not
:param return_object_only: If True, the response returns the object :param return_object_only: If True, the response returns the object
inside the root tag. False returns the full inside the root tag. False returns the full
response from the API. response from the API.

View File

@ -24,6 +24,7 @@ from tempest.lib import decorators
from tempest.lib import exceptions from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.services.load_balancer import v2
from octavia_tempest_plugin.tests import test_base from octavia_tempest_plugin.tests import test_base
from octavia_tempest_plugin.tests import waiters from octavia_tempest_plugin.tests import waiters
@ -35,6 +36,7 @@ LOG = logging.getLogger(__name__)
CONF.validation.run_validation, CONF.validation.run_validation,
'Active-Standby tests will not work without run_validation enabled.') 'Active-Standby tests will not work without run_validation enabled.')
class ActiveStandbyScenarioTest(test_base.LoadBalancerBaseTestWithCompute): class ActiveStandbyScenarioTest(test_base.LoadBalancerBaseTestWithCompute):
mem_listener_client: v2.ListenerClient
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):

View File

@ -308,6 +308,8 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
""" """
listener_name = data_utils.rand_name("lb_member_listener1-create") listener_name = data_utils.rand_name("lb_member_listener1-create")
listener_description = data_utils.arbitrary_string(size=255) listener_description = data_utils.arbitrary_string(size=255)
hsts_supported = self.mem_listener_client.is_version_supported(
self.api_version, '2.17') and protocol == const.TERMINATED_HTTPS
listener_kwargs = { listener_kwargs = {
const.NAME: listener_name, const.NAME: listener_name,
@ -362,9 +364,13 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
exceptions.BadRequest, exceptions.BadRequest,
self.mem_listener_client.create_listener, self.mem_listener_client.create_listener,
**listener_kwargs) **listener_kwargs)
listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs}) listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
if hsts_supported:
listener_kwargs[const.HSTS_PRELOAD] = True
listener_kwargs[const.HSTS_MAX_AGE] = 10000
listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
# Test that a user without the loadbalancer role cannot # Test that a user without the loadbalancer role cannot
# create a listener. # create a listener.
expected_allowed = [] expected_allowed = []
@ -417,6 +423,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
equal_items.append(const.TIMEOUT_MEMBER_DATA) equal_items.append(const.TIMEOUT_MEMBER_DATA)
equal_items.append(const.TIMEOUT_TCP_INSPECT) equal_items.append(const.TIMEOUT_TCP_INSPECT)
if hsts_supported:
equal_items.append(const.HSTS_PRELOAD)
equal_items.append(const.HSTS_MAX_AGE)
equal_items.append(const.HSTS_INCLUDE_SUBDOMAINS)
for item in equal_items: for item in equal_items:
self.assertEqual(listener_kwargs[item], listener[item]) self.assertEqual(listener_kwargs[item], listener[item])
@ -937,6 +948,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
if self.mem_listener_client.is_version_supported( if self.mem_listener_client.is_version_supported(
self.api_version, '2.12'): self.api_version, '2.12'):
show_listener_response_fields.append('allowed_cidrs') show_listener_response_fields.append('allowed_cidrs')
if self.mem_listener_client.is_version_supported(
self.api_version, '2.17'):
show_listener_response_fields.append(const.HSTS_PRELOAD)
show_listener_response_fields.append(const.HSTS_MAX_AGE)
show_listener_response_fields.append(const.HSTS_INCLUDE_SUBDOMAINS)
for field in show_listener_response_fields: for field in show_listener_response_fields:
if field in (const.DEFAULT_POOL_ID, const.L7_POLICIES): if field in (const.DEFAULT_POOL_ID, const.L7_POLICIES):
continue continue
@ -1059,6 +1075,8 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
""" """
listener_name = data_utils.rand_name("lb_member_listener1-show") listener_name = data_utils.rand_name("lb_member_listener1-show")
listener_description = data_utils.arbitrary_string(size=255) listener_description = data_utils.arbitrary_string(size=255)
hsts_supported = self.mem_listener_client.is_version_supported(
self.api_version, '2.17') and protocol == const.TERMINATED_HTTPS
listener_kwargs = { listener_kwargs = {
const.NAME: listener_name, const.NAME: listener_name,
@ -1085,6 +1103,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
self.SNI2_secret_ref], self.SNI2_secret_ref],
}) })
if hsts_supported:
listener_kwargs[const.HSTS_PRELOAD] = True
listener_kwargs[const.HSTS_MAX_AGE] = 10000
listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
if self.mem_listener_client.is_version_supported( if self.mem_listener_client.is_version_supported(
self.api_version, '2.1'): self.api_version, '2.1'):
listener_kwargs.update({ listener_kwargs.update({
@ -1175,6 +1198,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
self.api_version, '2.12'): self.api_version, '2.12'):
self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS]) self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
if hsts_supported:
self.assertTrue(listener[const.HSTS_PRELOAD])
self.assertEqual(10000, listener[const.HSTS_MAX_AGE])
self.assertTrue(listener[const.HSTS_INCLUDE_SUBDOMAINS])
# Test that the appropriate users can see or not see the listener # Test that the appropriate users can see or not see the listener
# based on the API RBAC. # based on the API RBAC.
expected_allowed = [] expected_allowed = []
@ -1242,6 +1270,8 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
""" """
listener_name = data_utils.rand_name("lb_member_listener1-update") listener_name = data_utils.rand_name("lb_member_listener1-update")
listener_description = data_utils.arbitrary_string(size=255) listener_description = data_utils.arbitrary_string(size=255)
hsts_supported = self.mem_listener_client.is_version_supported(
self.api_version, '2.17') and protocol == const.TERMINATED_HTTPS
listener_kwargs = { listener_kwargs = {
const.NAME: listener_name, const.NAME: listener_name,
@ -1419,6 +1449,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
new_cidrs = ['2001:db8::/64'] new_cidrs = ['2001:db8::/64']
listener_update_kwargs.update({const.ALLOWED_CIDRS: new_cidrs}) listener_update_kwargs.update({const.ALLOWED_CIDRS: new_cidrs})
if hsts_supported:
listener_update_kwargs[const.HSTS_PRELOAD] = True
listener_update_kwargs[const.HSTS_MAX_AGE] = 10000
listener_update_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
listener = self.mem_listener_client.update_listener( listener = self.mem_listener_client.update_listener(
listener[const.ID], **listener_update_kwargs) listener[const.ID], **listener_update_kwargs)
@ -1484,6 +1519,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
expected_cidrs = ['2001:db8::/64'] expected_cidrs = ['2001:db8::/64']
self.assertEqual(expected_cidrs, listener[const.ALLOWED_CIDRS]) self.assertEqual(expected_cidrs, listener[const.ALLOWED_CIDRS])
if hsts_supported:
self.assertTrue(listener[const.HSTS_PRELOAD])
self.assertEqual(10000, listener[const.HSTS_MAX_AGE])
self.assertTrue(listener[const.HSTS_INCLUDE_SUBDOMAINS])
@decorators.idempotent_id('16f11c82-f069-4592-8954-81b35a98e3b7') @decorators.idempotent_id('16f11c82-f069-4592-8954-81b35a98e3b7')
def test_http_listener_delete(self): def test_http_listener_delete(self):
self._test_listener_delete(const.HTTP, 8070) self._test_listener_delete(const.HTTP, 8070)

View File

@ -1205,7 +1205,8 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
self.assertEqual(expected_proto, selected_proto) self.assertEqual(expected_proto, selected_proto)
def _test_http_versions_tls_traffic(self, http_version, alpn_protos): def _test_http_versions_tls_traffic(self, http_version, alpn_protos,
hsts: bool = False):
if not self.mem_listener_client.is_version_supported( if not self.mem_listener_client.is_version_supported(
self.api_version, '2.20'): self.api_version, '2.20'):
raise self.skipException('ALPN protocols are only available on ' raise self.skipException('ALPN protocols are only available on '
@ -1220,6 +1221,12 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref, const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.ALPN_PROTOCOLS: alpn_protos, const.ALPN_PROTOCOLS: alpn_protos,
} }
if self.mem_listener_client.is_version_supported(
self.api_version, '2.27'):
listener_kwargs[const.HSTS_MAX_AGE] = 100 if hsts else None
listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = hsts
listener_kwargs[const.HSTS_PRELOAD] = hsts
listener = self.mem_listener_client.create_listener(**listener_kwargs) listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID] self.listener_id = listener[const.ID]
self.addCleanup( self.addCleanup(
@ -1241,6 +1248,12 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
client = httpx.Client(http2=(http_version == 'HTTP/2'), verify=context) client = httpx.Client(http2=(http_version == 'HTTP/2'), verify=context)
r = client.get(url) r = client.get(url)
self.assertEqual(http_version, r.http_version) self.assertEqual(http_version, r.http_version)
if hsts:
self.assertIn('strict-transport-security', r.headers)
self.assertEqual('max-age=100; includeSubDomains; preload;',
r.headers['strict-transport-security'])
else:
self.assertNotIn('strict-transport-security', r.headers)
@decorators.idempotent_id('9965828d-24af-4fa0-91ae-21c6bc47ab4c') @decorators.idempotent_id('9965828d-24af-4fa0-91ae-21c6bc47ab4c')
def test_http_2_tls_traffic(self): def test_http_2_tls_traffic(self):
@ -1251,6 +1264,15 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
self._test_http_versions_tls_traffic( self._test_http_versions_tls_traffic(
'HTTP/1.1', ['http/1.1', 'http/1.0']) 'HTTP/1.1', ['http/1.1', 'http/1.0'])
@decorators.idempotent_id('7436c6b7-44be-4544-a40b-31d2b7b2ad0b')
def test_http_1_1_tls_hsts_traffic(self):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.27'):
raise self.skipException('HSTS is only available on '
'Octavia API version 2.27 or newer.')
self._test_http_versions_tls_traffic(
'HTTP/1.1', ['http/1.1', 'http/1.0'], hsts=True)
@decorators.idempotent_id('ee0faf71-d11e-4323-8673-e5e15779749b') @decorators.idempotent_id('ee0faf71-d11e-4323-8673-e5e15779749b')
def test_pool_reencryption(self): def test_pool_reencryption(self):
if not self.mem_listener_client.is_version_supported( if not self.mem_listener_client.is_version_supported(

View File

@ -34,6 +34,7 @@ import tenacity
from octavia_tempest_plugin.common import cert_utils from octavia_tempest_plugin.common import cert_utils
from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.common import constants as const
import octavia_tempest_plugin.services.load_balancer.v2 as lbv2
from octavia_tempest_plugin.tests import RBAC_tests from octavia_tempest_plugin.tests import RBAC_tests
from octavia_tempest_plugin.tests import validators from octavia_tempest_plugin.tests import validators
from octavia_tempest_plugin.tests import waiters from octavia_tempest_plugin.tests import waiters
@ -182,27 +183,29 @@ class LoadBalancerBaseTest(validators.ValidatorsMixin,
cls.os_roles_lb_member.security_group_rules_client) cls.os_roles_lb_member.security_group_rules_client)
cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client
cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client
cls.mem_lb_client = ( cls.mem_lb_client: lbv2.LoadbalancerClient = (
cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient()) cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient())
cls.mem_listener_client = ( cls.mem_listener_client: lbv2.ListenerClient = (
cls.os_roles_lb_member.load_balancer_v2.ListenerClient()) cls.os_roles_lb_member.load_balancer_v2.ListenerClient())
cls.mem_pool_client = ( cls.mem_pool_client: lbv2.PoolClient = (
cls.os_roles_lb_member.load_balancer_v2.PoolClient()) cls.os_roles_lb_member.load_balancer_v2.PoolClient())
cls.mem_member_client = ( cls.mem_member_client: lbv2.MemberClient = (
cls.os_roles_lb_member.load_balancer_v2.MemberClient()) cls.os_roles_lb_member.load_balancer_v2.MemberClient())
cls.mem_healthmonitor_client = ( cls.mem_healthmonitor_client: lbv2.HealthMonitorClient = (
cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient()) cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient())
cls.mem_l7policy_client = ( cls.mem_l7policy_client: lbv2.L7PolicyClient = (
cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient()) cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient())
cls.mem_l7rule_client = ( cls.mem_l7rule_client: lbv2.L7RuleClient = (
cls.os_roles_lb_member.load_balancer_v2.L7RuleClient()) cls.os_roles_lb_member.load_balancer_v2.L7RuleClient())
cls.lb_admin_amphora_client = lb_admin_prefix.AmphoraClient() cls.lb_admin_amphora_client: lbv2.AmphoraClient = (
cls.lb_admin_flavor_profile_client = ( lb_admin_prefix.AmphoraClient())
cls.lb_admin_flavor_profile_client: lbv2.FlavorProfileClient = (
lb_admin_prefix.FlavorProfileClient()) lb_admin_prefix.FlavorProfileClient())
cls.lb_admin_flavor_client = lb_admin_prefix.FlavorClient() cls.lb_admin_flavor_client: lbv2.FlavorClient = (
cls.mem_flavor_client = ( lb_admin_prefix.FlavorClient())
cls.mem_flavor_client: lbv2.FlavorClient = (
cls.os_roles_lb_member.load_balancer_v2.FlavorClient()) cls.os_roles_lb_member.load_balancer_v2.FlavorClient())
cls.mem_provider_client = ( cls.mem_provider_client: lbv2.ProviderClient = (
cls.os_roles_lb_member.load_balancer_v2.ProviderClient()) cls.os_roles_lb_member.load_balancer_v2.ProviderClient())
cls.os_admin_servers_client = cls.os_admin.servers_client cls.os_admin_servers_client = cls.os_admin.servers_client
cls.os_admin_routers_client = cls.os_admin.routers_client cls.os_admin_routers_client = cls.os_admin.routers_client