Merge "Add HTTPS and HTTP ingress routing tests with cert validation"

This commit is contained in:
Zuul 2025-05-09 17:24:01 +00:00 committed by Gerrit Code Review
commit 6ab3d01595
12 changed files with 589 additions and 31 deletions

View File

@ -1,4 +1,10 @@
{
// dns name for the lab
"dns_name": "lab_dns_name"
}
"dns_name": "lab_dns_name",
// ACME server url
"stepca_server_url": "external_acme_server_url",
// ACME server issuer
"stepca_server_issuer": "external_acme_server_issuer"
}

View File

@ -16,6 +16,8 @@ class SecurityConfig:
security_dict = json5.load(json_data)
self.dns_name = security_dict["dns_name"]
self.stepca_server_url = security_dict["stepca_server_url"]
self.stepca_server_issuer = security_dict["stepca_server_issuer"]
def get_dns_name(self) -> str:
"""
@ -25,3 +27,21 @@ class SecurityConfig:
str: the dns name
"""
return self.dns_name
def get_stepca_server_url(self) -> str:
"""
Getter for the stepca server url
Returns:
str: stepca server url
"""
return self.stepca_server_url
def get_stepca_server_issuer(self) -> str:
"""
Getter for the stepca server issuer
Returns:
str: stepca server url
"""
return self.stepca_server_issuer

View File

@ -1,7 +1,7 @@
import os
import yaml
from jinja2 import Environment, FileSystemLoader, Template
from jinja2 import Template
from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
@ -18,25 +18,29 @@ class YamlKeywords(BaseKeyword):
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor
Args:
ssh_connection:
ssh_connection (SSHConnection): The SSH connection object.
"""
self.ssh_connection = ssh_connection
def generate_yaml_file_from_template(self, template_file: str, replacement_dictionary: str, target_file_name: str, target_remote_location: str, copy_to_remote: bool = True) -> str:
def generate_yaml_file_from_template(self, template_file: str, replacement_dictionary: dict, target_file_name: str, target_remote_location: str, copy_to_remote: bool = True) -> str:
"""
This function will generate a YAML file from the specified template. The parameters in the file will get substituted by
using the key-value pairs from the replacement_dictionary. A copy of the file will be stored in the logs folder as 'target_file_name'.
It will then be SCPed over to 'target_remote_location' on the machine to which this SSH connection is connected
This function will generate a YAML file from the specified template.
The parameters in the file will get substituted by using the key-value pairs from the replacement_dictionary. A copy of the file will be stored in the logs folder as 'target_file_name'.
It will then be SCPed over to 'target_remote_location' on the machine to which this SSH connection is connected.
Args:
template_file (str): Path to the template YAML file.
Example: 'resources/cloud_platform/folder/file_name'.
Example: 'resources/cloud_platform/folder/file_name'.
replacement_dictionary (dict): Dictionary containing placeholder keys and their replacement values.
Example: { 'pod_name': 'awesome_pod_name', 'memory': '2Gb' }.
Example: { 'pod_name': 'awesome_pod_name', 'memory': '2Gb' }.
target_file_name (str): Name of the generated YAML file.
target_remote_location (str): Remote directory path where the file will be uploaded if `copy_to_remote` is True.
copy_to_remote (bool, optional): Flag indicating whether to upload the file to a remote location. Defaults to True.
copy_to_remote (bool): Flag indicating whether to upload the file to a remote location. Defaults to True.
Returns:
str: Path to the generated YAML file, either local or remote depending on `copy_to_remote`.
@ -48,8 +52,8 @@ class YamlKeywords(BaseKeyword):
# Render the YAML file by replacing the tokens.
template = Template(yaml_template)
rendered_yaml_string = template.render(replacement_dictionary)
yaml_data = yaml.safe_load(rendered_yaml_string)
rendered_yaml = yaml.dump(yaml_data)
yaml_data_list = list(yaml.safe_load_all(rendered_yaml_string))
rendered_yaml = "---\n".join([yaml.dump(data, default_flow_style=False) for data in yaml_data_list])
# Create the new file in the log folder.
log_folder = ConfigurationManager.get_logger_config().get_test_case_resources_log_location()

View File

@ -55,6 +55,6 @@ class KubectlGetCertStatusKeywords(BaseKeyword):
def get_cert_status():
cert_status = self.get_certificates(namespace).get_cert(certs_name).get_ready()
return bool(cert_status)
return cert_status == "True"
validate_equals_with_retry(get_cert_status, is_ready, "Verify the certs status issued", timeout=600)

View File

@ -1,7 +1,11 @@
import json
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.k8s.k8s_command_wrapper import export_k8s_config
from keywords.k8s.secret.object.kubectl_get_secret_output import KubectlGetSecretOutput
from keywords.k8s.secret.object.kubectl_secret_object import KubectlSecretObject
class KubectlGetSecretsKeywords(BaseKeyword):
"""
@ -27,7 +31,7 @@ class KubectlGetSecretsKeywords(BaseKeyword):
output = self.ssh_connection.send(export_k8s_config(cmd))
self.validate_success_return_code(self.ssh_connection)
return KubectlGetSecretOutput(output)
def get_secret_names(self, namespace: str = "default") -> list[str]:
"""
Returns a list of secret names in the given namespace.
@ -39,6 +43,41 @@ class KubectlGetSecretsKeywords(BaseKeyword):
secrets_output = self.get_secrets(namespace)
return [secret.get_name() for secret in secrets_output.kubectl_secret]
def get_secret_json_output(self, secret_name: str, namespace: str = "default") -> KubectlSecretObject | None:
"""
Get a secret as a structured object.
Args:
secret_name (str): The name of the Kubernetes secret.
namespace (str): The namespace where the secret resides.
Returns:
KubectlSecretObject | None: The parsed secret object, or None if not found.
"""
command = f"kubectl get secret {secret_name} -n {namespace} -o json"
output = self.ssh_connection.send(export_k8s_config(command))
self.validate_success_return_code(self.ssh_connection)
if isinstance(output, list):
output = "".join(output)
json_obj = json.loads(output)
secret_obj = KubectlSecretObject(secret_name)
secret_obj.load_json(json_obj)
return secret_obj
def get_certificate_issuer(self, secret_name: str, namespace: str = "default") -> str | None:
"""
Extract the certificate issuer from a TLS secret.
Args:
secret_name (str): The name of the TLS secret.
namespace (str): The namespace containing the secret.
Returns:
str | None: The certificate issuer string if found, otherwise None.
"""
secret_output = self.get_secret_json_output(secret_name, namespace)
return secret_output.get_certificate_issuer()
def get_secret_with_custom_output(self, secret_name: str, namespace: str, output_format: str, extra_parameters: str = None, base64: bool = False) -> str:
"""
Get a Kubernetes secret with a custom output format and optional extra parameters.

View File

@ -3,9 +3,10 @@ from keywords.k8s.secret.object.kubectl_secret_object import KubectlSecretObject
class KubectlGetSecretOutput:
"""Parses and stores the output of `kubectl get secret` commands."""
def __init__(self, kubectl_get_secrets_output: str):
"""_summary_
"""Represents parsed output from `kubectl get secret` command
Args:
kubectl_get_secrets_output (str): Raw string output from running "kubectl get secrets" command
@ -15,18 +16,26 @@ class KubectlGetSecretOutput:
output_values_list = kubectl_get_secrets_table_parser.get_output_values_list()
for secret_dict in output_values_list:
if 'NAME' not in secret_dict:
if "NAME" not in secret_dict:
raise ValueError(f"There is no NAME associated with the secret: {secret_dict}")
secret = KubectlSecretObject(secret_dict['NAME'])
secret = KubectlSecretObject(secret_dict["NAME"])
if 'TYPE' in secret_dict:
secret.set_type(secret_dict['TYPE'])
if "TYPE" in secret_dict:
secret.set_type(secret_dict["TYPE"])
if 'DATA' in secret_dict:
secret.set_data(secret_dict['DATA'])
if "DATA" in secret_dict:
secret.set_data(secret_dict["DATA"])
if 'AGE' in secret_dict:
secret.set_age(secret_dict['AGE'])
if "AGE" in secret_dict:
secret.set_age(secret_dict["AGE"])
self.kubectl_secret.append(secret)
def get_secrets(self) -> list[KubectlSecretObject]:
"""Return parsed secret objects.
Returns:
list[KubectlSecretObject]: List of parsed secret objects.
"""
return self.kubectl_secret

View File

@ -1,18 +1,27 @@
import base64
from typing import Optional
from cryptography import x509
from cryptography.hazmat.backends import default_backend
class KubectlSecretObject:
"""
Class to hold attributes of a 'kubectl get secrets' command entry
"""
def __init__(self, name: str):
"""
Constructor
"""Initialize the secret object with name.
Args:
name (str): secret name
name (str): Name of the secret.
"""
self.name = name
self.type = None
self.data = None
self.age = None
self._metadata = {}
self._raw_json = {}
def get_name(self) -> str:
"""
@ -68,7 +77,82 @@ class KubectlSecretObject:
def set_age(self, age: str) -> None:
"""
Setter for AGE entry
Args:
age (str): secret age
"""
self.age = age
def load_json(self, secret_json: dict) -> None:
"""Load and parse secret JSON data into object attributes.
Args:
secret_json (dict): JSON dictionary containing secret metadata and data.
"""
self._raw_json = secret_json
self._metadata = secret_json.get("metadata", {})
self.data = secret_json.get("data", {})
self.type = secret_json.get("type", None)
self.name = self._metadata.get("name", self.name)
def get_metadata(self) -> dict:
"""Return metadata portion of the secret JSON.
Returns:
dict: Metadata dictionary.
"""
return self._metadata
def get_namespace(self) -> Optional[str]:
"""Return the namespace of the secret.
Returns:
Optional[str]: The namespace if available, otherwise None.
"""
return self._metadata.get("namespace")
def get_raw_json(self) -> dict:
"""Return the full raw JSON dictionary for the secret.
Returns:
dict: Complete raw JSON.
"""
return self._raw_json
def get_tls_crt(self) -> Optional[str]:
"""Return decoded TLS certificate content.
Returns:
Optional[str]: Base64-decoded certificate string or None if missing.
"""
return self.data.get("tls.crt") if isinstance(self.data, dict) else None
def get_decoded_data(self, key: str) -> Optional[str]:
"""Return the decoded string of a specific key in the secret data.
Args:
key (str): The key to decode from the secret.
Returns:
Optional[str]: The decoded value, or None if key is missing.
"""
if not isinstance(self.data, dict):
return None
value = self.data.get(key)
if value:
try:
return base64.b64decode(value).decode("utf-8")
except Exception:
pass
return None
def get_certificate_issuer(self) -> str | None:
"""
Retrieves the Issuer information from the 'tls.crt' data of the parsed secret.
"""
encoded_cert = self.get_tls_crt()
if not encoded_cert:
return None
decoded_cert = base64.b64decode(encoded_cert)
cert = x509.load_pem_x509_certificate(decoded_cert, default_backend())
return cert.issuer.rfc4514_string()

View File

@ -3,6 +3,11 @@ from keywords.base_keyword import BaseKeyword
class OpenSSLKeywords(BaseKeyword):
"""Keyword library for OpenSSL operations such as certificate inspection and decoding.
This class provides utility methods for interacting with OpenSSL in the context of
Kubernetes TLS certificate validation.
"""
def __init__(self, ssh_connection: SSHConnection):
self.ssh_connection = ssh_connection
@ -25,3 +30,16 @@ class OpenSSLKeywords(BaseKeyword):
args += f'-subj "/CN={sys_domain_name}"'
self.ssh_connection.send(f"openssl req -x509 -nodes -days 365 -newkey rsa:2048 {args}")
self.validate_success_return_code(self.ssh_connection)
def create_ingress_certificate(self, key: str, crt: str, host: str) -> None:
"""
Creates an SSL certificate file suitable for Kubernetes Ingress TLS secrets, including Subject Alternative Name.
Args:
key (str): The path to the key file.
crt (str): The path to the certificate file.
host (str): The hostname that the certificate should be valid for (will be used in SAN).
"""
command = f"openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout {key} -out {crt} -subj '/CN={host}' -addext 'subjectAltName = DNS:{host}'"
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)

View File

@ -0,0 +1,72 @@
---
apiVersion: v1
kind: Namespace
metadata:
name: pvtest
---
apiVersion: v1
kind: Secret
metadata:
name: pvtestkey
namespace: pvtest
type: kubernetes.io/dockerconfigjson
data:
.dockerconfigjson: eyJhdXRocyI6eyJyZWdpc3RyeS5sb2NhbDo5MDAxIjp7InVzZXJuYW1lIjoiYWRtaW4iLCJwYXNzd29yZCI6IkxpNjludXgqMTIzNCIsImF1dGgiOiJZV1J0YVc0NlRHazJPVzUxZUNveE1qTTAifX19
---
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
name: stepca-issuer
namespace: pvtest
spec:
acme:
server: '{{ stepca_server_url }}'
skipTLSVerify: true
privateKeySecretRef:
name: stepca-issuer
solvers:
- http01:
ingress:
podTemplate:
spec:
imagePullSecrets:
- name: pvtestkey
class: nginx
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kuard
namespace: pvtest
spec:
replicas: 1
selector:
matchLabels:
app: kuard
template:
metadata:
labels:
app: kuard
spec:
containers:
- name: kuard
image: gcr.io/kuar-demo/kuard-amd64:blue
imagePullPolicy: Always
ports:
- containerPort: 8080
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: kuard
namespace: pvtest
labels:
app: kuard
spec:
ports:
- port: 80
targetPort: 8080
protocol: TCP
selector:
app: kuard

View File

@ -0,0 +1,82 @@
---
apiVersion: v1
kind: Namespace
metadata:
name: pvtest
---
apiVersion: v1
kind: Pod
metadata:
name: apple-app
namespace: pvtest
labels:
app: apple
spec:
containers:
- name: apple-app
image: hashicorp/http-echo
args:
- "-text=apple"
---
apiVersion: v1
kind: Service
metadata:
name: apple-service
namespace: pvtest
spec:
selector:
app: apple
ports:
- port: 5678
---
apiVersion: v1
kind: Pod
metadata:
name: banana-app
namespace: pvtest
labels:
app: banana
spec:
containers:
- name: banana-app
image: hashicorp/http-echo
args:
- "-text=banana"
---
apiVersion: v1
kind: Service
metadata:
name: banana-service
namespace: pvtest
spec:
selector:
app: banana
ports:
- port: 5678
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: example-ingress
namespace: pvtest
annotations:
ingress.kubernetes.io/rewrite-target: /
spec:
ingressClassName: nginx
rules:
- http:
paths:
- backend:
service:
name: apple-service
port:
number: 5678
path: /apple
pathType: ImplementationSpecific
- backend:
service:
name: banana-service
port:
number: 5678
path: /banana
pathType: ImplementationSpecific

View File

@ -0,0 +1,87 @@
---
apiVersion: v1
kind: Namespace
metadata:
name: pvtest
---
kind: Pod
apiVersion: v1
metadata:
name: apple-app
namespace: pvtest
labels:
app: apple
spec:
containers:
- name: apple-app
image: hashicorp/http-echo
args:
- "-text=apple"
---
kind: Service
apiVersion: v1
metadata:
name: apple-service
namespace: pvtest
spec:
selector:
app: apple
ports:
- port: 5678
---
kind: Pod
apiVersion: v1
metadata:
name: banana-app
namespace: pvtest
labels:
app: banana
spec:
containers:
- name: banana-app
image: hashicorp/http-echo
args:
- "-text=banana"
---
kind: Service
apiVersion: v1
metadata:
name: banana-service
namespace: pvtest
spec:
selector:
app: banana
ports:
- port: 5678
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: example-ingress
namespace: pvtest
annotations:
ingress.kubernetes.io/rewrite-target: /
spec:
ingressClassName: nginx
tls:
- hosts:
- konoha.rei
secretName: konoha-secret
rules:
- host: konoha.rei
http:
paths:
- backend:
service:
name: apple-service
port:
number: 5678
path: /apple
pathType: ImplementationSpecific
- backend:
service:
name: banana-service
port:
number: 5678
path: /banana
pathType: ImplementationSpecific

View File

@ -9,13 +9,19 @@ from keywords.files.file_keywords import FileKeywords
from keywords.files.yaml_keywords import YamlKeywords
from keywords.k8s.certificate.kubectl_get_certificate_keywords import KubectlGetCertStatusKeywords
from keywords.k8s.certificate.kubectl_get_issuer_keywords import KubectlGetCertIssuerKeywords
from keywords.k8s.files.kubectl_file_delete_keywords import KubectlFileDeleteKeywords
from keywords.k8s.namespace.kubectl_create_namespace_keywords import KubectlCreateNamespacesKeywords
from keywords.k8s.namespace.kubectl_delete_namespace_keywords import KubectlDeleteNamespaceKeywords
from keywords.k8s.pods.kubectl_apply_pods_keywords import KubectlApplyPodsKeywords
from keywords.k8s.pods.kubectl_get_pods_keywords import KubectlGetPodsKeywords
from keywords.k8s.secret.kubectl_create_secret_keywords import KubectlCreateSecretsKeywords
from keywords.k8s.secret.kubectl_get_secret_keywords import KubectlGetSecretsKeywords
from keywords.network.ip_address_keywords import IPAddressKeywords
from keywords.openssl.openssl_keywords import OpenSSLKeywords
@mark.p0
def test_app_using_nginx_controller():
def test_app_using_nginx_controller(request):
"""
This test is to deploy an application which uses Nginx Ingress controller using a
certificate signed by External CA(acme stepCA)
@ -34,6 +40,7 @@ def test_app_using_nginx_controller():
dns_name = ConfigurationManager.get_security_config().get_dns_name()
dns_resolution_status = IPAddressKeywords(oam_ip).check_dnsname_resolution(dns_name=dns_name)
validate_equals(dns_resolution_status, True, "Verify the dns name resolution")
stepca_url = ConfigurationManager.get_security_config().get_stepca_server_url()
stepca_issuer = "stepca-issuer"
pod_name = "kuard"
cert = "kuard-ingress-tls"
@ -42,12 +49,22 @@ def test_app_using_nginx_controller():
global_policy_file_name = "global_policy.yaml"
kuard_file_name = "kuard.yaml"
namespace = "pvtest"
tls_secret_name = "kuard-ingress-tls"
file_keywords = FileKeywords(ssh_connection)
file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{deploy_app_file_name}"), f"/home/sysadmin/{deploy_app_file_name}", overwrite=False)
file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{global_policy_file_name}"), f"/home/sysadmin/{global_policy_file_name}", overwrite=False)
secret_json_keywords = KubectlGetSecretsKeywords(ssh_connection)
# Upload and apply global policy
global_policy_remote_path = f"/home/sysadmin/{global_policy_file_name}"
file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{global_policy_file_name}"), global_policy_remote_path, overwrite=False)
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(f"/home/sysadmin/{global_policy_file_name}")
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(f"/home/sysadmin/{deploy_app_file_name}")
# Upload and render deploy app file
template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{deploy_app_file_name}")
replacement_dictionary = {"stepca_server_url": stepca_url}
deploy_app_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(template_file, replacement_dictionary, deploy_app_file_name, "/home/sysadmin")
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(deploy_app_yaml)
# Check the issuer status
KubectlGetCertIssuerKeywords(ssh_connection).wait_for_issuer_status(stepca_issuer, True, namespace)
# Check the ingress pod status
@ -66,3 +83,123 @@ def test_app_using_nginx_controller():
# Check the app url
response = CloudRestClient().get(f"{base_url}")
validate_equals(response.get_status_code(), 200, "Verify the app url is reachable")
# Verify cert is issued from StepCa
issuer = secret_json_keywords.get_certificate_issuer(tls_secret_name, namespace)
expected_issuer = ConfigurationManager.get_security_config().get_stepca_server_issuer()
validate_equals(issuer, expected_issuer, f"Verify the certificate issuer is '{expected_issuer}'")
def teardown():
KubectlDeleteNamespaceKeywords(ssh_connection).cleanup_namespace(namespace)
KubectlFileDeleteKeywords(ssh_connection).delete_resources(global_policy_remote_path)
request.addfinalizer(teardown)
@mark.p0
def test_simple_ingress_routing_http(request):
"""
This test verifies ingress routing using path-based rules for HTTP.
Steps:
- Apply simple ingress routing resources (pods, services, ingress)
- Validate /apple and /banana routes respond correctly
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
lab_config = ConfigurationManager.get_lab_config()
oam_ip = lab_config.get_floating_ip()
namespace = "pvtest"
base_url = f"http://{oam_ip}"
if lab_config.is_ipv6():
base_url = f"http://[{oam_ip}]"
# Verify DNS (optional if using raw IP)
dns_name = ConfigurationManager.get_security_config().get_dns_name()
dns_resolution_status = IPAddressKeywords(oam_ip).check_dnsname_resolution(dns_name=dns_name)
validate_equals(dns_resolution_status, True, "Verify DNS name resolution")
# Upload and apply YAML
yaml_file = "simple_ingress_routing_http.yaml"
local_path = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{yaml_file}")
remote_path = f"/home/sysadmin/{yaml_file}"
FileKeywords(ssh_connection).upload_file(local_path, remote_path, overwrite=True)
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(remote_path)
# Wait for the application pods to be running
pod_status = KubectlGetPodsKeywords(ssh_connection).wait_for_all_pods_status("['Completed' , 'Running']")
validate_equals(pod_status, True, "Verify pods are running")
# Validate routing for /apple
response_apple = ssh_connection.send(f"curl -s {base_url}/apple")
validate_equals(response_apple[0].strip(), "apple", "Expected response for /apple")
# Validate routing for /banana
response_banana = ssh_connection.send(f"curl -s {base_url}/banana")
validate_equals(response_banana[0].strip(), "banana", "Expected response for /banana")
def teardown():
# Clean up all default namespace resources created by the test
KubectlDeleteNamespaceKeywords(ssh_connection).cleanup_namespace(namespace)
request.addfinalizer(teardown)
def test_simple_ingress_routing_https(request):
"""
This test verifies ingress routing using path-based rules for HTTPS.
Steps:
- Create a TLS secret using OpenSSLKeywords.
- Apply simple ingress routing resources (pods, services, ingress) with TLS configuration.
- Validate /apple and /banana routes respond correctly over HTTPS.
- Validate the correct TLS certificate is served.
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
lab_config = ConfigurationManager.get_lab_config()
oam_ip = lab_config.get_floating_ip()
namespace = "pvtest"
host_name = "konoha.rei"
KEY_FILE = "key.crt"
CERT_FILE = "cert.crt"
server_url = f"https://{host_name}"
tls_secret_name = "kanoha-secret"
expected_issuer = f"CN={host_name}"
# Create TLS certificate and key using the dedicated Ingress method
OpenSSLKeywords(ssh_connection).create_ingress_certificate(key=KEY_FILE, crt=CERT_FILE, host=host_name)
remote_key_path = f"/home/sysadmin/{KEY_FILE}"
remote_cert_path = f"/home/sysadmin/{CERT_FILE}"
KubectlCreateNamespacesKeywords(ssh_connection).create_namespaces(namespace)
KubectlCreateSecretsKeywords(ssh_connection).create_secret_generic(secret_name="kanoha-secret", tls_crt=remote_cert_path, tls_key=remote_key_path, namespace=namespace)
yaml_file = "simple_ingress_routing_https.yaml"
local_path = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{yaml_file}")
remote_yaml_path = f"/home/sysadmin/{yaml_file}"
FileKeywords(ssh_connection).upload_file(local_path, remote_yaml_path, overwrite=True)
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(remote_yaml_path)
# Wait for the application pods to be running
pod_status = KubectlGetPodsKeywords(ssh_connection).wait_for_all_pods_status("['Completed' , 'Running']")
validate_equals(pod_status, True, "Verify pods are running")
# Validate routing for /apple
cmd = f"curl -k {server_url}/apple --resolve {host_name}:443:[{oam_ip}] -s"
response_apple = ssh_connection.send(cmd)
validate_equals(response_apple[0].strip(), "apple", "Expected response for /apple")
# Validate routing for /banana
cmd = f"curl -k {server_url}/banana --resolve {host_name}:443:[{oam_ip}] -s"
response_banana = ssh_connection.send(cmd)
validate_equals(response_banana[0].strip(), "banana", "Expected response for /banana")
# Verify cert is issued from StepCa
issuer = KubectlGetSecretsKeywords(ssh_connection).get_certificate_issuer(tls_secret_name, namespace)
validate_equals(issuer, expected_issuer, f"Verify the certificate issuer is '{expected_issuer}'")
def teardown():
# Clean up all default namespace resources created by the test
KubectlDeleteNamespaceKeywords(ssh_connection).cleanup_namespace(namespace)
request.addfinalizer(teardown)