olindgre 2ece01e1d7 Make powertrain-build not overlap with pybuild in site-packages
Change-Id: I7b59f3f04f0f787d35db0b9389f295bf1ad24f56
2024-09-17 10:25:04 +02:00

450 lines
21 KiB
Python

# Copyright 2024 Volvo Car Corporation
# Licensed under Apache 2.0.
"""Module for labelsplit files."""
import glob
import re
import json
import sys
from xml.etree import ElementTree
from pathlib import Path
from powertrain_build.feature_configs import FeatureConfigs
from powertrain_build.unit_configs import UnitConfigs
from powertrain_build.signal_interfaces import CsvSignalInterfaces
from powertrain_build.lib import helper_functions, logger
LOGGER = logger.create_logger(__file__)
class LabelSplit:
""" Provides common LabelSplit functions for multiple repos.
"""
def __init__(self, project, build_cfg, cfg_json, cmt_source_folder):
"""Read project configuration file to internal an representation.
Args:
project (str): Project name.
build_cfg(BuildProjConfig): configures which units are active in the current project and where
the code switch files are located.
cfg_json(Path): Path to label split configuration file.
cmt_source_folder (Path): Path to CMT source folder.
"""
super().__init__()
self.project = project
self.build_cfg = build_cfg
project_a2l_file_path = Path(self.build_cfg.get_src_code_dst_dir(),
self.build_cfg.get_a2l_name())
self.feature_cfg = FeatureConfigs(self.build_cfg)
self.unit_cfg = UnitConfigs(self.build_cfg, self.feature_cfg)
self.csv_if = CsvSignalInterfaces(self.build_cfg, self.unit_cfg)
self.project_a2l_symbols = self.get_project_a2l_symbols(project_a2l_file_path)
self.labelsplit_cfg = self.read_json(cfg_json)
self.cmt_source_folder = cmt_source_folder
@staticmethod
def read_json(cfg_json):
"""Read label split configuration file from given location
If the file does not exsit in the given location, program
exits with error message.
Args:
cfg_json(Path): Path to label split configuration file
Returns:
labelsplit_cfg (dict): Dict of given file content
"""
labelsplit_cfg = None
if cfg_json.exists():
with cfg_json.open() as json_file:
labelsplit_cfg = json.load(json_file)
return labelsplit_cfg
LOGGER.error('Cannot find label split config file: %s', cfg_json)
sys.exit(1)
@staticmethod
def get_project_a2l_symbols(project_a2l_file_path):
"""Get a list of calibration symbols found in a given project A2L file.
Args:
project_a2l_file_path (Path): Path to project A2L file.
Returns:
symbols_in_a2l (list): List of calibration symbols found in the project A2L file.
"""
symbols_in_a2l = []
with project_a2l_file_path.open() as a2l_fh:
a2l_text = a2l_fh.read()
calibration_blocks = re.findall(r'(?:\s*\n)*(\s*/begin (CHARACTERISTIC|AXIS_PTS)[\n\s]*'
r'(\w+)([\[\d+\]]*).*?\n.*?/end \2)',
a2l_text,
flags=re.M | re.DOTALL)
for blk in calibration_blocks:
symbols_in_a2l.append(blk[2])
return symbols_in_a2l
@staticmethod
def get_sgp_symbols(sgp_file: Path):
"""Get symbols and symbol_groups found in a given _sgp.xml file.
Example output: {sVcExample: [(3, EC_EX_1), (4, EC_EX_2)]}, where the indices are column indices:
1 -> symbol name (therefore not in list of symbol groups).
2 -> diesel group.
3 -> petrol group.
4 -> hybrid group.
5 -> subsystem (therefore not in list of symbol groups).
Args:
sgp_file (Path): Path to an _sgp.xml file.
Returns:
found_sgp_symbols (dict): A symbol to symbol_groups dictionary found in the sgp_file.
"""
tree = ElementTree.parse(sgp_file)
root = tree.getroot()
search_string = '{{urn:schemas-microsoft-com:office:spreadsheet}}{tag}'
label_sheet = root.find(search_string.format(tag='Worksheet'))
table = label_sheet.find(search_string.format(tag='Table'))
rows = table.findall(search_string.format(tag='Row'))
found_sgp_symbols = {}
for row in rows:
symbol = None
column_counter = 1
cells = row.findall(search_string.format(tag='Cell'))
for cell in cells:
data = cell.find(search_string.format(tag='Data'))
if data is not None:
# Sometimes there are spaces in the symbol cell
# Sometimes there is a weird \ufeff character (VcDebug_sgp.xml) in the symbol cell
value = data.text.replace(' ', '').replace('\ufeff', '')
if symbol is None:
symbol = value
found_sgp_symbols[symbol] = []
else:
new_index = search_string.format(tag='Index')
if new_index in cell.attrib:
column_counter = int(cell.attrib[new_index])
found_sgp_symbols[symbol].append((column_counter, value))
column_counter += 1
return found_sgp_symbols
def get_sgp_symbol_group(self, symbol_groups_by_index):
"""Match _sgp.xml file indices (symbol groups) with a given project.
Args:
symbol_groups_by_index (list(tuple)): List of (index, symbol_group) pairs.
Returns:
symbol_group (str): The symbol group corresponding to the given project.
"""
symbol_group = ''
symbol_dict = self.labelsplit_cfg.get("SGP_SYMBOL_GROUPS")
symbol_list = [val for key, val in symbol_dict.items() if key in self.project]
if len(symbol_list) >= 1:
for index, group in symbol_groups_by_index:
if index == symbol_list[0]:
symbol_group = group
else:
LOGGER.error('Cannot match symbol group type for project: %s', self.project)
return symbol_group
def get_interface_symbols_and_groups(self, interface_dict, in_symbol_sgp_dict, out_symbol_sgp_dict):
"""Get a list of (symbol, symbol_group) pairs found in given interface and sgp files.
Args:
interface_dict (dict): interface to symbol map, matching a certain IO type.
in_symbol_sgp_dict (dict): An input symbol to symbol_groups dictionary found in an sgp_file,
to be compared with interface_dict inputs.
out_symbol_sgp_dict (dict): An output symbol to symbol_groups dictionary found in an sgp_file,
to be compared with interface_dict outputs.
Returns:
symbols_and_groups (list(tuple)): List of (symbol, symbol_group) pairs in: interface and sgp file.
"""
symbols_and_groups = []
for interface, symbol_data in interface_dict.items():
for symbol in symbol_data.keys():
debug_name = re.sub(r'\w(\w+)', r'c\1_db', symbol)
switch_name = re.sub(r'\w(\w+)', r'c\1_sw', symbol)
if 'Input' in interface and debug_name in in_symbol_sgp_dict and switch_name in in_symbol_sgp_dict:
debug_symbol_group = self.get_sgp_symbol_group(in_symbol_sgp_dict[debug_name])
switch_symbol_group = self.get_sgp_symbol_group(in_symbol_sgp_dict[switch_name])
symbols_and_groups.extend([(debug_name, debug_symbol_group), (switch_name, switch_symbol_group)])
elif 'Output' in interface and debug_name in out_symbol_sgp_dict and switch_name in out_symbol_sgp_dict:
debug_symbol_group = self.get_sgp_symbol_group(out_symbol_sgp_dict[debug_name])
switch_symbol_group = self.get_sgp_symbol_group(out_symbol_sgp_dict[switch_name])
symbols_and_groups.extend([(debug_name, debug_symbol_group), (switch_name, switch_symbol_group)])
return symbols_and_groups
def get_debug_symbols_and_groups(self):
"""Get a list of (symbol, symbol_group) pairs found in project interface and VcDebug*_sgp.xml files.
Returns:
debug_symbols_and_groups (list(tuple)): List of (symbol, symbol_group) pairs in:
interface and VcDebug*_sgp.xmlfiles.
"""
_unused, dep, _unused_two, debug = self.csv_if.get_io_config()
sgp_file_dict = self.labelsplit_cfg.get("SGP_FILE")
debug_sgp_file = Path(sgp_file_dict.get('cfg_folder'), sgp_file_dict.get('debug'))
debug_output_sgp_file = Path(sgp_file_dict.get('cfg_folder'), sgp_file_dict.get('debug_output'))
dep_sgp_file = Path(sgp_file_dict.get('cfg_folder'), sgp_file_dict.get('dep'))
dep_output_sgp_file = Path(sgp_file_dict.get('cfg_folder'), sgp_file_dict.get('dep_output'))
debug_sgp_symbols = self.get_sgp_symbols(debug_sgp_file)
debug_output_sgp_symbols = self.get_sgp_symbols(debug_output_sgp_file)
dep_sgp_symbols = self.get_sgp_symbols(dep_sgp_file)
dep_output_sgp_symbols = self.get_sgp_symbols(dep_output_sgp_file)
symbols_and_groups_tmp = []
debug_tmp = self.get_interface_symbols_and_groups(debug,
debug_sgp_symbols,
debug_output_sgp_symbols)
dep_tmp = self.get_interface_symbols_and_groups(dep,
dep_sgp_symbols,
dep_output_sgp_symbols)
symbols_and_groups_tmp.extend(debug_tmp)
symbols_and_groups_tmp.extend(dep_tmp)
debug_symbols_and_groups = []
for symbol, symbol_group in symbols_and_groups_tmp:
if symbol_group == '':
LOGGER.info('Debug symbol %s is missing symbol group and will be removed.', symbol)
else:
debug_symbols_and_groups.append((symbol, symbol_group))
return debug_symbols_and_groups
def check_unit_par_file(self, unit):
"""Check <unit>_par.m file for default sgp symbol group.
Args:
unit (str): Current unit/model name.
Returns:
has_sgp_default (Bool): True/False if unit is associated with default sgp value.
default_symbol_group (str): Name of default symbol group.
"""
has_sgp_default = False
default_symbol_group = ''
base_search_string = r'SgpDefault\.{unit}\.[A-Za-z]+\s*=\s*[\'\"]([A-Za-z_]+)[\'\"]'
search_string = base_search_string.format(unit=unit)
non_existent_par_file = Path('non_existent_par_file.m')
found_par_files = glob.glob('Models/*/' + unit + '/' + unit + '_par.m')
if len(found_par_files) > 1:
LOGGER.warning('Found more than one _par.m file, using %s', found_par_files[0])
par_file = Path(found_par_files[0]) if found_par_files else non_existent_par_file
if self.labelsplit_cfg.get("special_unit_prefixes"):
for special_prefix in self.labelsplit_cfg.get("special_unit_prefixes"):
if unit.startswith(special_prefix) and not par_file.is_file():
# Some units require special handling.
if '__' in unit:
parent = unit.replace('__', 'Mdl__')
else:
parent = unit + 'Mdl'
found_par_files = glob.glob('Models/*/' + parent + '/' + parent + '_par.m')
par_file = Path(found_par_files[0]) if found_par_files else Path(non_existent_par_file)
# Default symbol group is based on c-file name
c_name = re.sub('(Mdl)?(__.*)?', '', unit)
search_string = base_search_string.format(unit=c_name)
if par_file.is_file():
with par_file.open(encoding="latin-1") as par_fh:
par_text = par_fh.read()
sgp_default_match = re.search(search_string, par_text)
if sgp_default_match is not None:
has_sgp_default = True
default_symbol_group = sgp_default_match.group(1)
else:
LOGGER.info('Missing _par file for model: %s', unit)
return has_sgp_default, default_symbol_group
def get_unit_sgp_file(self, unit):
"""Get path to <unit>_sgp.xml file.
Args:
unit (str): Current unit/model name.
Returns:
sgp_file (Path): Path to <unit>_sgp.xml file.
"""
non_existent_sgp_file = Path('non_existent_sgp_file.xml')
found_sgp_files = glob.glob('Models/*/' + unit + '/' + unit + '_sgp.xml')
if len(found_sgp_files) > 1:
LOGGER.warning('Found more than one _sgp.xml file, using %s', found_sgp_files[0])
sgp_file = Path(found_sgp_files[0]) if found_sgp_files else Path(non_existent_sgp_file)
if self.labelsplit_cfg.get("special_unit_prefixes"):
for special_prefix in self.labelsplit_cfg.get("special_unit_prefixes"):
if unit.startswith(special_prefix) and not sgp_file.is_file():
# Some units require special handling.
if '__' in unit:
parent = unit.replace('__', 'Mdl__')
else:
parent = unit + 'Mdl'
found_sgp_files = glob.glob('Models/*/' + parent + '/' + parent + '_sgp.xml')
sgp_file = Path(found_sgp_files[0]) if found_sgp_files else Path(non_existent_sgp_file)
return sgp_file
def get_unit_symbols_and_groups(self, unit, calibration_symbols):
"""Get a list of (symbol, symbol_group) pairs found in A2L, <unit>_sgp/par and config_<unit>.json files.
Args:
unit (str): Current unit/model name.
calibration_symbols (list): All calibration symbols for the unit (from config_<unit>.json).
Returns:
unit_symbols_and_groups (list(tuple)): List of (symbol, symbol_group) pairs in: A2L, _sgp/_par and
config files.
"""
unit_symbols_and_groups = []
has_sgp_default, default_symbol_group = self.check_unit_par_file(unit)
sgp_file = self.get_unit_sgp_file(unit)
if sgp_file.is_file():
found_sgp_symbols = self.get_sgp_symbols(sgp_file)
else:
found_sgp_symbols = {}
LOGGER.info('Missing _sgp file for model: %s', unit)
for symbol in calibration_symbols:
if symbol not in self.project_a2l_symbols:
LOGGER.info('Symbol %s not in project A2L file and will be removed.', symbol)
continue
if symbol in found_sgp_symbols:
symbol_group = self.get_sgp_symbol_group(found_sgp_symbols[symbol])
unit_symbols_and_groups.append((symbol, symbol_group))
elif has_sgp_default:
if symbol.endswith('_sw') or symbol.endswith('_db'):
LOGGER.info('Debug symbol %s not in sgp file and will be removed.', symbol)
else:
unit_symbols_and_groups.append((symbol, default_symbol_group))
else:
LOGGER.info('Symbol %s missing in _sgp file and lack SgpDefault value.', symbol)
return unit_symbols_and_groups
def get_calibration_constants(self):
"""Get all calibration symbols for each unit in the project.
Returns:
calibration_symbols_per_unit (dict): A unit to symbol list dictionary.
"""
security_variables = self.labelsplit_cfg.get("security_variables")
u_conf_dict = self.unit_cfg.get_per_cfg_unit_cfg()
safe_calibration_symbols = {}
for symbol, symbol_data in u_conf_dict['calib_consts'].items():
if symbol not in security_variables:
safe_calibration_symbols.update({symbol: symbol_data})
calibration_symbols_per_unit = {}
for symbol, symbol_data in safe_calibration_symbols.items():
for unit, unit_data in symbol_data.items():
if 'CVC_CAL' in unit_data['class']:
if unit in calibration_symbols_per_unit:
calibration_symbols_per_unit[unit].append(symbol)
else:
calibration_symbols_per_unit[unit] = [symbol]
return calibration_symbols_per_unit
def get_symbols_and_groups(self):
"""Get a list of (symbol, symbol_group) pairs found in A2L, <unit>_sgp/par and config_<unit>.json files.
Returns:
exit_code (int): 0/1 based on successful collection of symbols and symbol groups.
all_symbols_and_groups (dict): A symbol to symbol_group dictionary for all A2L, _sgp/_par and config files,
"""
exit_code = 0
calibration_symbols_per_unit = self.get_calibration_constants()
debug_symbols = self.get_debug_symbols_and_groups()
all_symbol_and_group_pairs = []
if self.labelsplit_cfg.get("project_symbols"):
special_project_dict = self.labelsplit_cfg.get("project_symbols")
pair_list = [val for key, val in special_project_dict.items() if key in self.project]
if len(pair_list) == 1:
symbol_and_group_pairs_list = pair_list[0].items()
all_symbol_and_group_pairs += symbol_and_group_pairs_list
elif len(pair_list) > 1:
LOGGER.error('Project %s has does not follow the name rule', self.project)
return 1, {}
all_symbol_and_group_pairs.extend(debug_symbols)
for unit, symbols in calibration_symbols_per_unit.items():
if self.labelsplit_cfg.get("special_units"):
special_unit_dict = self.labelsplit_cfg.get("special_units")
if unit in special_unit_dict.keys():
# Some units require special handling.
LOGGER.warning('Found %s, assuming %s is used.', unit, special_unit_dict.get(unit))
labels = self.get_unit_symbols_and_groups(special_unit_dict.get(unit), symbols)
else:
labels = self.get_unit_symbols_and_groups(unit, symbols)
else:
labels = self.get_unit_symbols_and_groups(unit, symbols)
all_symbol_and_group_pairs.extend(labels)
symbol_to_group_dict = {}
for symbol, symbol_group in all_symbol_and_group_pairs:
if symbol in symbol_to_group_dict:
if symbol_to_group_dict[symbol] != symbol_group:
LOGGER.error('Symbol %s multiply defined with different symbol groups.', symbol)
exit_code = 1
else:
symbol_to_group_dict[symbol] = symbol_group
return exit_code, symbol_to_group_dict
def generate_label_split_xml_file(self, symbols_and_groups):
"""Generate a label split file, given a directory plus labels and groups to add.
Args:
symbols_and_groups (dict): A symbol to symbol_group dictionary given a project.
Returns:
exit_code (int): 0/1 based on successful generation of Labelsplit.xls.
"""
errors = []
project_root_dir = self.build_cfg.get_root_dir()
cmt_output_folder = helper_functions.create_dir(Path(project_root_dir, 'output', 'CMT'))
start_file_name = Path(self.cmt_source_folder, 'template_labelsplit_sgp_start.xml_')
row_count_file_name = Path(cmt_output_folder, 'labelsplit_rowcount.xml_')
start_2_file_name = Path(self.cmt_source_folder, 'template_labelsplit_sgp_start_2.xml_')
label_split_rows_filename = Path(cmt_output_folder, 'labelsplit_rows.xml_')
end_file_name = Path(self.cmt_source_folder, 'template_labelsplit_sgp_end.xml_')
files_to_merge = [start_file_name, row_count_file_name, start_2_file_name,
label_split_rows_filename, end_file_name]
with row_count_file_name.open('w', encoding="utf-8") as rc_fh:
rc_fh.write(f'{len(symbols_and_groups) + 1}') # header + data
with label_split_rows_filename.open('w', encoding="utf-8") as lsrf_fh:
for symbol, symbol_group in symbols_and_groups.items():
if symbol_group == '':
errors.append(f'Missing symbol group for symbol: {symbol}')
elif symbol_group == 'VCC_SPM_DEBUG':
LOGGER.info('Ignoring undistributed debug symbol: %s', symbol)
else:
lsrf_fh.write(
' <Row ss:AutoFitHeight="0">\n'
f' <Cell><Data ss:Type="String">{symbol}'
'</Data><NamedCell ss:Name="_FilterDatabase"/></Cell>\n'
f' <Cell ss:Index="5"><Data ss:Type="String">{symbol_group}'
'</Data><NamedCell ss:Name="_FilterDatabase"/></Cell>\n'
' </Row>\n'
)
if errors:
LOGGER.error('\n'.join(errors))
return 1
output_file_name = Path(project_root_dir, 'output', 'CMT', 'Labelsplit.xls')
with output_file_name.open('w', encoding="utf-8") as output_fh:
for file_name in files_to_merge:
with file_name.open(encoding="utf-8") as input_fh:
content = input_fh.read()
output_fh.write(content)
LOGGER.info('Delivery to: %s', str(output_file_name))
return 0