Test new HSTS feature

Test HTTP Strict Transport Security with Octavia.

Partial-Bug: #2017972
Change-Id: Ie54714015e943fd1cb75ca95f8cf241fbc99268c
This commit is contained in:
Tom Weininger 2023-04-26 12:30:08 +02:00
parent ceac0f54dc
commit 4678f68e18
6 changed files with 100 additions and 16 deletions

View File

@ -72,6 +72,9 @@ SNI_CONTAINER_REFS = 'sni_container_refs'
DEFAULT_POOL_ID = 'default_pool_id'
L7_POLICIES = 'l7_policies'
ALPN_PROTOCOLS = 'alpn_protocols'
HSTS_MAX_AGE = 'hsts_max_age'
HSTS_INCLUDE_SUBDOMAINS = 'hsts_include_subdomains'
HSTS_PRELOAD = 'hsts_preload'
LB_ALGORITHM = 'lb_algorithm'
LB_ALGORITHM_ROUND_ROBIN = 'ROUND_ROBIN'

View File

@ -41,7 +41,8 @@ class ListenerClient(base_client.BaseLBaaSClient):
sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset, allowed_cidrs=Unset,
alpn_protocols=Unset,
alpn_protocols=Unset, hsts_max_age=Unset,
hsts_include_subdomains=Unset, hsts_preload=Unset,
return_object_only=True):
"""Create a listener.
@ -92,6 +93,12 @@ class ListenerClient(base_client.BaseLBaaSClient):
:param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
:param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
listeners.
:param hsts_include_subdomains: Defines whether the
`include_subdomains` directive is used for HSTS or not
:param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
and sets the `max_age` directive to given value
:param hsts_preload: Defines whether the `hsts_preload` directive
is used for HSTS or not
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.
@ -218,7 +225,8 @@ class ListenerClient(base_client.BaseLBaaSClient):
sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset, allowed_cidrs=Unset,
alpn_protocols=Unset,
alpn_protocols=Unset, hsts_max_age=Unset,
hsts_include_subdomains=Unset, hsts_preload=Unset,
return_object_only=True):
"""Update a listener.
@ -267,6 +275,12 @@ class ListenerClient(base_client.BaseLBaaSClient):
:param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
:param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
listeners.
:param hsts_include_subdomains: Defines whether the
`include_subdomains` directive is used for HSTS or not
:param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
and sets the `max_age` directive to given value
:param hsts_preload: Defines whether the `hsts_preload` directive
is used for HSTS or not
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.

View File

@ -24,6 +24,7 @@ from tempest.lib import decorators
from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.services.load_balancer import v2
from octavia_tempest_plugin.tests import test_base
from octavia_tempest_plugin.tests import waiters
@ -35,6 +36,7 @@ LOG = logging.getLogger(__name__)
CONF.validation.run_validation,
'Active-Standby tests will not work without run_validation enabled.')
class ActiveStandbyScenarioTest(test_base.LoadBalancerBaseTestWithCompute):
mem_listener_client: v2.ListenerClient
@classmethod
def resource_setup(cls):

View File

@ -297,6 +297,8 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
listener_name = data_utils.rand_name("lb_member_listener1-create")
listener_description = data_utils.arbitrary_string(size=255)
hsts_supported = self.mem_listener_client.is_version_supported(
self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
listener_kwargs = {
const.NAME: listener_name,
@ -351,9 +353,13 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
exceptions.BadRequest,
self.mem_listener_client.create_listener,
**listener_kwargs)
listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
if hsts_supported:
listener_kwargs[const.HSTS_PRELOAD] = True
listener_kwargs[const.HSTS_MAX_AGE] = 10000
listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
# Test that a user without the loadbalancer role cannot
# create a listener.
expected_allowed = []
@ -406,6 +412,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
equal_items.append(const.TIMEOUT_MEMBER_DATA)
equal_items.append(const.TIMEOUT_TCP_INSPECT)
if hsts_supported:
equal_items.append(const.HSTS_PRELOAD)
equal_items.append(const.HSTS_MAX_AGE)
equal_items.append(const.HSTS_INCLUDE_SUBDOMAINS)
for item in equal_items:
self.assertEqual(listener_kwargs[item], listener[item])
@ -1005,6 +1016,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
if self.mem_listener_client.is_version_supported(
self.api_version, '2.12'):
show_listener_response_fields.append('allowed_cidrs')
if self.mem_listener_client.is_version_supported(
self.api_version, '2.27'):
show_listener_response_fields.append(const.HSTS_PRELOAD)
show_listener_response_fields.append(const.HSTS_MAX_AGE)
show_listener_response_fields.append(const.HSTS_INCLUDE_SUBDOMAINS)
for field in show_listener_response_fields:
if field in (const.DEFAULT_POOL_ID, const.L7_POLICIES):
continue
@ -1137,6 +1153,8 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
listener_name = data_utils.rand_name("lb_member_listener1-show")
listener_description = data_utils.arbitrary_string(size=255)
hsts_supported = self.mem_listener_client.is_version_supported(
self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
listener_kwargs = {
const.NAME: listener_name,
@ -1163,6 +1181,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
self.SNI2_secret_ref],
})
if hsts_supported:
listener_kwargs[const.HSTS_PRELOAD] = True
listener_kwargs[const.HSTS_MAX_AGE] = 10000
listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
if self.mem_listener_client.is_version_supported(
self.api_version, '2.1'):
listener_kwargs.update({
@ -1253,6 +1276,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
self.api_version, '2.12'):
self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
if hsts_supported:
self.assertTrue(listener[const.HSTS_PRELOAD])
self.assertEqual(10000, listener[const.HSTS_MAX_AGE])
self.assertTrue(listener[const.HSTS_INCLUDE_SUBDOMAINS])
# Test that the appropriate users can see or not see the listener
# based on the API RBAC.
expected_allowed = []
@ -1330,6 +1358,8 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
listener_name = data_utils.rand_name("lb_member_listener1-update")
listener_description = data_utils.arbitrary_string(size=255)
hsts_supported = self.mem_listener_client.is_version_supported(
self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
listener_kwargs = {
const.NAME: listener_name,
@ -1507,6 +1537,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
new_cidrs = ['2001:db8::/64']
listener_update_kwargs.update({const.ALLOWED_CIDRS: new_cidrs})
if hsts_supported:
listener_update_kwargs[const.HSTS_PRELOAD] = True
listener_update_kwargs[const.HSTS_MAX_AGE] = 10000
listener_update_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
listener = self.mem_listener_client.update_listener(
listener[const.ID], **listener_update_kwargs)
@ -1572,6 +1607,11 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest):
expected_cidrs = ['2001:db8::/64']
self.assertEqual(expected_cidrs, listener[const.ALLOWED_CIDRS])
if hsts_supported:
self.assertTrue(listener[const.HSTS_PRELOAD])
self.assertEqual(10000, listener[const.HSTS_MAX_AGE])
self.assertTrue(listener[const.HSTS_INCLUDE_SUBDOMAINS])
@decorators.idempotent_id('16f11c82-f069-4592-8954-81b35a98e3b7')
def test_http_listener_delete(self):
self._test_listener_delete(const.HTTP, 8070)

View File

@ -1204,7 +1204,8 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
self.assertEqual(expected_proto, selected_proto)
def _test_http_versions_tls_traffic(self, http_version, alpn_protos):
def _test_http_versions_tls_traffic(self, http_version, alpn_protos,
hsts: bool = False):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.20'):
raise self.skipException('ALPN protocols are only available on '
@ -1219,6 +1220,12 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.ALPN_PROTOCOLS: alpn_protos,
}
if self.mem_listener_client.is_version_supported(
self.api_version, '2.27'):
listener_kwargs[const.HSTS_MAX_AGE] = 100 if hsts else None
listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = hsts
listener_kwargs[const.HSTS_PRELOAD] = hsts
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
self.addCleanup(
@ -1240,6 +1247,12 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
client = httpx.Client(http2=(http_version == 'HTTP/2'), verify=context)
r = client.get(url)
self.assertEqual(http_version, r.http_version)
if hsts:
self.assertIn('strict-transport-security', r.headers)
self.assertEqual('max-age=100; includeSubDomains; preload;',
r.headers['strict-transport-security'])
else:
self.assertNotIn('strict-transport-security', r.headers)
@decorators.idempotent_id('9965828d-24af-4fa0-91ae-21c6bc47ab4c')
def test_http_2_tls_traffic(self):
@ -1250,6 +1263,15 @@ class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
self._test_http_versions_tls_traffic(
'HTTP/1.1', ['http/1.1', 'http/1.0'])
@decorators.idempotent_id('7436c6b7-44be-4544-a40b-31d2b7b2ad0b')
def test_http_1_1_tls_hsts_traffic(self):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.27'):
raise self.skipException('HSTS is only available on '
'Octavia API version 2.27 or newer.')
self._test_http_versions_tls_traffic(
'HTTP/1.1', ['http/1.1', 'http/1.0'], hsts=True)
@decorators.idempotent_id('ee0faf71-d11e-4323-8673-e5e15779749b')
def test_pool_reencryption(self):
if not self.mem_listener_client.is_version_supported(

View File

@ -34,6 +34,7 @@ import tenacity
from octavia_tempest_plugin.common import cert_utils
from octavia_tempest_plugin.common import constants as const
import octavia_tempest_plugin.services.load_balancer.v2 as lbv2
from octavia_tempest_plugin.tests import RBAC_tests
from octavia_tempest_plugin.tests import validators
from octavia_tempest_plugin.tests import waiters
@ -182,27 +183,29 @@ class LoadBalancerBaseTest(validators.ValidatorsMixin,
cls.os_roles_lb_member.security_group_rules_client)
cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client
cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client
cls.mem_lb_client = (
cls.mem_lb_client: lbv2.LoadbalancerClient = (
cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient())
cls.mem_listener_client = (
cls.mem_listener_client: lbv2.ListenerClient = (
cls.os_roles_lb_member.load_balancer_v2.ListenerClient())
cls.mem_pool_client = (
cls.mem_pool_client: lbv2.PoolClient = (
cls.os_roles_lb_member.load_balancer_v2.PoolClient())
cls.mem_member_client = (
cls.mem_member_client: lbv2.MemberClient = (
cls.os_roles_lb_member.load_balancer_v2.MemberClient())
cls.mem_healthmonitor_client = (
cls.mem_healthmonitor_client: lbv2.HealthMonitorClient = (
cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient())
cls.mem_l7policy_client = (
cls.mem_l7policy_client: lbv2.L7PolicyClient = (
cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient())
cls.mem_l7rule_client = (
cls.mem_l7rule_client: lbv2.L7RuleClient = (
cls.os_roles_lb_member.load_balancer_v2.L7RuleClient())
cls.lb_admin_amphora_client = lb_admin_prefix.AmphoraClient()
cls.lb_admin_flavor_profile_client = (
cls.lb_admin_amphora_client: lbv2.AmphoraClient = (
lb_admin_prefix.AmphoraClient())
cls.lb_admin_flavor_profile_client: lbv2.FlavorProfileClient = (
lb_admin_prefix.FlavorProfileClient())
cls.lb_admin_flavor_client = lb_admin_prefix.FlavorClient()
cls.mem_flavor_client = (
cls.lb_admin_flavor_client: lbv2.FlavorClient = (
lb_admin_prefix.FlavorClient())
cls.mem_flavor_client: lbv2.FlavorClient = (
cls.os_roles_lb_member.load_balancer_v2.FlavorClient())
cls.mem_provider_client = (
cls.mem_provider_client: lbv2.ProviderClient = (
cls.os_roles_lb_member.load_balancer_v2.ProviderClient())
cls.os_admin_servers_client = cls.os_admin.servers_client
cls.os_admin_routers_client = cls.os_admin.routers_client