Files
test/config/lab/objects/lab_config.py
Abhishek jaiswal fae03d45ab Refactor phased deployment test case and keywords
This commit refactors the phased deployment test case and associated
keywords to use the new configuration for retrieving labs and their
deployment assets.

* The test case and keywords have been updated to utilize the new configuration system.
* This ensures that the tests accurately reflect the current lab setup and deployment process.
* Removes reliance on older methods of obtaining lab and deployment information.

Change-Id: I26e7b170ff07711b999486665153ab00812e874c
Signed-off-by: Abhishek jaiswal <abhishek.jaiswal@windriver.com>
2025-04-23 17:37:55 +00:00

512 lines
15 KiB
Python

from typing import List, Optional
import json5
from config.host.objects.host_configuration import HostConfiguration
from config.lab.objects.credentials import Credentials
from config.lab.objects.node import Node
from framework.resources.resource_finder import get_stx_resource_path
class LabConfig:
"""
Class to hold lab config
"""
def __init__(self, config):
try:
json_data = open(config)
except FileNotFoundError:
print(f"Could not find the lab config file: {config}")
raise
lab_dict = json5.load(json_data)
self.floating_ip = lab_dict["floating_ip"]
self.lab_name = lab_dict["lab_name"]
self.lab_type = lab_dict["lab_type"]
self.admin_credentials = Credentials(lab_dict["admin_credentials"])
self.bm_password = lab_dict["bm_password"]
self.use_jump_server = lab_dict["use_jump_server"]
if "jump_server_config" in lab_dict:
jump_host_config_location = get_stx_resource_path(lab_dict["jump_server_config"])
self.jump_server_config = HostConfiguration(jump_host_config_location)
self.ssh_port: int = 22
if "ssh_port" in lab_dict:
self.ssh_port = int(lab_dict["ssh_port"])
self.lab_capabilities = []
if "lab_capabilities" in lab_dict:
self.lab_capabilities = lab_dict["lab_capabilities"]
if "horizon_url" in lab_dict:
self.horizon_url = lab_dict["horizon_url"]
else:
self.horizon_url = f"https://[{self.floating_ip}]:8443/"
if "horizon_credentials" in lab_dict:
self.horizon_credentials = Credentials(lab_dict["horizon_credentials"])
else:
self.horizon_credentials = Credentials({"user_name": "admin", "password": self.admin_credentials.get_password()})
if "rest_credentials" in lab_dict:
self.rest_credentials = Credentials(lab_dict["rest_credentials"])
else:
self.rest_credentials = Credentials({"user_name": "admin", "password": self.admin_credentials.get_password()})
self.nodes = []
self.subclouds: List["LabConfig"] = []
# if subclouds are listed in the config get the list with the subcloud's names.
if "subclouds" in lab_dict:
for subcloud in lab_dict["subclouds"]:
subcloud_config_location = get_stx_resource_path((lab_dict["subclouds"][subcloud]))
self.subclouds.append(LabConfig(subcloud_config_location))
if "nodes" in lab_dict:
for node in lab_dict["nodes"]:
self.nodes.append(Node(node, lab_dict["nodes"][node]))
self.ipv6 = True
if ":" not in self.floating_ip:
self.ipv6 = False
self.is_dc_lab = len(self.subclouds) > 0
# If there is a value in the config, override the above value
if "is_dc" in lab_dict:
self.is_dc_lab = lab_dict["is_dc"]
self.system_controller_ip = None # only gets set if DC system and only in the subcloud
if "system_controller_ip" in lab_dict:
self.system_controller_ip = lab_dict["system_controller_ip"]
self.system_controller_name = None # only gets set if DC system and only in the subcloud
if "system_controller_name" in lab_dict:
self.system_controller_name = lab_dict["system_controller_name"]
self.lab_config_file = config
def get_floating_ip(self) -> str:
"""
Getter for floating ip
Returns:
str: The floating IP address
"""
return self.floating_ip
def set_floating_ip(self, floating_ip: str) -> None:
"""
Setter for floating ip
Args:
floating_ip (str): the host's floating ip
"""
self.floating_ip = floating_ip
# Update ipv4 vs ipv6 value.
self.ipv6 = True
if ":" not in self.floating_ip:
self.ipv6 = False
def get_lab_name(self) -> str:
"""
Getter for lab name
Returns:
str: The lab name
"""
return self.lab_name
def set_lab_name(self, lab_name: str) -> None:
"""
Setter for lab name
Args:
lab_name (str): the lab name
"""
self.lab_name = lab_name
def get_lab_type(self) -> str:
"""
Getter for lab type
Returns:
str: The lab type
"""
return self.lab_type
def set_lab_type(self, lab_type: str) -> None:
"""
Setter for lab type
Args:
lab_type (str): The lab type
"""
self.lab_type = lab_type
def get_admin_credentials(self) -> Credentials:
"""
Getter for admin credentials
Returns:
Credentials: The admin credentials object
"""
return self.admin_credentials
def get_horizon_credentials(self) -> Credentials:
"""
Getter for Horizon credentials
Returns:
Credentials: The Horizon credentials object
"""
return self.horizon_credentials
def get_rest_credentials(self) -> Credentials:
"""
Getter for Rest credentials
Returns:
Credentials: The REST API credentials object
"""
return self.rest_credentials
def set_nodes(self, nodes: List[Node]) -> None:
"""
Sets the labs nodes (clear any nodes already created).
Args:
nodes (List[Node]): The nodes to set
"""
self.nodes = nodes
def get_nodes(self) -> List[Node]:
"""
Get all lab nodes.
Returns:
List[Node]: List of all nodes in the lab
"""
return self.nodes
def get_node(self, lab_node_name: str) -> Node:
"""
Get a lab node by name.
Args:
lab_node_name (str): The name of the lab node ex. Controller-0
Returns:
Node: The lab node with the given name
"""
for node in self.nodes:
if node.get_name() == lab_node_name:
return node
return None
def get_computes(self) -> List[Node]:
"""
Getter for compute nodes.
Returns:
List[Node]: List of compute nodes
"""
nodes = self.get_nodes()
computes = [node for node in nodes if node.node_type == "worker"]
return computes
def get_compute(self, compute_name: str) -> Optional[Node]:
"""
Retrieve an instance of Node whose type is 'Compute' and name is specified by the argument 'compute_name'.
Args:
compute_name (str): the name of the 'Compute' node.
Returns:
Optional[Node]: The compute node if found, None otherwise
"""
computes = self.get_computes()
compute = [compute_node for compute_node in computes if compute_node.get_name() == compute_name]
if len(compute) > 0:
return compute[0]
else:
return None
def is_ipv6(self) -> bool:
"""
Return True is lab is ipv6, False otherwise
Returns:
bool: True if IPv6 is used, False otherwise
"""
return self.ipv6
def get_subclouds(self) -> List["LabConfig"]:
"""
Getter for subclouds
Returns:
List[LabConfig]: List of subcloud configurations
"""
return self.subclouds
def set_subclouds(self, subclouds: List["LabConfig"]) -> None:
"""
Setter for subcloud
Args:
subclouds (List[LabConfig]): List of subcloud configurations
"""
self.subclouds = subclouds
def get_subcloud(self, subcloud_name: str) -> Optional["LabConfig"]:
"""
Get a subcloud configuration by name.
Args:
subcloud_name (str): Name of the subcloud to retrieve
Returns:
Optional[LabConfig]: The subcloud configuration if found, None otherwise
"""
for subcloud in self.subclouds:
if subcloud.get_lab_name() == subcloud_name:
return subcloud
return None
def get_subcloud_names(self) -> List[str]:
"""
Getter for the names of the subclouds in the list of subclouds in LabConfig file.
Args: None
Returns:
List[str]: List of subcloud names
"""
return [subcloud.get_lab_name() for subcloud in self.subclouds]
def get_horizon_url(self):
"""
Getter for Horizon URL
Returns: The URL to connect to Horizon
"""
return self.horizon_url
def set_horizon_url(self, url: str) -> None:
"""
Setter for horizon url
Args:
url (str): The Horizon URL to set
"""
self.horizon_url = url
def is_dc(self) -> bool:
"""
Getter for is dc
Returns:
bool: true if it's a dc, false otherwise
"""
return self.is_dc_lab
def is_use_jump_server(self) -> bool:
"""
Getter for use jump server
Returns:
bool: True if jump server should be used, False otherwise
"""
return self.use_jump_server
def get_jump_host_configuration(self) -> HostConfiguration:
"""
Getter for jump host configuration
Returns:
HostConfiguration: The jump host configuration
"""
return self.jump_server_config
def get_ssh_port(self) -> int:
"""
Getter for the SSH port.
Returns:
int: The SSH port number
"""
return self.ssh_port
def get_lab_capabilities(self) -> List[str]:
"""
Getter for lab capabilities
Returns:
List[str]: List of lab capabilities
"""
return self.lab_capabilities
def add_lab_capability(self, capability: str) -> None:
"""
Setter for lab capability
Args:
capability (str): The capability to add
"""
if capability not in self.lab_capabilities:
self.lab_capabilities.append(capability)
def get_lab_config_file(self) -> str:
"""
Getter for lab config file
Returns:
str: Path to the lab configuration file
"""
return self.lab_config_file
def set_lab_config_file(self, lab_config_file: str) -> None:
"""
Setter for lab config file
Args:
lab_config_file (str): Path to the lab configuration file
"""
self.lab_config_file = lab_config_file
def get_bm_password(self) -> str:
"""
Getter for bm password
Returns:
str: The bm password
"""
return self.bm_password
def get_system_controller_ip(self) -> str:
"""
Get the system controller IP address.
Returns:
str: The system controller IP address
"""
return self.system_controller_ip
def set_system_controller_ip(self, system_controller_ip: str) -> None:
"""
Set system controller IP address.
Args:
system_controller_ip (str): The system controller IP address
"""
self.system_controller_ip = system_controller_ip
def get_system_controller_name(self) -> str:
"""
Get the system controller name.
Returns:
str: The system controller name
"""
return self.system_controller_name
def set_system_controller_name(self, system_controller_name: str) -> None:
"""
Setter for system_controller_name
Args:
system_controller_name (str): the system_controller_name
"""
self.system_controller_name = system_controller_name
def to_log_strings(self) -> List[str]:
"""
Convert lab configuration to a list of loggable strings.
Returns:
List[str]: List of strings representing the lab configuration
"""
log_strings = []
log_strings.append(f"lab_name: {self.get_lab_name()}")
log_strings.append(f"lab_type: {self.get_lab_type()}")
log_strings.append(f"floating_ip: {self.get_floating_ip()}")
log_strings.append(f"is_ipv6: {self.is_ipv6()}")
log_strings.append(f"ssh_port: {self.get_ssh_port()}")
log_strings.append(f"admin_credentials: {self.get_admin_credentials().to_string()}")
log_strings.append(f"horizon_url: {self.get_horizon_url()}")
log_strings.append(f"horizon_credentials: {self.get_horizon_credentials().to_string()}")
log_strings.append(f"is_dc: {self.is_dc()}")
log_strings.append(f"use_jump_server: {self.is_use_jump_server()}")
if self.use_jump_server:
for log_string in self.get_jump_host_configuration().to_log_strings():
log_strings.append(f" {log_string}")
for node in self.get_nodes():
for log_string in node.to_log_strings():
log_strings.append(log_string)
for subcloud in self.get_subclouds():
log_strings.append("Subcloud")
for log_string in subcloud.to_log_strings():
log_strings.append(f" {log_string}")
return log_strings
def get_controllers(self) -> List[Node]:
"""
Getter for controller nodes.
Returns:
List[Node]: List of controller nodes
"""
nodes = self.get_nodes()
controllers = [node for node in nodes if node.node_type == "controller"]
return controllers
def get_subclouds_by_type(self, subcloud_type: str = None) -> List["LabConfig"]:
"""
Get a list of subcloud configurations by type.
Args:
subcloud_type (str): Type of the subcloud to retrieve
Returns:
List[LabConfig]: List of subcloud configurations of the specified type
"""
if subcloud_type is None:
return self.subclouds
return [subcloud for subcloud in self.subclouds if subcloud.get_lab_type().lower() == subcloud_type.lower()]
def get_subclouds_name_by_type(self, subcloud_type: str = None) -> List[str]:
"""
Get a list of subcloud names by type.
Args:
subcloud_type (str): Type of the subcloud to retrieve
Returns:
List[str]: List of subcloud names of the specified type
"""
if subcloud_type is None:
return [subcloud.get_lab_name() for subcloud in self.subclouds]
return [subcloud.get_lab_name() for subcloud in self.get_subclouds_by_type(subcloud_type)]
def get_first_controller(self) -> Optional[Node]:
"""Get the first controller node.
Returns:
Optional[Node]: The first controller node if found, None otherwise
"""
controllers = self.get_controllers()
if len(controllers) > 0:
return controllers[0]
return None