Merge "Adding the test for Cert Manager"
This commit is contained in:
@@ -0,0 +1,68 @@
|
||||
class HelmOverrideObject:
|
||||
"""
|
||||
This class represents a helm override entry as an object.
|
||||
|
||||
This is typically a line in the system helm override output table.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Constructor
|
||||
"""
|
||||
self.name: str = None
|
||||
self.namespace: str = None
|
||||
self.user_overrides: str = None
|
||||
|
||||
def set_name(self, name: str):
|
||||
"""
|
||||
Set the name of the helm override.
|
||||
|
||||
Args:
|
||||
name (str): The name of the helm override entry.
|
||||
"""
|
||||
self.name = name
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""
|
||||
Get the name of the helm override.
|
||||
|
||||
Returns:
|
||||
str: The name of the helm override entry.
|
||||
"""
|
||||
return self.name
|
||||
|
||||
def set_namespace(self, namespace: str):
|
||||
"""
|
||||
Set the namespace for the helm override.
|
||||
|
||||
Args:
|
||||
namespace (str): The Kubernetes namespace.
|
||||
"""
|
||||
self.namespace = namespace
|
||||
|
||||
def get_namespace(self) -> str:
|
||||
"""
|
||||
Get the namespace of the helm override.
|
||||
|
||||
Returns:
|
||||
str: The Kubernetes namespace.
|
||||
"""
|
||||
return self.namespace
|
||||
|
||||
def set_user_overrides(self, user_overrides: str):
|
||||
"""
|
||||
Set the user-defined overrides.
|
||||
|
||||
Args:
|
||||
user_overrides (str): string of user overrides.
|
||||
"""
|
||||
self.user_overrides = user_overrides
|
||||
|
||||
def get_user_overrides(self) -> str:
|
||||
"""
|
||||
Get the user-defined overrides.
|
||||
|
||||
Returns:
|
||||
str: string of user overrides.
|
||||
"""
|
||||
return self.user_overrides
|
||||
@@ -0,0 +1,60 @@
|
||||
from framework.exceptions.keyword_exception import KeywordException
|
||||
from framework.logging.automation_logger import get_logger
|
||||
from keywords.cloud_platform.system.helm.objects.system_helm_override_object import HelmOverrideObject
|
||||
from keywords.cloud_platform.system.system_vertical_table_parser import SystemVerticalTableParser
|
||||
|
||||
|
||||
class SystemHelmOverrideOutput:
|
||||
"""
|
||||
This class parses the output of 'system helm-override-list' command into an object of type HelmOverrideObject.
|
||||
"""
|
||||
|
||||
def __init__(self, helm_override_output: str):
|
||||
"""
|
||||
Constructor
|
||||
|
||||
Args:
|
||||
helm_override_output (str): Output of the 'system helm-override-list' command.
|
||||
|
||||
Raises:
|
||||
KeywordException: If the output is not valid.
|
||||
"""
|
||||
system_vertical_table_parser = SystemVerticalTableParser(helm_override_output)
|
||||
output_values = system_vertical_table_parser.get_output_values_dict()
|
||||
|
||||
if self.is_valid_output(output_values):
|
||||
self.helm_override = HelmOverrideObject()
|
||||
self.helm_override.set_name(output_values["name"])
|
||||
self.helm_override.set_namespace(output_values["namespace"])
|
||||
self.helm_override.set_user_overrides(output_values["user_overrides"])
|
||||
else:
|
||||
raise KeywordException(f"The output line {output_values} was not valid")
|
||||
|
||||
def get_helm_override_list(self) -> HelmOverrideObject:
|
||||
"""
|
||||
Returns the parsed helm override list object.
|
||||
|
||||
Returns:
|
||||
HelmOverrideObject: The parsed helm override object list.
|
||||
"""
|
||||
return self.helm_override
|
||||
|
||||
@staticmethod
|
||||
def is_valid_output(value: dict) -> bool:
|
||||
"""
|
||||
Checks if the output contains all the expected fields.
|
||||
|
||||
Args:
|
||||
value (dict): The dictionary of output values.
|
||||
|
||||
Returns:
|
||||
bool: True if the output contains all required fields, False otherwise.
|
||||
"""
|
||||
required_fields = ["name", "namespace", "user_overrides"]
|
||||
valid = True
|
||||
for field in required_fields:
|
||||
if field not in value:
|
||||
get_logger().log_error(f"{field} is not in the output value")
|
||||
valid = False
|
||||
break
|
||||
return valid
|
||||
@@ -0,0 +1,123 @@
|
||||
class HelmOverrideShowObject:
|
||||
"""
|
||||
Represents a single Helm override entry extracted from the 'system helm-override-show' command output.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initializes the HelmOverrideShowObject with default None values for all fields.
|
||||
"""
|
||||
self.name: str = None
|
||||
self.namespace: str = None
|
||||
self.attributes: dict = None
|
||||
self.combined_overrides: dict = None
|
||||
self.system_overrides: dict = None
|
||||
self.user_overrides: dict = None
|
||||
|
||||
def set_name(self, name: str):
|
||||
"""
|
||||
Sets the name of the Helm chart.
|
||||
|
||||
Args:
|
||||
name (str): The name of the Helm chart.
|
||||
"""
|
||||
self.name = name
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""
|
||||
Gets the name of the Helm chart.
|
||||
|
||||
Returns:
|
||||
str: The name of the Helm chart.
|
||||
"""
|
||||
return self.name
|
||||
|
||||
def set_namespace(self, namespace: str):
|
||||
"""
|
||||
Sets the namespace of the Helm chart.
|
||||
|
||||
Args:
|
||||
namespace (str): The Kubernetes namespace where the chart is deployed.
|
||||
"""
|
||||
self.namespace = namespace
|
||||
|
||||
def get_namespace(self) -> str:
|
||||
"""
|
||||
Gets the namespace of the Helm chart.
|
||||
|
||||
Returns:
|
||||
str: The Kubernetes namespace where the chart is deployed.
|
||||
"""
|
||||
return self.namespace
|
||||
|
||||
def set_attributes(self, attributes: dict):
|
||||
"""
|
||||
Sets the attributes for the Helm chart.
|
||||
|
||||
Args:
|
||||
attributes (dict): Attributes dictionary (e.g., {'enabled': True}).
|
||||
"""
|
||||
self.attributes = attributes
|
||||
|
||||
def get_attributes(self) -> dict:
|
||||
"""
|
||||
Gets the attributes for the Helm chart.
|
||||
|
||||
Returns:
|
||||
dict: Attributes dictionary.
|
||||
"""
|
||||
return self.attributes
|
||||
|
||||
def set_combined_overrides(self, combined_overrides: dict):
|
||||
"""
|
||||
Sets the combined overrides (merged system and user) for the chart.
|
||||
|
||||
Args:
|
||||
combined_overrides (dict): Combined override values in dictionary format.
|
||||
"""
|
||||
self.combined_overrides = combined_overrides
|
||||
|
||||
def get_combined_overrides(self) -> dict:
|
||||
"""
|
||||
Gets the combined overrides for the chart.
|
||||
|
||||
Returns:
|
||||
dict: Combined override values.
|
||||
"""
|
||||
return self.combined_overrides
|
||||
|
||||
def set_system_overrides(self, system_overrides: dict):
|
||||
"""
|
||||
Sets the system-defined overrides for the chart.
|
||||
|
||||
Args:
|
||||
system_overrides (dict): System overrides in dictionary format.
|
||||
"""
|
||||
self.system_overrides = system_overrides
|
||||
|
||||
def get_system_overrides(self) -> dict:
|
||||
"""
|
||||
Gets the system-defined overrides for the chart.
|
||||
|
||||
Returns:
|
||||
dict: System overrides dictionary.
|
||||
"""
|
||||
return self.system_overrides
|
||||
|
||||
def set_user_overrides(self, user_overrides: dict):
|
||||
"""
|
||||
Sets the user-defined overrides for the chart.
|
||||
|
||||
Args:
|
||||
user_overrides (dict): User override values in dictionary format.
|
||||
"""
|
||||
self.user_overrides = user_overrides
|
||||
|
||||
def get_user_overrides(self) -> dict:
|
||||
"""
|
||||
Gets the user-defined overrides for the chart.
|
||||
|
||||
Returns:
|
||||
dict: User override values.
|
||||
"""
|
||||
return self.user_overrides
|
||||
@@ -0,0 +1,63 @@
|
||||
from framework.exceptions.keyword_exception import KeywordException
|
||||
from framework.logging.automation_logger import get_logger
|
||||
from keywords.cloud_platform.system.helm.objects.system_helm_override_show_object import HelmOverrideShowObject
|
||||
from keywords.cloud_platform.system.system_vertical_table_parser import SystemVerticalTableParser
|
||||
|
||||
|
||||
class SystemHelmOverrideShowOutput:
|
||||
"""
|
||||
This class parses the output of 'system helm-override-show' command into an object of type HelmOverrideShowObject.
|
||||
"""
|
||||
|
||||
def __init__(self, helm_override_show_output: str):
|
||||
"""
|
||||
Initialize the SystemHelmOverrideShowOutput class.
|
||||
|
||||
Args:
|
||||
helm_override_show_output (str): Output of the 'system helm-override-show' command.
|
||||
|
||||
Raises:
|
||||
KeywordException: If the output is not valid.
|
||||
"""
|
||||
system_vertical_table_parser = SystemVerticalTableParser(helm_override_show_output)
|
||||
output_values = system_vertical_table_parser.get_output_values_dict()
|
||||
|
||||
if self.is_valid_output(output_values):
|
||||
self.helm_override_show = HelmOverrideShowObject()
|
||||
self.helm_override_show.set_name(output_values["name"])
|
||||
self.helm_override_show.set_namespace(output_values["namespace"])
|
||||
self.helm_override_show.set_attributes(output_values["attributes"])
|
||||
self.helm_override_show.set_combined_overrides(output_values["combined_overrides"])
|
||||
self.helm_override_show.set_system_overrides(output_values["system_overrides"])
|
||||
self.helm_override_show.set_user_overrides(output_values["user_overrides"])
|
||||
else:
|
||||
raise KeywordException(f"The output line {output_values} was not valid")
|
||||
|
||||
def get_helm_override_show(self) -> HelmOverrideShowObject:
|
||||
"""
|
||||
Returns the parsed helm override object.
|
||||
|
||||
Returns:
|
||||
HelmOverrideShowObject: The parsed helm override object.
|
||||
"""
|
||||
return self.helm_override_show
|
||||
|
||||
@staticmethod
|
||||
def is_valid_output(value: dict) -> bool:
|
||||
"""
|
||||
Checks if the output contains all the expected fields.
|
||||
|
||||
Args:
|
||||
value (dict): The dictionary of output values.
|
||||
|
||||
Returns:
|
||||
bool: True if the output contains all required fields, False otherwise.
|
||||
"""
|
||||
required_fields = ["name", "namespace", "attributes", "combined_overrides", "system_overrides", "user_overrides"]
|
||||
valid = True
|
||||
for field in required_fields:
|
||||
if field not in value:
|
||||
get_logger().log_error(f"{field} is not in the output value")
|
||||
valid = False
|
||||
break
|
||||
return valid
|
||||
@@ -0,0 +1,72 @@
|
||||
from framework.ssh.ssh_connection import SSHConnection
|
||||
from framework.validation.validation import validate_str_contains
|
||||
from keywords.base_keyword import BaseKeyword
|
||||
from keywords.cloud_platform.command_wrappers import source_openrc
|
||||
from keywords.cloud_platform.system.helm.objects.system_helm_override_output import SystemHelmOverrideOutput
|
||||
from keywords.cloud_platform.system.helm.objects.system_helm_override_show_output import SystemHelmOverrideShowOutput
|
||||
|
||||
|
||||
class SystemHelmOverrideKeywords(BaseKeyword):
|
||||
"""
|
||||
This class contains all the keywords related to the 'system helm-override' commands.
|
||||
"""
|
||||
|
||||
def __init__(self, ssh_connection: SSHConnection):
|
||||
"""
|
||||
Initialize the KubectlGetPodsKeywords class.
|
||||
|
||||
Args:
|
||||
ssh_connection (SSHConnection): An SSH connection object to the target system.
|
||||
"""
|
||||
self.ssh_connection = ssh_connection
|
||||
|
||||
def update_helm_override(self, yaml_file: str, app_name: str, chart_name: str, namespace: str) -> SystemHelmOverrideOutput:
|
||||
"""
|
||||
Gets the system helm-override list
|
||||
|
||||
Args:
|
||||
yaml_file (str): the yaml file with override values
|
||||
app_name (str): the app name
|
||||
chart_name (str): the chart name
|
||||
namespace (str): the namespace
|
||||
|
||||
Returns:
|
||||
SystemHelmOverrideOutput: object with the list of helm overrides.
|
||||
"""
|
||||
command = source_openrc(f"system helm-override-update --values {yaml_file} {app_name} {chart_name} {namespace}")
|
||||
output = self.ssh_connection.send(command)
|
||||
self.validate_success_return_code(self.ssh_connection)
|
||||
system_helm_override_output = SystemHelmOverrideOutput(output)
|
||||
return system_helm_override_output
|
||||
|
||||
def get_system_helm_override_show(self, app_name: str, chart_name: str, namespace: str) -> SystemHelmOverrideShowOutput:
|
||||
"""
|
||||
Gets the system helm-override show
|
||||
|
||||
Args:
|
||||
app_name (str): the app name
|
||||
chart_name (str): the chart name
|
||||
namespace (str): the namespace
|
||||
|
||||
Returns:
|
||||
SystemHelmOverrideShowOutput: The parsed helm override show object.
|
||||
"""
|
||||
command = source_openrc(f"system helm-override-show {app_name} {chart_name} {namespace}")
|
||||
output = self.ssh_connection.send(command)
|
||||
self.validate_success_return_code(self.ssh_connection)
|
||||
system_helm_override_show_output = SystemHelmOverrideShowOutput(output)
|
||||
return system_helm_override_show_output
|
||||
|
||||
def verify_helm_user_override(self, label: str, app_name: str, chart_name: str, namespace: str):
|
||||
"""
|
||||
Verifies the user override
|
||||
|
||||
Args:
|
||||
label (str): the label
|
||||
app_name (str): the app name
|
||||
chart_name (str): the chart name
|
||||
namespace (str): the namespace
|
||||
"""
|
||||
user_override = self.get_system_helm_override_show(app_name, chart_name, namespace).get_helm_override_show().get_user_overrides()
|
||||
|
||||
validate_str_contains(user_override, label, "User Override")
|
||||
@@ -21,28 +21,35 @@ class KubectlGetPodsKeywords(BaseKeyword):
|
||||
ssh_connection (SSHConnection): An SSH connection object to the target system.
|
||||
"""
|
||||
self.ssh_connection = ssh_connection
|
||||
|
||||
def get_pods(self, namespace: str = None) -> KubectlGetPodsOutput:
|
||||
|
||||
def get_pods(self, namespace: str = None, label: str = None) -> KubectlGetPodsOutput:
|
||||
"""
|
||||
Gets the k8s pods that are available using '-o wide'.
|
||||
|
||||
Args:
|
||||
namespace(str, optional): The namespace to search for pods. If None, it will search in all namespaces.
|
||||
label (str, optional): The label to search for pods.
|
||||
|
||||
Returns:
|
||||
KubectlGetPodsOutput: An object containing the parsed output of the command.
|
||||
|
||||
"""
|
||||
arg_namespace = ""
|
||||
|
||||
arg_label = ""
|
||||
|
||||
if namespace:
|
||||
arg_namespace = f"-n {namespace}"
|
||||
|
||||
kubectl_get_pods_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} -o wide get pods"))
|
||||
if label:
|
||||
arg_label = f"-l {label}"
|
||||
|
||||
kubectl_get_pods_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} {arg_label} -o wide get pods"))
|
||||
self.validate_success_return_code(self.ssh_connection)
|
||||
pods_list_output = KubectlGetPodsOutput(kubectl_get_pods_output)
|
||||
|
||||
return pods_list_output
|
||||
|
||||
|
||||
def get_pods_no_validation(self, namespace: str = None) -> KubectlGetPodsOutput:
|
||||
"""
|
||||
Gets the k8s pods that are available using '-o wide'.
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
podLabels:
|
||||
'{{ cm_label_key }}': '{{ cm_label_value }}'
|
||||
@@ -1,17 +1,23 @@
|
||||
from pytest import mark
|
||||
from pytest import FixtureRequest, mark
|
||||
|
||||
from config.configuration_file_locations_manager import ConfigurationFileLocationsManager
|
||||
from config.configuration_manager import ConfigurationManager
|
||||
from framework.logging.automation_logger import get_logger
|
||||
from framework.resources.resource_finder import get_stx_resource_path
|
||||
from framework.validation.validation import validate_equals
|
||||
from framework.validation.validation import validate_equals, validate_equals_with_retry, validate_list_contains, validate_str_contains, validate_str_contains_with_retry
|
||||
from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient
|
||||
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
|
||||
from keywords.files.file_keywords import FileKeywords
|
||||
from keywords.cloud_platform.system.application.object.system_application_status_enum import SystemApplicationStatusEnum
|
||||
from keywords.cloud_platform.system.application.system_application_apply_keywords import SystemApplicationApplyKeywords
|
||||
from keywords.cloud_platform.system.helm.system_helm_override_keywords import SystemHelmOverrideKeywords
|
||||
from keywords.files.file_keqywords import FileKeywords
|
||||
from keywords.files.yaml_keywords import YamlKeywords
|
||||
from keywords.k8s.certificate.kubectl_get_certificate_keywords import KubectlGetCertStatusKeywords
|
||||
from keywords.k8s.certificate.kubectl_get_issuer_keywords import KubectlGetCertIssuerKeywords
|
||||
from keywords.k8s.files.kubectl_file_delete_keywords import KubectlFileDeleteKeywords
|
||||
from keywords.k8s.namespace.kubectl_create_namespace_keywords import KubectlCreateNamespacesKeywords
|
||||
from keywords.k8s.namespace.kubectl_delete_namespace_keywords import KubectlDeleteNamespaceKeywords
|
||||
from keywords.k8s.namespace.kubectl_get_namespaces_keywords import KubectlGetNamespacesKeywords
|
||||
from keywords.k8s.pods.kubectl_apply_pods_keywords import KubectlApplyPodsKeywords
|
||||
from keywords.k8s.pods.kubectl_get_pods_keywords import KubectlGetPodsKeywords
|
||||
from keywords.k8s.secret.kubectl_create_secret_keywords import KubectlCreateSecretsKeywords
|
||||
@@ -203,3 +209,158 @@ def test_simple_ingress_routing_https(request):
|
||||
KubectlDeleteNamespaceKeywords(ssh_connection).cleanup_namespace(namespace)
|
||||
|
||||
request.addfinalizer(teardown)
|
||||
|
||||
|
||||
@mark.p3
|
||||
def test_override_cert_manager():
|
||||
"""
|
||||
Verify post-install override functionality of cert-manager app.
|
||||
|
||||
Test Steps:
|
||||
- Override helm values using system override
|
||||
- Re-apply the application
|
||||
- Confirm pods are in Running state post-override
|
||||
|
||||
"""
|
||||
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
|
||||
|
||||
app_name = "cert-manager"
|
||||
chart_name = "cert-manager"
|
||||
namespace = "cert-manager"
|
||||
label_key = "test"
|
||||
label_value = "cm_label"
|
||||
label = f"{label_key}: {label_value}"
|
||||
k8s_label = f"{label_key}={label_value}"
|
||||
|
||||
cm_override_file_name = "cm_override_values.yaml"
|
||||
|
||||
config_file_locations = ConfigurationFileLocationsManager()
|
||||
ConfigurationManager.load_configs(config_file_locations)
|
||||
|
||||
template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{cm_override_file_name}")
|
||||
replacement_dictionary = {"cm_label_key": label_key, "cm_label_value": label_value}
|
||||
get_logger().log_info(f"Creating resource from file {template_file}")
|
||||
remote_path = YamlKeywords(ssh_connection).generate_yaml_file_from_template(template_file, replacement_dictionary, f"{cm_override_file_name}", "/home/sysadmin")
|
||||
|
||||
get_logger().log_info(f"Helm override for {app_name} with custom values")
|
||||
SystemHelmOverrideKeywords(ssh_connection).update_helm_override(remote_path, app_name, chart_name, namespace)
|
||||
|
||||
get_logger().log_info(f"Verify helm override show for {app_name} with custom values")
|
||||
SystemHelmOverrideKeywords(ssh_connection).verify_helm_user_override(label, app_name, chart_name, namespace)
|
||||
get_logger().log_info(f"Re-apply the {app_name} application")
|
||||
# Step 3: Re-apply the cert manager app on the active controller
|
||||
|
||||
# Executes the application installation
|
||||
system_application_apply_output = SystemApplicationApplyKeywords(ssh_connection).system_application_apply(app_name)
|
||||
|
||||
# Asserts that the applying process concluded successfully
|
||||
system_application_object = system_application_apply_output.get_system_application_object()
|
||||
validate_str_contains(system_application_object.get_name(), app_name, "Apply cert-manager")
|
||||
validate_str_contains(system_application_object.get_status(), SystemApplicationStatusEnum.APPLIED.value, "Apply cert-manager")
|
||||
|
||||
def get_pod_status():
|
||||
pod_status = KubectlGetPodsKeywords(ssh_connection).get_pods(namespace=namespace, label=k8s_label).get_pods_start_with(app_name)[0].get_status()
|
||||
return pod_status == "Running"
|
||||
|
||||
validate_equals_with_retry(get_pod_status, True, 600)
|
||||
|
||||
|
||||
@mark.p3
|
||||
def test_manual_cert_installation(request: FixtureRequest):
|
||||
"""
|
||||
Test manual installation of 'system-registry-local-certificate', 'system-restapi-gui-certificate'
|
||||
|
||||
Args:
|
||||
request (FixtureRequest): request needed for adding teardown
|
||||
|
||||
Steps:
|
||||
- Create the namespace
|
||||
- Deploy the Stepca issuer
|
||||
- Install the stepca root secret
|
||||
- Deploy the cert with stepca issuer
|
||||
- Verify that installed cert shown in "system certificate-list"
|
||||
- Now deploy a certificate
|
||||
- Check that the manual installation of certificate is accepted
|
||||
Teardown:
|
||||
- Delete the namespace
|
||||
|
||||
"""
|
||||
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
|
||||
lab_config = ConfigurationManager.get_lab_config()
|
||||
oam_ip = lab_config.get_floating_ip()
|
||||
|
||||
cluster_issuer = "system-selfsigning-issuer"
|
||||
root_ca_cert = "cloudplatform-rootca-certificate"
|
||||
root_ca_secret = "cloudplatform-rootca-secret"
|
||||
platform_issuer = "cloudplatform-issuer"
|
||||
registry_local_cert = "system-registry-local-certificate"
|
||||
registry_local_secret = "system-registry-local-secret"
|
||||
restapi_gui_cert = "system-restapi-gui-certificate"
|
||||
restapi_gui_secret = "system-restapi-gui-secret"
|
||||
|
||||
registry_local_cert_file_name = "registry_local_cert.yaml"
|
||||
restapi_gui_cert_file_name = "restapi_gui_cert.yaml"
|
||||
cluster_issuer_file_name = "cluster_issuer.yaml"
|
||||
root_cacert_file_name = "root_ca_cert.yaml"
|
||||
platform_issuer_file_name = "platform_issuer.yaml"
|
||||
namespace = "testcert"
|
||||
|
||||
kubectl_create_ns_keyword = KubectlCreateNamespacesKeywords(ssh_connection)
|
||||
kubectl_create_ns_keyword.create_namespaces(namespace)
|
||||
ns_list = KubectlGetNamespacesKeywords(ssh_connection).get_namespaces()
|
||||
|
||||
validate_equals(ns_list.is_namespace(namespace_name=namespace), True, "create namespace")
|
||||
|
||||
cluster_issuer_template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{cluster_issuer_file_name}")
|
||||
issuer_replacement_dictionary = {"cluster_issuer": cluster_issuer}
|
||||
cluster_issuer_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(cluster_issuer_template_file, issuer_replacement_dictionary, f"{cluster_issuer_file_name}", "/home/sysadmin")
|
||||
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(cluster_issuer_yaml)
|
||||
|
||||
root_cacert_template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{root_cacert_file_name}")
|
||||
root_cacert_replacement_dictionary = {"root_ca_cert": root_ca_cert, "root_ca_secret": root_ca_secret, "cluster_issuer": cluster_issuer, "namespace": namespace}
|
||||
root_cacert_issuer_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(root_cacert_template_file, root_cacert_replacement_dictionary, f"{root_cacert_file_name}", "/home/sysadmin")
|
||||
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(root_cacert_issuer_yaml)
|
||||
platform_issuer_template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{platform_issuer_file_name}")
|
||||
platform_issuer_replacement_dictionary = {"root_ca_secret": root_ca_secret, "platform_issuer": platform_issuer, "namespace": namespace}
|
||||
platform_issuer_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(platform_issuer_template_file, platform_issuer_replacement_dictionary, f"{platform_issuer_file_name}", "/home/sysadmin")
|
||||
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(platform_issuer_yaml)
|
||||
|
||||
KubectlGetCertIssuerKeywords(ssh_connection).wait_for_issuer_status(platform_issuer, True, namespace)
|
||||
# Check the cert status
|
||||
KubectlGetCertStatusKeywords(ssh_connection).wait_for_certs_status(root_ca_cert, True, namespace)
|
||||
root_ca_list_of_secrets = KubectlGetSecretsKeywords(ssh_connection).get_secret_names(namespace=namespace)
|
||||
validate_list_contains(root_ca_secret, root_ca_list_of_secrets, "Root ca secret")
|
||||
KubectlGetCertIssuerKeywords(ssh_connection).wait_for_issuer_status(platform_issuer, True, namespace)
|
||||
|
||||
registry_local_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{registry_local_cert_file_name}")
|
||||
registry_replacement_dictionary = {"registry_local_cert": registry_local_cert, "registry_local_secret": registry_local_secret, "platform_issuer": platform_issuer, "floating_ip": oam_ip, "namespace": namespace}
|
||||
registry_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(registry_local_file, registry_replacement_dictionary, f"{registry_local_cert_file_name}", "/home/sysadmin")
|
||||
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(registry_yaml)
|
||||
# Check the cert status
|
||||
KubectlGetCertStatusKeywords(ssh_connection).wait_for_certs_status(registry_local_cert, True, namespace)
|
||||
|
||||
def get_list_of_secrets():
|
||||
return KubectlGetSecretsKeywords(ssh_connection).get_secret_names(namespace=namespace)
|
||||
|
||||
validate_str_contains_with_retry(get_list_of_secrets, registry_local_secret, "Registry local secret", timeout=10)
|
||||
restapi_gui_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{restapi_gui_cert_file_name}")
|
||||
restapi_replacement_dictionary = {"restapi_gui_cert": restapi_gui_cert, "restapi_gui_secret": restapi_gui_secret, "platform_issuer": platform_issuer, "floating_ip": oam_ip, "namespace": namespace}
|
||||
restapi_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(restapi_gui_file, restapi_replacement_dictionary, f"{restapi_gui_cert_file_name}", "/home/sysadmin")
|
||||
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(restapi_yaml)
|
||||
# Check the cert status
|
||||
KubectlGetCertStatusKeywords(ssh_connection).wait_for_certs_status(restapi_gui_cert, True, namespace)
|
||||
validate_str_contains_with_retry(get_list_of_secrets, restapi_gui_secret, "restapi gui secret", timeout=10)
|
||||
|
||||
def teardown_namespace():
|
||||
# cleanup created dashboard namespace
|
||||
get_logger().log_info("Deleting testcert namespace")
|
||||
ns_list = KubectlGetNamespacesKeywords(ssh_connection).get_namespaces()
|
||||
|
||||
if ns_list.is_namespace(namespace_name=namespace):
|
||||
get_logger().log_info("Deleting testcert namespace")
|
||||
# delete created namespace
|
||||
KubectlDeleteNamespaceKeywords(ssh_connection).cleanup_namespace(namespace=namespace)
|
||||
else:
|
||||
get_logger().log_info("testcert namespace does not exist")
|
||||
|
||||
request.addfinalizer(teardown_namespace)
|
||||
|
||||
Reference in New Issue
Block a user