Add port forwarding over usb to bmc settings

This enables manipluating the feature in XCCs to
allow network connections to forward over USB port.

Change-Id: I577d6952e0dab4b9a9df8b6118fc75ed0173d366
This commit is contained in:
Jarrod Johnson 2022-05-26 11:44:17 -04:00
parent 5f63810884
commit 5c32d4d11d
2 changed files with 174 additions and 0 deletions
pyghmi
ipmi/oem/lenovo
redfish/oem/lenovo

@ -944,6 +944,30 @@ class XCCClient(IMMClient):
settings['presence_assert'] = {
'value': 'Enable' if asserted else 'Disable'
}
usbparms = self.wc.grab_json_response('/api/dataset/imm_usb')
if usbparms:
usbparms = usbparms.get('items', [{}])[0]
if usbparms['usb_eth_over_usb_enabled'] == 1:
usbeth = 'Enable'
else:
usbeth = 'Disable'
settings['usb_ethernet'] = {
'value': usbeth
}
if usbparms['usb_eth_to_eth_enabled'] == 1:
fwd = 'Enable'
else:
fwd = 'Disable'
settings['usb_ethernet_port_forwarding'] = {
'value': fwd
}
mappings = []
for mapping in usbparms['usb_mapped_ports']:
src = mapping['ext_port']
dst = mapping['eth_port']
if src != 0 and dst != 0:
mappings.append('{0}:{1}'.format(src, dst))
settings['usb_forwarded_ports'] = {'value': ','.join(mappings)}
try:
enclosureinfo = self.ipmicmd.xraw_command(0x3a, 0xf1, data=[0])
except pygexc.IpmiException:
@ -984,6 +1008,7 @@ class XCCClient(IMMClient):
def set_bmc_configuration(self, changeset):
ruleset = {}
usbsettings = {}
for key in changeset:
if isinstance(changeset[key], six.string_types):
changeset[key] = {'value': changeset[key]}
@ -1014,11 +1039,73 @@ class XCCClient(IMMClient):
raise pygexc.InvalidParameterValue(
'"{0}" is not a recognized value for {1}'.format(
assertion, key))
elif key.lower() in (
'usb_ethernet', 'usb_ethernet_port_forwarding',
'usb_forwarded_ports'):
usbsettings[key] = changeset[key]['value']
else:
raise pygexc.InvalidParameterValue(
'{0} not a known setting'.format(key))
if ruleset:
self.wc.grab_json_response('/api/dataset', ruleset)
if usbsettings:
self.apply_usb_configuration(usbsettings)
def apply_usb_configuration(self, usbsettings):
def numify(val):
if 'enabled'.startswith(val.lower()):
return '1'
if 'disabled'.startswith(val.lower()):
return '0'
raise Exception('Usupported value')
usbparms = self.wc.grab_json_response('/api/dataset/imm_usb')
usbparms = usbparms.get('items', [{}])[0]
addrmode = '{0}'.format(usbparms['lan_over_usb_addr_mode'])
ethena = '{0}'.format(usbparms['usb_eth_over_usb_enabled'])
fwdena = '{0}'.format(usbparms['usb_eth_to_eth_enabled'])
newena = usbsettings.get('usb_ethernet', None)
newfwd = usbsettings.get('usb_ethernet_port_forwarding', None)
newsettings = {
'USB_LANOverUSBAddrMode': addrmode,
'USB_EthOverUsbEna': ethena,
'USB_PortForwardEna': fwdena,
'USB_IPChangeEna': '0',
}
needsettings = False
if newena is not None:
needsettings = True
newsettings['USB_EthOverUsbEna'] = numify(newena)
if newfwd is not None:
needsettings = True
newsettings['USB_PortForwardEna'] = numify(newfwd)
if needsettings:
self.wc.grab_json_response('/api/dataset', newsettings)
if 'usb_forwarded_ports' in usbsettings:
oldfwds = {}
usedids = set([])
newfwds = usbsettings['usb_forwarded_ports'].split(',')
for mapping in usbparms['usb_mapped_ports']:
rule = '{0}:{1}'.format(
mapping['ext_port'], mapping['eth_port'])
if rule not in newfwds:
self.wc.grab_json_response(
'/api/function', {
'USB_RemoveMapping': '{0}'.format(mapping['id'])})
else:
usedids.add(mapping['id'])
oldfwds[rule] = mapping['id']
for mapping in usbsettings['usb_forwarded_ports'].split(','):
if mapping not in oldfwds:
newid = 1
while newid in usedids:
newid += 1
if newid > 11:
raise Exception('Too Many Port Forwards')
usedids.add(newid)
newmapping = '{0},{1}'.format(
newid, mapping.replace(':', ','))
self.wc.grab_json_response(
'/api/function', {'USB_AddMapping': newmapping})
def clear_system_configuration(self):
res = self.wc.grab_json_response_with_status(

@ -137,6 +137,30 @@ class OEMHandler(generic.OEMHandler):
settings['presence_assert'] = {
'value': 'Enable' if asserted else 'Disable'
}
usbparms = self.wc.grab_json_response('/api/dataset/imm_usb')
if usbparms:
usbparms = usbparms.get('items', [{}])[0]
if usbparms['usb_eth_over_usb_enabled'] == 1:
usbeth = 'Enable'
else:
usbeth = 'Disable'
settings['usb_ethernet'] = {
'value': usbeth
}
if usbparms['usb_eth_to_eth_enabled'] == 1:
fwd = 'Enable'
else:
fwd = 'Disable'
settings['usb_ethernet_port_forwarding'] = {
'value': fwd
}
mappings = []
for mapping in usbparms['usb_mapped_ports']:
src = mapping['ext_port']
dst = mapping['eth_port']
if src != 0 and dst != 0:
mappings.append('{0}:{1}'.format(src, dst))
settings['usb_forwarded_ports'] = {'value': ','.join(mappings)}
return settings
rulemap = {
@ -151,6 +175,7 @@ class OEMHandler(generic.OEMHandler):
def set_bmc_configuration(self, changeset):
ruleset = {}
usbsettings = {}
for key in changeset:
if isinstance(changeset[key], six.string_types):
changeset[key] = {'value': changeset[key]}
@ -173,12 +198,74 @@ class OEMHandler(generic.OEMHandler):
raise pygexc.InvalidParameterValue(
'"{0}" is not a recognized value for {1}'.format(
currval, key))
elif key.lower() in (
'usb_ethernet', 'usb_ethernet_port_forwarding',
'usb_forwarded_ports'):
usbsettings[key] = changeset[key]['value']
else:
raise pygexc.InvalidParameterValue(
'{0} not a known setting'.format(key))
if usbsettings:
self.apply_usb_configuration(usbsettings)
if ruleset:
self.wc.grab_json_response('/api/dataset', ruleset)
def apply_usb_configuration(self, usbsettings):
def numify(val):
if 'enabled'.startswith(val.lower()):
return '1'
if 'disabled'.startswith(val.lower()):
return '0'
raise Exception('Usupported value')
usbparms = self.wc.grab_json_response('/api/dataset/imm_usb')
usbparms = usbparms.get('items', [{}])[0]
addrmode = '{0}'.format(usbparms['lan_over_usb_addr_mode'])
ethena = '{0}'.format(usbparms['usb_eth_over_usb_enabled'])
fwdena = '{0}'.format(usbparms['usb_eth_to_eth_enabled'])
newena = usbsettings.get('usb_ethernet', None)
newfwd = usbsettings.get('usb_ethernet_port_forwarding', None)
newsettings = {
'USB_LANOverUSBAddrMode': addrmode,
'USB_EthOverUsbEna': ethena,
'USB_PortForwardEna': fwdena,
'USB_IPChangeEna': '0',
}
needsettings = False
if newena is not None:
needsettings = True
newsettings['USB_EthOverUsbEna'] = numify(newena)
if newfwd is not None:
needsettings = True
newsettings['USB_PortForwardEna'] = numify(newfwd)
if needsettings:
self.wc.grab_json_response('/api/dataset', newsettings)
if 'usb_forwarded_ports' in usbsettings:
oldfwds = {}
usedids = set([])
newfwds = usbsettings['usb_forwarded_ports'].split(',')
for mapping in usbparms['usb_mapped_ports']:
rule = '{0}:{1}'.format(
mapping['ext_port'], mapping['eth_port'])
if rule not in newfwds:
self.wc.grab_json_response(
'/api/function', {
'USB_RemoveMapping': '{0}'.format(mapping['id'])})
else:
usedids.add(mapping['id'])
oldfwds[rule] = mapping['id']
for mapping in usbsettings['usb_forwarded_ports'].split(','):
if mapping not in oldfwds:
newid = 1
while newid in usedids:
newid += 1
if newid > 11:
raise Exception('Too Many Port Forwards')
usedids.add(newid)
newmapping = '{0},{1}'.format(
newid, mapping.replace(':', ','))
self.wc.grab_json_response(
'/api/function', {'USB_AddMapping': newmapping})
def get_description(self):
description = self._do_web_request('/DeviceDescription.json')
if description: