Removed hardcoded hostnames in PTP verify config and optimized the code.

Change-Id: Ib80fac80d8e81d7c44ef9ba1d1f9fee10a47ed38
Signed-off-by: Guntaka Umashankar Reddy <umashankarguntaka.reddy@windriver.com>
This commit is contained in:
Guntaka Umashankar Reddy
2025-05-06 11:14:26 -04:00
parent f087fd4f2b
commit c7beef2658
6 changed files with 190 additions and 201 deletions

View File

@@ -6,6 +6,7 @@ from framework.validation.validation import validate_equals, validate_equals_wit
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.fault_management.alarms.alarm_list_keywords import AlarmListKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.host.system_host_list_keywords import SystemHostListKeywords
from keywords.linux.systemctl.systemctl_status_keywords import SystemCTLStatusKeywords
from keywords.ptp.cat.cat_clock_conf_keywords import CatClockConfKeywords
from keywords.ptp.cat.cat_ptp_cgu_keywords import CatPtpCguKeywords
@@ -34,6 +35,7 @@ class PTPVerifyConfigKeywords(BaseKeyword):
self.ssh_connection = ssh_connection
ptp_setup_keywords = PTPSetupKeywords()
ptp_setup = ptp_setup_keywords.generate_ptp_setup_from_template(ptp_setup_template_path)
self.ptp_setup = ptp_setup
self.ptp4l_setup_list = ptp_setup.get_ptp4l_setup_list()
self.phc2sys_setup_list = ptp_setup.get_phc2sys_setup_list()
@@ -46,19 +48,17 @@ class PTPVerifyConfigKeywords(BaseKeyword):
self.expected_grandmaster_settings_tbc_object = ptp_setup.get_grandmaster_settings_tbc()
self.expected_grandmaster_settings_tgm_object = ptp_setup.get_grandmaster_settings_tgm()
self.ctrl0_hostname = "controller-0"
self.ctrl1_hostname = "controller-1"
self.comp0_hostname = "compute-0"
def verify_all_ptp_configurations(self) -> None:
"""
verify all ptp configurations
Returns: None
"""
self.verify_gnss_status()
hosts = SystemHostListKeywords(self.ssh_connection).get_system_host_list().get_controllers_and_computes()
self.verify_sma_status()
self.verify_gnss_status(hosts)
self.verify_sma_status(hosts)
self.verify_systemctl_status()
@@ -68,54 +68,48 @@ class PTPVerifyConfigKeywords(BaseKeyword):
validate_equals_with_retry(self.no_alarms, True, "Validate that no alarms on the system", 300)
def verify_gnss_status(self) -> None:
def verify_gnss_status(self, hosts: list) -> None:
"""
verify GNSS status
Args:
hosts (list): list of controllers and computes
Returns: None
"""
gnss_keywords = GnssKeywords()
for ts2phc_instance_obj in self.ts2phc_setup_list:
ptp_host_ifs = ts2phc_instance_obj.get_ptp_interfaces()
instance_parameters = ts2phc_instance_obj.get_instance_parameters()
expected_gnss_port = gnss_keywords.extract_gnss_port(instance_parameters)
expected_gnss_port = gnss_keywords.extract_gnss_port(ts2phc_instance_obj.get_instance_parameters())
if not expected_gnss_port: # No need to verify GNSS status if ts2phc.nmea_serialport not configured
get_logger().log_info("Validation skipped as expected; GNSS port is None")
continue
for ptp_host_if in ptp_host_ifs:
for hostname, get_interfaces in [
(self.ctrl0_hostname, ptp_host_if.get_controller_0_interfaces),
(self.ctrl1_hostname, ptp_host_if.get_controller_1_interfaces),
(self.comp0_hostname, ptp_host_if.get_compute_0_interfaces),
]:
for interface in get_interfaces():
if interface:
self.validate_gnss_status_on_hostname(hostname, interface, expected_gnss_port)
for ptp_host_if in ts2phc_instance_obj.get_ptp_interfaces():
for host in hosts:
interfaces = ptp_host_if.get_interfaces_for_hostname(host.get_host_name())
for interface in filter(None, interfaces): # Skip None or empty values
self.validate_gnss_status_on_hostname(host.get_host_name(), interface, expected_gnss_port)
def verify_sma_status(self) -> None:
def verify_sma_status(self, hosts: list) -> None:
"""
verify SMA status
Args:
hostnames (list): list of controllers and computes
Returns: None
"""
for clock_instance_obj in self.clock_setup_list:
ptp_host_ifs = clock_instance_obj.get_ptp_interfaces()
for ptp_host_if in clock_instance_obj.get_ptp_interfaces():
if "input" not in ptp_host_if.get_ptp_interface_parameter():
continue
for ptp_host_if in ptp_host_ifs:
ptp_interface_parameters = ptp_host_if.get_ptp_interface_parameter()
if "input" in ptp_interface_parameters:
for hostname, get_interfaces in [
(self.ctrl0_hostname, ptp_host_if.get_controller_0_interfaces),
(self.ctrl1_hostname, ptp_host_if.get_controller_1_interfaces),
(self.comp0_hostname, ptp_host_if.get_compute_0_interfaces),
]:
for interface in get_interfaces():
if interface:
self.validate_sma_status_on_hostname(hostname, interface)
for host in hosts:
interfaces = ptp_host_if.get_interfaces_for_hostname(host.get_host_name())
for interface in filter(None, interfaces):
self.validate_sma_status_on_hostname(host.get_host_name(), interface)
def verify_systemctl_status(self) -> None:
"""
@@ -169,23 +163,34 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Returns: None
"""
port_data_set = self.get_port_data_set_using_interface_and_port_identity_mapping()
validate_parent = bool(self.expected_parent_data_set_object)
validate_time_props = bool(self.expected_time_properties_data_set_object)
if not validate_parent:
get_logger().log_info("Validation skipped as expected; expected_parent_data_set_object is None")
if not validate_time_props:
get_logger().log_info("Validation skipped as expected; expected_time_properties_data_set_object is None")
for ptp4l_instance_obj in self.ptp4l_setup_list:
name = ptp4l_instance_obj.get_name()
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
hostnames = ptp4l_instance_obj.get_instance_hostnames()
instance_parameters = ptp4l_instance_obj.get_instance_parameters()
ptp_role = next((obj.get_ptp_role() for obj in self.ptp4l_expected_list_objects if obj.get_name() == name), None)
for hostname in hostnames:
ptp_role = self.ptp_setup.get_ptp4l_expected_by_name(name).get_ptp_role()
for hostname in ptp4l_instance_obj.get_instance_hostnames():
self.validate_port_data_set(hostname, name, config_file, socket_file)
self.validate_get_domain(hostname, instance_parameters, config_file, socket_file)
self.validate_parent_data_set(hostname, name, port_data_set, config_file, socket_file)
if validate_parent:
self.validate_parent_data_set(hostname, name, port_data_set, config_file, socket_file)
self.validate_time_properties_data_set(hostname, config_file, socket_file)
if validate_time_props:
self.validate_time_properties_data_set(hostname, config_file, socket_file)
self.validate_grandmaster_settings_np(hostname, ptp_role, config_file, socket_file)
@@ -325,6 +330,11 @@ class PTPVerifyConfigKeywords(BaseKeyword):
instance_parameters = ptp_instance_obj.get_instance_parameters()
parameters = self.parse_instance_parameters_string(instance_parameters)
expected_ts2phc_nmea_serialport = parameters.get("ts2phc.nmea_serialport")
if expected_ts2phc_nmea_serialport:
observed_expected_ts2phc_nmea_serialport = get_pmc_get_default_data_set_object.get_ts2phc_nmea_serialport()
validate_equals(observed_expected_ts2phc_nmea_serialport, expected_ts2phc_nmea_serialport, "ts2phc.nmea_serialport value within PTP config file content")
expected_boundary_clock_jbod = parameters.get("boundary_clock_jbod")
if expected_boundary_clock_jbod:
observed_boundary_clock_jbod = get_pmc_get_default_data_set_object.get_boundary_clock_jbod()
@@ -355,13 +365,7 @@ class PTPVerifyConfigKeywords(BaseKeyword):
observed_tx_timestamp_timeout = get_pmc_get_default_data_set_object.get_tx_timestamp_timeout()
validate_equals(observed_tx_timestamp_timeout, expected_tx_timestamp_timeout, "tx_timestamp_timeout value within PTP config file content")
interfaces_getter = {
"controller-0": lambda x: x.get_controller_0_interfaces(),
"controller-1": lambda x: x.get_controller_1_interfaces(),
"compute-0": lambda x: x.get_compute_0_interfaces(),
}.get(hostname, lambda x: [])
expected_associated_interfaces = [interface for ptp_host_if in ptp_instance_obj.get_ptp_interfaces() for interface in interfaces_getter(ptp_host_if) if interface] # Avoid empty interface names
expected_associated_interfaces = [interface for ptp_host_if in ptp_instance_obj.get_ptp_interfaces() for interface in ptp_host_if.get_interfaces_for_hostname(hostname) if interface]
observed_associated_interfaces = cat_ptp_config_output.get_associated_interfaces()
validate_equals(observed_associated_interfaces, expected_associated_interfaces, "Associated interfaces within PTP config file content")
@@ -376,53 +380,36 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Validates the ptp config file content for clock.
Args:
ptp_instance_obj (Any) : PTP instance setup object
ptp_instance_obj (Any): PTP instance setup object
hostname (str): The name of the host.
config_file (str): the config file.
Returns: None
config_file (str): The config file.
Raises:
Exception: raised when validate fails
Exception: raised when validation fails
"""
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
ssh_connection = LabConnectionKeywords().get_ssh_for_hostname(hostname)
cat_ptp_config_output = CatClockConfKeywords(ssh_connection).cat_clock_conf(config_file)
observed_clock_conf_objects = cat_ptp_config_output.get_clock_conf_objects()
cat_ptp_config_keywords = CatClockConfKeywords(ssh_connection)
cat_ptp_config_output = cat_ptp_config_keywords.cat_clock_conf(config_file)
get_clock_conf_objects = cat_ptp_config_output.get_clock_conf_objects()
expected_config_by_ifname = {", ".join(sorted(interfaces)): ptp_if.get_ptp_interface_parameter() for ptp_if in ptp_instance_obj.get_ptp_interfaces() if (interfaces := ptp_if.get_interfaces_for_hostname(hostname))}
expected_clock_config = []
interfaces_getter = {
"controller-0": lambda x: x.get_controller_0_interfaces(),
"controller-1": lambda x: x.get_controller_1_interfaces(),
"compute-0": lambda x: x.get_compute_0_interfaces(),
}.get(hostname, lambda x: [])
# Validate each observed config against expected
for observed_clock_conf_obj in observed_clock_conf_objects:
observed_ifname = ", ".join(sorted(observed_clock_conf_obj.get_ifname().split(", ")))
observed_sma_name = observed_clock_conf_obj.get_sma_name()
observed_sma_mode = observed_clock_conf_obj.get_sma_mode()
for ptp_host_if in ptp_instance_obj.get_ptp_interfaces():
interfaces = interfaces_getter(ptp_host_if)
if interfaces:
expected_clock_config.append(
{
"ifname": ", ".join(interfaces),
"ptp_interface_parameter": ptp_host_if.get_ptp_interface_parameter(),
}
)
expected_params = expected_config_by_ifname.get(observed_ifname)
if expected_params is None:
raise Exception(f"No expected PTP config found for observed ifname: {observed_ifname}")
for index, clock_conf_obj in enumerate(get_clock_conf_objects):
observed_ifname = clock_conf_obj.get_ifname()
observed_sma_name = clock_conf_obj.get_sma_name()
observed_sma_mode = clock_conf_obj.get_sma_mode()
if "=" not in expected_params:
raise Exception(f"Expected parameter '{expected_params}' is not in 'name=mode' format")
if index >= len(expected_clock_config):
raise Exception("Observed clock index is greater than expected clock list index")
expected_sma_name, expected_sma_mode = expected_params.split("=", 1)
expected_ifname = expected_clock_config[index].get("ifname")
expected_ptp_interface_parameter = expected_clock_config[index].get("ptp_interface_parameter")
validate_equals(observed_ifname, expected_ifname, "ifname value within PTP config file content for clock-conf.conf")
validate_list_contains(observed_sma_name, expected_ptp_interface_parameter, "sma name value within PTP config file content for clock-conf.conf")
validate_list_contains(observed_sma_mode, expected_ptp_interface_parameter, "sma mode value within PTP config file content for clock-conf.conf")
validate_list_contains(observed_sma_name, expected_sma_name, "sma name value within PTP config file content for clock-conf.conf")
validate_list_contains(observed_sma_mode, expected_sma_mode, "sma mode value within PTP config file content for clock-conf.conf")
def validate_port_data_set(
self,
@@ -449,28 +436,15 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
for ptp4l_expected_object in self.ptp4l_expected_list_objects:
if ptp4l_expected_object.get_name() == name:
port_data_set_getter = {
"controller-0": ptp4l_expected_object.get_controller_0_port_data_set,
"controller-1": ptp4l_expected_object.get_controller_1_port_data_set,
"compute-0": ptp4l_expected_object.get_compute_0_port_data_set,
}.get(hostname)
break
ptp4l_expected_obj = self.ptp_setup.get_ptp4l_expected_by_name(name)
expected_port_data_set_objects = ptp4l_expected_obj.get_port_data_set_for_hostname(hostname)
observed_port_data_set_objects = pmc_keywords.pmc_get_port_data_set(config_file, socket_file).get_pmc_get_port_data_set_objects()
expected_port_data_set_objects = port_data_set_getter() if port_data_set_getter else None
if len(observed_port_data_set_objects) > len(expected_port_data_set_objects):
raise Exception("Observed port data set objects contains more entries than expected port data set objects")
get_port_data_set_output = pmc_keywords.pmc_get_port_data_set(config_file, socket_file)
get_pmc_get_port_data_set_objects = get_port_data_set_output.get_pmc_get_port_data_set_objects()
for index, get_pmc_get_port_data_set_object in enumerate(get_pmc_get_port_data_set_objects):
if index >= len(expected_port_data_set_objects):
raise Exception("Observed port data set index is greater than expected port data set objects index")
expected_port_state = expected_port_data_set_objects[index].get_port_state()
observed_port_state = get_pmc_get_port_data_set_object.get_port_state()
validate_equals(observed_port_state, expected_port_state, "portState value within GET PORT_DATA_SET")
for expected_port_data_set_obj, observed_port_data_set_obj in zip(expected_port_data_set_objects, observed_port_data_set_objects):
validate_equals(observed_port_data_set_obj.get_port_state(), expected_port_data_set_obj.get_port_state(), "portState value within GET PORT_DATA_SET")
def validate_get_domain(
self,
@@ -533,50 +507,37 @@ class PTPVerifyConfigKeywords(BaseKeyword):
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
expected_gm_clock_class = self.expected_parent_data_set_object.get_gm_clock_class()
expected_gm_clock_accuracy = self.expected_parent_data_set_object.get_gm_clock_accuracy()
expected_gm_offset_scaled_log_variance = self.expected_parent_data_set_object.get_gm_offset_scaled_log_variance()
parent_data_set_obj = pmc_keywords.pmc_get_parent_data_set(config_file, socket_file).get_pmc_get_parent_data_set_object()
get_parent_data_set_output = pmc_keywords.pmc_get_parent_data_set(config_file, socket_file)
get_parent_data_set_object = get_parent_data_set_output.get_pmc_get_parent_data_set_object()
observed_parent_port_identity = get_parent_data_set_object.get_parent_port_identity()
observed_gm_clock_class = get_parent_data_set_object.get_gm_clock_class()
observed_gm_clock_accuracy = get_parent_data_set_object.get_gm_clock_accuracy()
observed_gm_offset_scaled_log_variance = get_parent_data_set_object.get_gm_offset_scaled_log_variance()
validate_equals(observed_gm_clock_class, expected_gm_clock_class, "gm.ClockClass value within GET PARENT_DATA_SET")
validate_equals(observed_gm_clock_accuracy, expected_gm_clock_accuracy, "gm.ClockAccuracy value within GET PARENT_DATA_SET")
validate_equals(observed_gm_offset_scaled_log_variance, expected_gm_offset_scaled_log_variance, "gm.OffsetScaledLogVariance value within GET PARENT_DATA_SET")
validate_equals(parent_data_set_obj.get_gm_clock_class(), self.expected_parent_data_set_object.get_gm_clock_class(), "gm.ClockClass value within GET PARENT_DATA_SET")
validate_equals(parent_data_set_obj.get_gm_clock_accuracy(), self.expected_parent_data_set_object.get_gm_clock_accuracy(), "gm.ClockAccuracy value within GET PARENT_DATA_SET")
validate_equals(parent_data_set_obj.get_gm_offset_scaled_log_variance(), self.expected_parent_data_set_object.get_gm_offset_scaled_log_variance(), "gm.OffsetScaledLogVariance value within GET PARENT_DATA_SET")
# Validates the parentPortIdentity of the SLAVE's PARENT_DATA_SET against the portIdentity of the MASTER's PORT_DATA_SET.
if not port_data_set:
return
for ptp4l_expected_object in self.ptp4l_expected_list_objects:
if ptp4l_expected_object.get_name() == name:
port_data_set_getter = {
"controller-0": ptp4l_expected_object.get_controller_0_port_data_set,
"controller-1": ptp4l_expected_object.get_controller_1_port_data_set,
"compute-0": ptp4l_expected_object.get_compute_0_port_data_set,
}.get(hostname)
break
ptp4l_expected_obj = self.ptp_setup.get_ptp4l_expected_by_name(name)
expected_port_data_set_objects = ptp4l_expected_obj.get_port_data_set_for_hostname(hostname)
observed_parent_port_identity = parent_data_set_obj.get_parent_port_identity()
expected_port_data_set_objects = port_data_set_getter() if port_data_set_getter else None
for expected_port_data_set_obj in expected_port_data_set_objects:
for expected_port_data_set_object in expected_port_data_set_objects:
expected_parent_port_identity_dict = expected_port_data_set_object.get_parent_port_identity()
if expected_parent_port_identity_dict:
parent_instance_name = expected_parent_port_identity_dict.get("name")
parent_hostname = expected_parent_port_identity_dict.get("hostname")
parent_interface = expected_parent_port_identity_dict.get("interface")
parent_port_identity_info = expected_port_data_set_obj.get_parent_port_identity()
if not parent_port_identity_info:
continue
if not all([parent_instance_name, parent_hostname, parent_interface]):
continue # Skip if any essential key is missing
parent_instance_name = parent_port_identity_info.get("name")
parent_hostname = parent_port_identity_info.get("hostname")
parent_interface = parent_port_identity_info.get("interface")
for observed_port_data_set in port_data_set:
if not all([parent_instance_name, parent_hostname, parent_interface]):
continue # Skip incomplete entries
for observed_port_data_set in port_data_set:
if observed_port_data_set.get("name") == parent_instance_name and observed_port_data_set.get("hostname") == parent_hostname and parent_interface in observed_port_data_set:
expected_port_identity = observed_port_data_set.get(parent_interface)
if observed_port_data_set.get("name") == parent_instance_name and observed_port_data_set.get("hostname") == parent_hostname and expected_port_identity:
validate_equals(observed_parent_port_identity, expected_port_identity, "Parent port identity matches the master port identity")
validate_equals(observed_parent_port_identity, expected_port_identity, "Parent port identity matches the master port identity")
def validate_time_properties_data_set(
self,
@@ -639,10 +600,13 @@ class PTPVerifyConfigKeywords(BaseKeyword):
Raises:
Exception: raised when validate fails
"""
if ptp_role == "tgm":
expected_grandmaster_settings_object = self.expected_grandmaster_settings_tgm_object
else:
expected_grandmaster_settings_object = self.expected_grandmaster_settings_tbc_object
expected_grandmaster_settings_object_map = {"tgm": self.expected_grandmaster_settings_tgm_object, "tbc": self.expected_grandmaster_settings_tbc_object}
expected_grandmaster_settings_object = expected_grandmaster_settings_object_map.get(ptp_role)
if not expected_grandmaster_settings_object:
get_logger().log_info(f"Validation skipped as expected; expected_grandmaster_settings_{ptp_role}_object is None")
return
expected_clock_class = expected_grandmaster_settings_object.get_clock_class()
expected_clock_accuracy = expected_grandmaster_settings_object.get_clock_accuracy()
@@ -679,7 +643,7 @@ class PTPVerifyConfigKeywords(BaseKeyword):
PARENT_DATA_SET against the portIdentity of the MASTER's PORT_DATA_SET.
Returns:
Dict: port data set using interface and port indentity mapping
Dict: port data set using interface and port identity mapping
"""
port_data_set_list = []
lab_connect_keywords = LabConnectionKeywords()
@@ -688,36 +652,27 @@ class PTPVerifyConfigKeywords(BaseKeyword):
name = ptp4l_instance_obj.get_name()
config_file = f"/etc/linuxptp/ptpinstance/ptp4l-{name}.conf"
socket_file = f"/var/run/ptp4l-{name}"
hostnames = ptp4l_instance_obj.get_instance_hostnames()
for hostname in hostnames:
ptp4l_expected_obj = self.ptp_setup.get_ptp4l_expected_by_name(name)
for hostname in hostnames:
ssh_connection = lab_connect_keywords.get_ssh_for_hostname(hostname)
pmc_keywords = PMCKeywords(ssh_connection)
port_data_set_dict = {}
port_data_set_dict["name"] = name
port_data_set_dict["hostname"] = hostname
expected_port_data_set_objects = ptp4l_expected_obj.get_port_data_set_for_hostname(hostname)
get_port_data_set_output = pmc_keywords.pmc_get_port_data_set(config_file, socket_file)
get_pmc_get_port_data_set_objects = get_port_data_set_output.get_pmc_get_port_data_set_objects()
observed_port_data_set_objects = get_port_data_set_output.get_pmc_get_port_data_set_objects()
for ptp4l_expected_object in self.ptp4l_expected_list_objects:
if ptp4l_expected_object.get_name() == name:
port_data_set_getter = {
"controller-0": ptp4l_expected_object.get_controller_0_port_data_set,
"controller-1": ptp4l_expected_object.get_controller_1_port_data_set,
"compute-0": ptp4l_expected_object.get_compute_0_port_data_set,
}.get(hostname)
break
if len(observed_port_data_set_objects) > len(expected_port_data_set_objects):
raise Exception("Observed port data set index exceeds expected port data set")
expected_port_data_set_objects = port_data_set_getter() if port_data_set_getter else None
port_data_set_dict = {"name": name, "hostname": hostname}
for index, get_pmc_get_port_data_set_object in enumerate(get_pmc_get_port_data_set_objects):
if index >= len(expected_port_data_set_objects):
raise Exception("Observed port data set index is greater than expected port data set objects index")
port_data_set_dict[expected_port_data_set_objects[index].get_interface()] = get_pmc_get_port_data_set_object.get_port_identity()
for observed_port_data_set_obj, expected_port_data_set_obj in zip(observed_port_data_set_objects, expected_port_data_set_objects):
interface = expected_port_data_set_obj.get_interface()
port_data_set_dict[interface] = observed_port_data_set_obj.get_port_identity()
port_data_set_list.append(port_data_set_dict)

View File

@@ -34,6 +34,9 @@ class DefaultDataSetOutput:
output_values = cat_ptp_table_parser.get_output_values_dict()
self.pmc_get_default_data_set_object = PMCGetDefaultDataSetObject()
if "ts2phc.nmea_serialport" in output_values:
self.pmc_get_default_data_set_object.set_ts2phc_nmea_serialport(output_values["ts2phc.nmea_serialport"])
if "twoStepFlag" in output_values:
self.pmc_get_default_data_set_object.set_two_step_flag(int(output_values["twoStepFlag"]))

View File

@@ -4,6 +4,7 @@ class PMCGetDefaultDataSetObject:
"""
def __init__(self):
self.ts2phc_nmea_serialport = ""
self.two_step_flag: int = -1
self.slave_only: int = -1
self.number_ports: int = -1
@@ -32,6 +33,26 @@ class PMCGetDefaultDataSetObject:
self.tx_timestamp_timeout: int = -1
self.uds_address: str = ""
def get_ts2phc_nmea_serialport(self) -> str:
"""
Getter for ts2phc_nmea_serialport
Returns:
str: ts2phc_nmea_serialport
"""
return self.ts2phc_nmea_serialport
def set_ts2phc_nmea_serialport(self, ts2phc_nmea_serialport: str):
"""
Setter for ts2phc_nmea_serialport
Args:
ts2phc_nmea_serialport (str): the ts2phc_nmea_serialport value
"""
self.ts2phc_nmea_serialport = ts2phc_nmea_serialport
def get_two_step_flag(self) -> int:
"""
Getter for two_step_flag
@@ -237,7 +258,7 @@ class PMCGetDefaultDataSetObject:
Getter for domain_number
Returns:
str: the domain_number value
int: the domain_number value
"""
return self.domain_number
@@ -247,7 +268,7 @@ class PMCGetDefaultDataSetObject:
Setter for domain_number
Args:
domain_number (str): the domain_number value
domain_number (int): the domain_number value
"""
self.domain_number = domain_number

View File

@@ -64,41 +64,23 @@ class PTP4LExpectedDict:
"""
return self.ptp_role
def get_controller_0_port_data_set(self) -> List[PortDataSet]:
def get_port_data_set_for_hostname(self, hostname) -> List[PortDataSet]:
"""
Gets the list of controller-0 port data set.
Gets the list of port data set for hostname.
Args:
hostname (str): The name of the host.
Returns:
List[PortDataSet]: The list of controller-0 port data set.
List[PortDataSet]: The list of port data set for hostname.
"""
port_data_set_list = []
for port_data_set in self.controller_0_port_data_set:
port_data_set_object = PortDataSet(port_data_set)
port_data_set_list.append(port_data_set_object)
return port_data_set_list
hostname_to_port_data_set = {
"controller-0": self.controller_0_port_data_set,
"controller-1": self.controller_1_port_data_set,
"compute-0": self.compute_0_port_data_set,
}.get(hostname)
def get_controller_1_port_data_set(self) -> List[PortDataSet]:
"""
Gets the list of controller-1 port data set.
if not hostname_to_port_data_set:
raise Exception(f"Expected port data set not found for hostname: {hostname}")
Returns:
List[PortDataSet]: The list of controller-1 port data set.
"""
port_data_set_list = []
for port_data_set in self.controller_1_port_data_set:
port_data_set_object = PortDataSet(port_data_set)
port_data_set_list.append(port_data_set_object)
return port_data_set_list
def get_compute_0_port_data_set(self) -> List[PortDataSet]:
"""
Gets the list of compute-0 port data set.
Returns:
List[PortDataSet]: The list of compute-0 port data set.
"""
port_data_set_list = []
for port_data_set in self.compute_0_port_data_set:
port_data_set_object = PortDataSet(port_data_set)
port_data_set_list.append(port_data_set_object)
return port_data_set_list
return [PortDataSet(port_data_set) for port_data_set in hostname_to_port_data_set]

View File

@@ -33,7 +33,7 @@ class PTPHostInterfaceSetup:
self.compute_0_interfaces = None
if "compute_0_interfaces" in setup_dict:
self.compute_0_interfaces = setup_dict.get("compute_0_interfaces")
def __str__(self):
"""
String representation of this object.
@@ -88,3 +88,20 @@ class PTPHostInterfaceSetup:
List[str]: The compute_0_interfaces of this ptp host interface setup.
"""
return self.compute_0_interfaces
def get_interfaces_for_hostname(self, hostname: str) -> List[str]:
"""
Gets the interfaces for the given hostname in this PTP host interface setup.
Args:
hostname (str): The name of the host.
Returns:
List[str]: The interfaces for the given hostname in this PTP host interface setup.
"""
interfaces_to_hostname_mapping = {
"controller-0": self.controller_0_interfaces,
"controller-1": self.controller_1_interfaces,
"compute-0": self.compute_0_interfaces,
}
return interfaces_to_hostname_mapping.get(hostname)

View File

@@ -72,12 +72,8 @@ class PTPSetup:
clock_setup = ClockSetup(clock_entry_dict, self.host_ptp_if_dict)
self.clock_setup_list.append(clock_setup)
expected_dict = setup_dict["expected_dict"]
if "ptp4l" in expected_dict:
ptp4l_list = expected_dict["ptp4l"]
for ptp4l_expected_dict in ptp4l_list:
ptp4l_expected = PTP4LExpectedDict(ptp4l_expected_dict)
self.ptp4l_expected_list.append(ptp4l_expected)
expected_dict = setup_dict.get("expected_dict", {})
self.ptp4l_expected_list.extend(PTP4LExpectedDict(item) for item in expected_dict.get("ptp4l", []))
if "parent_data_set" in expected_dict:
self.parent_data_set = ParentDataSet(expected_dict["parent_data_set"])
@@ -206,6 +202,21 @@ class PTPSetup:
"""
return self.ptp4l_expected_list
def get_ptp4l_expected_by_name(self, name: str) -> PTP4LExpectedDict:
"""
Getter for ptp4l expected by name.
Args:
name (str): The name of the instance.
Returns:
PTP4LExpectedDict: ptp4l expected by name
"""
ptp4l_expected_obj = next((obj for obj in self.ptp4l_expected_list if obj.get_name() == name), None)
if not ptp4l_expected_obj:
raise ValueError(f"No expected PTP4L object found for name: {name}")
return ptp4l_expected_obj
def get_parent_data_set(self) -> ParentDataSet:
"""
Getter for the parent data set.