Add kubectl GlobalNetworkPolicy keywords
Implements kubectl support for Calico GlobalNetworkPolicies: - KubectlGetGlobalNetworkPolicyKeywords: List and retrieve policies - KubectlGlobalNetworkPolicyObject: Policy data container - KubectlGetGlobalNetworkPolicyOutput: Collection management - KubectlGetGlobalNetworkPolicyTableParser: kubectl output parsing Provide policy validation for GlobalNetworkPolicy testing. Change-Id: I9aa43a748c27a5dbc6a613f956b8da3dee5c34f1 Signed-off-by: Andrew Vaillancourt <andrew.vaillancourt@windriver.com>
This commit is contained in:
@@ -0,0 +1,82 @@
|
||||
from framework.ssh.ssh_connection import SSHConnection
|
||||
from keywords.base_keyword import BaseKeyword
|
||||
from keywords.k8s.globalnetworkpolicy.object.kubectl_get_globalnetworkpolicy_output import KubectlGetGlobalNetworkPolicyOutput
|
||||
from keywords.k8s.globalnetworkpolicy.object.kubectl_globalnetworkpolicy_object import KubectlGlobalNetworkPolicyObject
|
||||
from keywords.k8s.k8s_command_wrapper import export_k8s_config
|
||||
|
||||
|
||||
class KubectlGetGlobalNetworkPolicyKeywords(BaseKeyword):
|
||||
"""
|
||||
Class for 'kubectl get globalnetworkpolicies.crd.projectcalico.org' keywords
|
||||
"""
|
||||
|
||||
def __init__(self, ssh_connection: SSHConnection):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
ssh_connection (SSHConnection): SSH connection object.
|
||||
"""
|
||||
self.ssh_connection = ssh_connection
|
||||
|
||||
def list_globalnetworkpolicies(self) -> KubectlGetGlobalNetworkPolicyOutput:
|
||||
"""Gets the Calico GlobalNetworkPolicies available.
|
||||
|
||||
Returns:
|
||||
KubectlGetGlobalNetworkPolicyOutput: The GlobalNetworkPolicy output.
|
||||
"""
|
||||
kubectl_get_globalnetworkpolicy_output = self.ssh_connection.send(export_k8s_config("kubectl get globalnetworkpolicies.crd.projectcalico.org"))
|
||||
self.validate_success_return_code(self.ssh_connection)
|
||||
globalnetworkpolicy_list_output = KubectlGetGlobalNetworkPolicyOutput(kubectl_get_globalnetworkpolicy_output)
|
||||
|
||||
return globalnetworkpolicy_list_output
|
||||
|
||||
def get_globalnetworkpolicy_by_name(self, policy_name: str) -> KubectlGlobalNetworkPolicyObject:
|
||||
"""
|
||||
Gets a specific GlobalNetworkPolicy by name.
|
||||
|
||||
Args:
|
||||
policy_name (str): The name of the GlobalNetworkPolicy to retrieve.
|
||||
|
||||
Returns:
|
||||
KubectlGlobalNetworkPolicyObject: The GlobalNetworkPolicy object.
|
||||
|
||||
Raises:
|
||||
ValueError: If the GlobalNetworkPolicy is not found.
|
||||
"""
|
||||
globalnetworkpolicy_output = self.list_globalnetworkpolicies()
|
||||
return globalnetworkpolicy_output.get_globalnetworkpolicy_by_name(policy_name)
|
||||
|
||||
def has_globalnetworkpolicy(self, policy_name: str) -> bool:
|
||||
"""
|
||||
Check if GlobalNetworkPolicy exists.
|
||||
|
||||
Returns False on any error, making it safe for polling validation.
|
||||
|
||||
Args:
|
||||
policy_name (str): The name of the GlobalNetworkPolicy to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the policy exists, False otherwise.
|
||||
"""
|
||||
try:
|
||||
globalnetworkpolicy_output = self.list_globalnetworkpolicies()
|
||||
return globalnetworkpolicy_output.has_globalnetworkpolicy(policy_name)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def get_globalnetworkpolicy_yaml(self, policy_name: str) -> str:
|
||||
"""
|
||||
Get GlobalNetworkPolicy details in YAML format.
|
||||
|
||||
Args:
|
||||
policy_name (str): The name of the GlobalNetworkPolicy to retrieve.
|
||||
|
||||
Returns:
|
||||
str: The GlobalNetworkPolicy YAML as a string.
|
||||
|
||||
Raises:
|
||||
Exception: If kubectl command fails or policy not found.
|
||||
"""
|
||||
output = self.ssh_connection.send(export_k8s_config(f"kubectl get globalnetworkpolicy {policy_name} -o yaml"))
|
||||
self.validate_success_return_code(self.ssh_connection)
|
||||
return "\n".join(output) if isinstance(output, list) else output
|
||||
@@ -0,0 +1,78 @@
|
||||
from typing import List
|
||||
|
||||
from keywords.k8s.globalnetworkpolicy.object.kubectl_get_globalnetworkpolicy_table_parser import KubectlGetGlobalNetworkPolicyTableParser
|
||||
from keywords.k8s.globalnetworkpolicy.object.kubectl_globalnetworkpolicy_object import KubectlGlobalNetworkPolicyObject
|
||||
|
||||
|
||||
class KubectlGetGlobalNetworkPolicyOutput:
|
||||
"""
|
||||
Class for 'kubectl get globalnetworkpolicies.crd.projectcalico.org output' keywords
|
||||
"""
|
||||
|
||||
def __init__(self, kubectl_get_globalnetworkpolicy_output: str):
|
||||
"""
|
||||
Constructor
|
||||
|
||||
Args:
|
||||
kubectl_get_globalnetworkpolicy_output (str): Raw string output from running a "kubectl get globalnetworkpolicies.crd.projectcalico.org" command.
|
||||
|
||||
"""
|
||||
self.kubectl_globalnetworkpolicies: List[KubectlGlobalNetworkPolicyObject] = []
|
||||
kubectl_get_globalnetworkpolicy_table_parser = KubectlGetGlobalNetworkPolicyTableParser(kubectl_get_globalnetworkpolicy_output)
|
||||
output_values_list = kubectl_get_globalnetworkpolicy_table_parser.get_output_values_list()
|
||||
|
||||
for policy_dict in output_values_list:
|
||||
|
||||
if "NAME" not in policy_dict:
|
||||
raise ValueError(f"There is no NAME associated with the GlobalNetworkPolicy: {policy_dict}")
|
||||
|
||||
policy = KubectlGlobalNetworkPolicyObject(policy_dict["NAME"])
|
||||
|
||||
if "AGE" in policy_dict:
|
||||
policy.set_age(policy_dict["AGE"])
|
||||
|
||||
self.kubectl_globalnetworkpolicies.append(policy)
|
||||
|
||||
def get_globalnetworkpolicies(self) -> List[KubectlGlobalNetworkPolicyObject]:
|
||||
"""
|
||||
Return the list of all GlobalNetworkPolicies available.
|
||||
|
||||
Returns:
|
||||
List[KubectlGlobalNetworkPolicyObject]: List of GlobalNetworkPolicy objects.
|
||||
|
||||
"""
|
||||
return self.kubectl_globalnetworkpolicies
|
||||
|
||||
def get_globalnetworkpolicy_by_name(self, policy_name: str) -> KubectlGlobalNetworkPolicyObject:
|
||||
"""
|
||||
Return a GlobalNetworkPolicy with the specified name.
|
||||
|
||||
Args:
|
||||
policy_name (str): The name of the GlobalNetworkPolicy to retrieve.
|
||||
|
||||
Returns:
|
||||
KubectlGlobalNetworkPolicyObject: The GlobalNetworkPolicy object with the specified name.
|
||||
|
||||
Raises:
|
||||
ValueError: If no GlobalNetworkPolicy with the specified name is found.
|
||||
"""
|
||||
for policy in self.kubectl_globalnetworkpolicies:
|
||||
if policy.get_name() == policy_name:
|
||||
return policy
|
||||
raise ValueError(f"GlobalNetworkPolicy '{policy_name}' not found")
|
||||
|
||||
def has_globalnetworkpolicy(self, policy_name: str) -> bool:
|
||||
"""
|
||||
Check if a GlobalNetworkPolicy with the specified name exists.
|
||||
|
||||
Args:
|
||||
policy_name (str): The name of the GlobalNetworkPolicy to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the GlobalNetworkPolicy exists, False otherwise.
|
||||
|
||||
"""
|
||||
for policy in self.kubectl_globalnetworkpolicies:
|
||||
if policy.get_name() == policy_name:
|
||||
return True
|
||||
return False
|
||||
@@ -0,0 +1,21 @@
|
||||
from typing import List
|
||||
|
||||
from keywords.k8s.k8s_table_parser_base import K8sTableParserBase
|
||||
|
||||
|
||||
class KubectlGetGlobalNetworkPolicyTableParser(K8sTableParserBase):
|
||||
"""
|
||||
Class for parsing the output of "kubectl get globalnetworkpolicies.crd.projectcalico.org" commands.
|
||||
"""
|
||||
|
||||
def __init__(self, k8s_output: List[str]):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
k8s_output (List[str]): The raw String output of a kubernetes command that returns a table.
|
||||
"""
|
||||
super().__init__(k8s_output)
|
||||
self.possible_headers = [
|
||||
"NAME",
|
||||
"AGE",
|
||||
]
|
||||
@@ -0,0 +1,37 @@
|
||||
class KubectlGlobalNetworkPolicyObject:
|
||||
"""
|
||||
Class to hold attributes of a 'kubectl get globalnetworkpolicies.crd.projectcalico.org' policy entry.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
name (str): Name of the GlobalNetworkPolicy.
|
||||
"""
|
||||
self.name = name
|
||||
self.age = None
|
||||
|
||||
def get_name(self) -> str:
|
||||
"""Getter for NAME entry.
|
||||
|
||||
Returns:
|
||||
str: The name of the GlobalNetworkPolicy.
|
||||
"""
|
||||
return self.name
|
||||
|
||||
def set_age(self, age: str) -> None:
|
||||
"""Setter for AGE.
|
||||
|
||||
Args:
|
||||
age (str): Age of the GlobalNetworkPolicy.
|
||||
"""
|
||||
self.age = age
|
||||
|
||||
def get_age(self) -> str:
|
||||
"""Getter for AGE entry.
|
||||
|
||||
Returns:
|
||||
str: The age of the GlobalNetworkPolicy.
|
||||
"""
|
||||
return self.age
|
||||
@@ -0,0 +1,40 @@
|
||||
from keywords.k8s.globalnetworkpolicy.object.kubectl_get_globalnetworkpolicy_table_parser import KubectlGetGlobalNetworkPolicyTableParser
|
||||
|
||||
|
||||
def test_get_globalnetworkpolicy_table_parser():
|
||||
"""Tests the k8s_get_globalnetworkpolicy table parser."""
|
||||
get_globalnetworkpolicy_output = [
|
||||
"NAME AGE\n",
|
||||
"controller-cluster-host-if-gnp 15d\n",
|
||||
"controller-mgmt-if-gnp 15d\n",
|
||||
"gnp-oam-overrides 1m\n",
|
||||
]
|
||||
|
||||
table_parser = KubectlGetGlobalNetworkPolicyTableParser(get_globalnetworkpolicy_output)
|
||||
output_values = table_parser.get_output_values_list()
|
||||
|
||||
assert len(output_values) == 3, "There are three entries in this get globalnetworkpolicy table."
|
||||
first_line = output_values[0]
|
||||
|
||||
assert first_line["NAME"] == "controller-cluster-host-if-gnp"
|
||||
assert first_line["AGE"] == "15d"
|
||||
|
||||
second_line = output_values[1]
|
||||
assert second_line["NAME"] == "controller-mgmt-if-gnp"
|
||||
assert second_line["AGE"] == "15d"
|
||||
|
||||
third_line = output_values[2]
|
||||
assert third_line["NAME"] == "gnp-oam-overrides"
|
||||
assert third_line["AGE"] == "1m"
|
||||
|
||||
|
||||
def test_get_globalnetworkpolicy_table_parser_empty():
|
||||
"""Tests the k8s_get_globalnetworkpolicy table parser with empty output."""
|
||||
get_globalnetworkpolicy_output = [
|
||||
"NAME AGE\n",
|
||||
]
|
||||
|
||||
table_parser = KubectlGetGlobalNetworkPolicyTableParser(get_globalnetworkpolicy_output)
|
||||
output_values = table_parser.get_output_values_list()
|
||||
|
||||
assert len(output_values) == 0, "There should be no entries in empty table."
|
||||
Reference in New Issue
Block a user