Merge "pynetlink - Add support to configure pin via Netlink"

This commit is contained in:
Zuul
2025-11-18 19:11:18 +00:00
committed by Gerrit Code Review
2 changed files with 236 additions and 2 deletions

View File

@@ -13,6 +13,7 @@ from ..common import NetlinkException
from ..common import NetlinkFactory
from .constants import DeviceFields
from .constants import PinFields
from .constants import PinParentFields
from .devices import DpllDevice
from .devices import DpllDevices
from .pins import DpllPins
@@ -23,6 +24,7 @@ class DPLLCommands:
__slots__ = ()
DEVICE_GET = 'device-get'
PIN_GET = 'pin-get'
PIN_SET = 'pin-set'
class NetlinkDPLL:
@@ -177,3 +179,73 @@ class NetlinkDPLL:
"""
return DpllPins.loadPins(self._get_all_devices(), self._get_all_pins())
def _set_pin(self, pin_info: dict):
"""Change the pin configuration.
Changes the pin configuration according to the given pin information.
Not all pin field are mutable, use _get_pin_by_id() to print the pin
capabilities, and check which pin fields can be changed.
Raises NetlinkException class in case of error.
"""
try:
result = self._ynl.do(DPLLCommands.PIN_SET, pin_info)
return result
except NlError as e:
raise NetlinkException(e)
def set_pin_direction(self, pin_id: int, pin_direction: int):
"""Change pin direction.
Change the pin direction. All pin parents are changed at the same time.
Raises NetlinkException class in case of error.
"""
pin_info = {
PinFields.ID: pin_id,
PinFields.PARENT_DEVICE: []
}
# get pin's parent ids, and set the direction in all of them.
pins = self.get_pins_by_id(pin_id)
parent_ids = list(pin.dev_id for pin in pins)
for parent_id in parent_ids:
pin_info[PinFields.PARENT_DEVICE].append(
{
PinParentFields.PARENT_ID: parent_id,
PinParentFields.DIRECTION: pin_direction
}
)
return self._set_pin(pin_info)
def set_pin_priority(self, pin_id: int, pin_priority: int):
"""Change pin priority
Change the pin priority. All pin parents are changed at the same time.
Raises NetlinkException class in case of error.
"""
pin_info = {
PinFields.ID: pin_id,
PinFields.PARENT_DEVICE: []
}
# get pin's parent ids, and set the priority in all of them.
pins = self.get_pins_by_id(pin_id)
parent_ids = list(pin.dev_id for pin in pins)
for parent_id in parent_ids:
pin_info[PinFields.PARENT_DEVICE].append(
{
PinParentFields.PARENT_ID: parent_id,
PinParentFields.PRIORITY: pin_priority
}
)
return self._set_pin(pin_info)

View File

@@ -23,6 +23,7 @@ from ..dpll import DpllDevice
from ..dpll import DeviceFields
from ..dpll import DpllPins
from ..dpll import PinFields
from ..dpll import PinParentFields
class NetlinkDPLLTestCase(TestCase):
@@ -157,6 +158,9 @@ class NetlinkDPLLTestCase(TestCase):
f'#items: Should be {len(expected_device_ids)} Got {len(devices)}')
for device in devices:
self.assertEqual(device.dev_clock_id,
clock_id,
f"Wrong clock ID: Expected {clock_id}. Got {device.dev_clock_id}")
self.assertIn(device.dev_id,
expected_device_ids,
f'Contains wrong device: {device.dev_id}')
@@ -242,8 +246,8 @@ class NetlinkDPLLTestCase(TestCase):
@patch.object(YnlFamily, 'dump')
def test_get_pins_with_specific_id(self, mock_ynl_dump: Mock, mock_ynl_do: Mock):
pin_pict_idx = random.randint(0, 8)
pin_dict = self.dump_get_pins[pin_pict_idx]
pin_dict_idx = random.randint(0, 8)
pin_dict = self.dump_get_pins[pin_dict_idx]
pin_id = pin_dict[PinFields.ID]
mock_ynl_dump.return_value = self.dump_get_devices
@@ -283,6 +287,9 @@ class NetlinkDPLLTestCase(TestCase):
f'Should return DpllPins instance. Got {type(pins)}')
for pin in pins:
self.assertEqual(pin.dev_clock_id,
clock_id,
f"Wrong clock ID: Expected {clock_id}. Got {pin.dev_clock_id}")
self.assertIn(pin.pin_id,
expected_pin_ids,
f'Contains wrong pin: {pin.pin_id}')
@@ -311,3 +318,158 @@ class NetlinkDPLLTestCase(TestCase):
call(DPLLCommands.DEVICE_GET, {}),
call(DPLLCommands.PIN_GET, {})
])
@patch.object(YnlFamily, 'do')
def test_set_pin(self, mock_ynl_do: Mock):
expected_result = None
mock_ynl_do.return_value = expected_result
pin_info = {
'id': 34,
'parent-device': [
{
'parent-id': 2,
'direction': 1,
'priority': 2
},
{
'parent-id': 3,
'direction': 1,
'priority': 2
}
]
}
dpll = NetlinkDPLL()
result = dpll._set_pin(pin_info) # pylint: disable=W0212
self.assertEqual(expected_result,
result,
f'Wrong _set_pin result: '
f'Expected {expected_result}. Got {result}')
mock_ynl_do.assert_called_once_with(DPLLCommands.PIN_SET, pin_info)
@patch.object(YnlFamily, 'do')
@patch.object(YnlFamily, 'dump')
def test_set_pin_direction(self, mock_ynl_dump: Mock, mock_ynl_do: Mock):
pin_dict_idx = random.randint(0, 8)
pin_dict = self.dump_get_pins[pin_dict_idx]
pin_id = pin_dict[PinFields.ID]
pin_direction = random.randint(0, 2)
parent_ids = list(parent[PinParentFields.PARENT_ID]
for parent in pin_dict[PinFields.PARENT_DEVICE])
expected_result = None
mock_ynl_dump.return_value = self.dump_get_devices
mock_ynl_do.side_effect = [pin_dict, None]
dpll = NetlinkDPLL()
result = dpll.set_pin_direction(pin_id, pin_direction)
self.assertEqual(expected_result,
result,
f'Wrong _set_pin result: '
f'Expected {expected_result}. Got {result}')
mock_ynl_dump.assert_called_once_with(DPLLCommands.DEVICE_GET, {})
# Verify the do pin-set command call.
args, _ = mock_ynl_do.call_args
call_command = args[0]
self.assertEqual(DPLLCommands.PIN_SET,
call_command,
f'Wrong do command: '
f'Expected {DPLLCommands.PIN_SET}. Got {call_command}')
call_args = args[1]
sorted_call_args = {
PinFields.ID: call_args[PinFields.ID],
PinFields.PARENT_DEVICE: sorted(
call_args[PinFields.PARENT_DEVICE],
key=lambda x: x[PinParentFields.PARENT_ID]
)
}
expected_call_args = {
PinFields.ID: pin_id,
PinFields.PARENT_DEVICE: list(
{
PinParentFields.PARENT_ID: id,
PinParentFields.DIRECTION: pin_direction
}
for id in sorted(parent_ids)
)
}
self.assertDictEqual(sorted_call_args,
expected_call_args,
f'Wrong call arguments: '
f'Expected {expected_call_args}. Got {sorted_call_args}')
@patch.object(YnlFamily, 'do')
@patch.object(YnlFamily, 'dump')
def test_set_pin_priority(self, mock_ynl_dump: Mock, mock_ynl_do: Mock):
pin_dict_idx = random.randint(0, 8)
pin_dict = self.dump_get_pins[pin_dict_idx]
pin_id = pin_dict[PinFields.ID]
pin_priority = random.randint(0, 7)
pin_info = {
PinFields.ID: pin_id,
PinFields.PARENT_DEVICE: []
}
parent_ids = list(parent[PinParentFields.PARENT_ID]
for parent in pin_dict[PinFields.PARENT_DEVICE])
for parent_id in parent_ids:
pin_info[PinFields.PARENT_DEVICE].append(
{
PinParentFields.PARENT_ID: parent_id,
PinParentFields.PRIORITY: pin_priority
}
)
expected_result = None
mock_ynl_dump.return_value = self.dump_get_devices
mock_ynl_do.side_effect = [pin_dict, None]
dpll = NetlinkDPLL()
result = dpll.set_pin_priority(pin_id, pin_priority)
self.assertEqual(expected_result,
result,
f'Wrong _set_pin result: '
f'Expected {expected_result}. Got {result}')
mock_ynl_dump.assert_called_once_with(DPLLCommands.DEVICE_GET, {})
# Verify the do pin-set command call.
args, _ = mock_ynl_do.call_args
call_command = args[0]
# Verify the command is pin-set
self.assertEqual(DPLLCommands.PIN_SET,
call_command,
f'Wrong do command: '
f'Expected {DPLLCommands.PIN_SET}. Got {call_command}')
# Verify the pin id
call_args = args[1]
sorted_call_args = {
PinFields.ID: call_args[PinFields.ID],
PinFields.PARENT_DEVICE: sorted(
call_args[PinFields.PARENT_DEVICE],
key=lambda x: x[PinParentFields.PARENT_ID]
)
}
expected_call_args = {
PinFields.ID: pin_id,
PinFields.PARENT_DEVICE: list(
{
PinParentFields.PARENT_ID: id,
PinParentFields.PRIORITY: pin_priority
}
for id in sorted(parent_ids)
)
}
self.assertDictEqual(sorted_call_args,
expected_call_args,
f'Wrong call arguments: '
f'Expected {expected_call_args}. Got {sorted_call_args}')