olindgre 2ece01e1d7 Make powertrain-build not overlap with pybuild in site-packages
Change-Id: I7b59f3f04f0f787d35db0b9389f295bf1ad24f56
2024-09-17 10:25:04 +02:00

421 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright 2024 Volvo Car Corporation
# Licensed under Apache 2.0.
# -*- coding: utf-8 -*-
"""Module for reading unit configuration files."""
import json
import os
import time
from pprint import pformat
from powertrain_build.build_proj_config import BuildProjConfig
from powertrain_build.problem_logger import ProblemLogger
from powertrain_build.versioncheck import Version
class CodeGenerators:
"""Enum for code generators."""
target_link = 'target_link'
embedded_coder = 'embedded_coder'
class UnitConfigs(ProblemLogger):
"""A class for accessing the projects unit definitions (see :doc:`unit_config`).
Provides methods for retrieving the all definitions of a unit and all existing units.
"""
CONFIG_SKIP_LIST = ['VcDebugSafe', 'VcDebug', 'VcDebugOutputSafe', 'VcDebugOutput']
def __init__(self, build_prj_config, feature_config):
"""Class Initialization.
Args:
build_prj_config (BuildProjConfig): A class instance which holds
the information of where to find units configs to parse
feature_config (FeatureConfigs): Class instance project feature definitions
"""
super().__init__()
if not isinstance(build_prj_config, BuildProjConfig):
raise TypeError('build_prj_config argument is not an'
' instance of BuildProjConfig')
self._build_prj_config = build_prj_config
self._feature_cfg = feature_config
self._raw_per_unit_configs = {}
self._per_unit_configs = {}
self._per_type_unit_configs = {}
self._if_define_dict = {}
self._missing_configs = set()
self._empty_config_def = set()
self._parse_all_unit_configs()
self._per_type_unit_signals()
self.code_generators = self._get_code_generators()
self.base_types_headers = self._get_base_types_headers()
# write the summary of error (to avoid repeating error messages)
for unit in self._missing_configs:
self.critical('%s is missing config files', unit)
for var, unit in self._empty_config_def:
self.warning('%s in unit %s, has empty config_def!'
'probably goto-block, missing a corresponing from-block.',
var, unit)
def __repr__(self):
"""Get string representation of object."""
return pformat(self._per_type_unit_configs)
def _parse_all_unit_configs(self):
"""Parse all unit config files."""
start_time = time.time()
self.info(' Start loading unit_cfg json files')
cfg_dirs = self._build_prj_config.get_unit_cfg_dirs()
for unit, cfg_dir in cfg_dirs.items():
self._parse_unit_config(unit, cfg_dir)
self.info(' Finished loading unit_cfg json files (in %4.2f s)', time.time() - start_time)
def _parse_unit_config(self, unit, cfg_dir):
"""Parse one unit config file."""
file_ = os.path.join(cfg_dir, f'config_{unit}.json')
with open(file_, 'r', encoding="utf-8") as fhndl:
self.debug('Loading json file %s', unit)
try:
tmp_ucfg = json.load(fhndl)
if not Version.is_compatible(tmp_ucfg.get('version', '0.0.0')):
raise ValueError(f'Incompatible config file version for unit {unit}.')
if unit in self._raw_per_unit_configs:
self.critical("Conflicting Unit name %s: Units need to have unique names", unit)
self._raw_per_unit_configs[unit] = tmp_ucfg
except json.JSONDecodeError as ex:
self.critical('Error reading config file %s: %s', file_, ex)
return
for include_unit in tmp_ucfg.get('includes', []):
self.debug('%s includes %s in %s', unit, include_unit, cfg_dir)
self._parse_unit_config(include_unit, cfg_dir)
def _filter_io_nvm_feat(self):
"""Remove all parameters not defined in the prj_config.
Parameters can be removed via not active feature in the unit, or
the entire unit is not included in the project.
Args:
config (str): the name of the configuration
the format of the data-dict::
{'UnitName': {'class': 'CVC_EXT',
'configs': [['Vc_D_CodegenHev '
'== 2',
'Vc_D_CodegenHev '
'> 0']],
'description': 'HV battery cooling request',
'handle': 'VcPpmPsm/VcPpmPsm/Subsystem/'
'VcPpmPsm/yVcBec_B_ChillerCoolReq',
'lsb': 1,
'max': 3,
'min': 0,
'name': 'sVcBec_D_HvBattCoolgReq',
'offset': 0,
'type': 'UInt8',
'unit': '-'}
}
"""
res = {}
self.debug('_filter_io_nvm_feat: Feature Cfg')
for unit in self._build_prj_config.get_included_units():
self._filter_io_nvm_feat_unit(unit, res)
if not res:
self.warning('No units configured for project')
return res
def _filter_core_config(self, u_def_data):
"""Handle core configs."""
f_core = {}
for core_type, core_data in u_def_data.items():
f_core[core_type] = {}
for key, value in core_data.items():
if key != 'IllegalBlk':
# Matlab sets core:{type:{name:{API_blk:[{path, config}]}}}
# config.py - core:{type:{name:{configs}}}
configs = value.get('configs', [cfg for blk in value['API_blk'] for cfg in blk['config']])
if self._feature_cfg.check_if_active_in_config(configs):
f_core[core_type][key] = value
return f_core
def _filter_io_nvm_feat_unit(self, unit, res):
"""Handle one unit config with respect to the filtering in :_filter_io_nvm_feat:."""
try:
u_data = self._raw_per_unit_configs[unit]
except KeyError:
# Some units in the raster should not have config files
if unit not in self.CONFIG_SKIP_LIST:
self.debug('_filter_io_nvm_feat_unit: cfg missing: %s', unit)
self._missing_configs.add(unit)
return
for u_def_type, u_def_data in u_data.items():
res.setdefault(unit, {}).setdefault(u_def_type, {})
if u_def_type == 'dids':
f_dids = {k: v for k, v in u_def_data.items()
if self._feature_cfg.check_if_active_in_config(v['configs'])}
res[unit][u_def_type] = f_dids
elif u_def_type == 'core':
res[unit][u_def_type] = self._filter_core_config(u_def_data)
elif u_def_type == 'pre_procs':
# the pre_proc key does not have configuration attributes
res[unit]['pre_procs'] = u_def_data
elif u_def_type == 'integrity_level':
res[unit]['integrity_level'] = u_def_data
elif u_def_type == 'code_generator':
res[unit]['code_generator'] = u_def_data
elif u_def_type == 'version':
res[unit]['version'] = u_def_data
elif u_def_type == 'csp':
csp_data = {}
if 'methods' in u_def_data:
csp_data = {'methods': {}}
for method_name, method_data in u_def_data['methods'].items():
if self._feature_cfg.check_if_active_in_config(method_data['configs']):
csp_data['methods'][method_name] = method_data
res[unit][u_def_type] = csp_data
elif u_def_type == 'includes':
# List of configs for handwritten code
for included_unit in u_def_data:
self.debug('%s includes %s', unit, included_unit)
self._filter_io_nvm_feat_unit(included_unit, res)
else:
for var, var_pars in u_def_data.items():
# TODO: remove this code when the bug in the matlab code is removed.
if var_pars['configs'] == []:
self.debug('Adding %s', unit)
self._empty_config_def.add((var, unit))
if self._feature_cfg.check_if_active_in_config(var_pars['configs']):
res[unit][u_def_type].setdefault(var, {}).update(var_pars)
@staticmethod
def _update_io_nvm(dict_, unit, data_type, variables):
"""Change the struct for in out and nvm variables.
The resulting new struct is stored in the dict dict_
"""
for var, var_pars in variables.items():
dict_.setdefault(data_type, {}).setdefault(var, {}).setdefault(unit, var_pars)
def _update_dids(self, unit, key, data, feat_cfg=None):
"""Change the struct for in out and nvm variables."""
# TODO: Add functionality
@staticmethod
def _update_core(dict_, unit, data_type, core_ids):
"""Change the struct for in core parameters."""
for _, core_data in core_ids.items():
for var, var_pars in core_data.items():
dict_.setdefault(data_type, {}).setdefault(var, {}).setdefault(unit, var_pars)
def _update_pre_procs(self, unit, key, data, feat_cfg=None):
"""Change the struct for in pre_processor parameters."""
# TODO: Add functionality
def _per_type_unit_signals(self):
"""Change the structure of the data to aggregate all unit configs.
Returns:
dict: a structure per config type instead of per unit
"""
# loop over all projects and store the active items in each configuration
self._per_unit_configs = self._filter_io_nvm_feat()
dict_ = self._per_type_unit_configs = {}
for unit, udata in self._per_unit_configs.items():
for data_type, variables in udata.items():
if data_type in ['core']:
self._update_core(dict_, unit, data_type, variables)
elif data_type in ['dids']:
self._update_dids(dict_, unit, data_type, variables)
elif data_type in ['pre_procs']:
self._update_pre_procs(dict_, unit, data_type, variables)
elif data_type in ['outports', 'inports', 'dids', 'nvm', 'local_vars', 'calib_consts', 'csp']:
self._update_io_nvm(dict_, unit, data_type, variables)
else:
dict_.setdefault(data_type, {}).setdefault(unit, udata)
def check_if_in_unit_cfg(self, unit, symbol):
"""Check if the symbol is defined in the unit config file."""
for data in self._raw_per_unit_configs[unit].values():
if isinstance(data, dict):
if symbol in data:
return True
return False
def get_per_cfg_unit_cfg(self):
"""Get all io-signals and core-ids for all units.
Get all io-signals and core-ids for all units, where all inports, outport, etc,
are aggregated from all unit definition files.
Returns:
dict: a dict with the below format::
{
'inports/outports/nvm/core': {
'VARIABLE_NAME': {
'UNIT_NAME': {
'class': 'CVC_EXT',
'configs': [['all']],
'description': 'Power Pulse ',
'handle': 'VcPemAlc/VcPemAlc/Subsystem/VcPemAlc/yVcVmcPmm_B_SsActive9',
'lsb': 1,
'max': 800,
'min': 0,
'name': 'sVcAesPp_Pw_PwrPls',
'offset': 0,
'type': 'UInt16',
'unit': 'W'
}
}
}
}
}
The top level keys are 'inports', 'outports', 'nvm' and 'core'
"""
return self._per_type_unit_configs
def check_if_in_per_cfg_unit_cfg(self, cfg, symbol):
"""Check if the symbol is defined in the aggregated unit config files."""
return (
cfg in self._per_type_unit_configs and
symbol in self._per_type_unit_configs[cfg])
def get_per_unit_cfg(self):
"""Get io-signals for all units, per unit, for a given project.
If 'all' is given as a project, all signals, regardless of configuration,
is returned.
Returns:
dict: a dict with the below format::
{'NAME_OF_UNIT': {
'core': {'Events': {},
'FIDs': {},
'IUMPR': {},
'Ranking': {},
'TstId': {}},
'dids': {},
'inports': {'VARIABLE_NAME': {'class': 'CVC_DISP',
'configs': [['all']],
'description': 'Torque '
'arbitraion '
'state',
'handle': 'VcPemAlc/VcPemAlc/...',
'lsb': 1,
'max': 9,
'min': 0,
'name': 'rVcPemAlc_D_AuxLoadEvent',
'offset': 0,
'type': 'UInt8',
'unit': '-'}
},
'outports' : {},
'nvm' : {}
}
}
"""
return self._per_unit_configs
def get_per_unit_cfg_total(self):
"""Get total io-signals configuration for all units, per unit, for a given project.
Does not remove signals disabled by code switches.
Returns:
dict: a dict with the below format::
{'NAME_OF_UNIT': {
'core': {'Events': {},
'FIDs': {},
'IUMPR': {},
'Ranking': {},
'TstId': {}},
'dids': {},
'inports': {'VARIABLE_NAME': {'class': 'CVC_DISP',
'configs': [['all']],
'description': 'Torque '
'arbitraion '
'state',
'handle': 'VcPemAlc/VcPemAlc/...',
'lsb': 1,
'max': 9,
'min': 0,
'name': 'rVcPemAlc_D_AuxLoadEvent',
'offset': 0,
'type': 'UInt8',
'unit': '-'}
},
'outports' : {},
'nvm' : {}
}
}
"""
res = {}
for unit in self._build_prj_config.get_included_units():
try:
res[unit] = self._raw_per_unit_configs[unit]
except KeyError:
continue
return res
def get_unit_config(self, unit):
"""Get config for a unit.
Arguments:
unit (str): Unit to get config for
"""
return self._raw_per_unit_configs[unit]
@staticmethod
def get_base_name(unit):
"""Get base name of unit."""
return unit.partition('__')[0]
def get_unit_code_generator(self, unit):
"""Get code generator for a given a unit (model).
Args:
unit (str): Current unit/model name.
Returns:
code_generator (str): Code generator used for given model.
"""
per_unit_cfg = self.get_per_unit_cfg()
if unit in per_unit_cfg and 'code_generator' in per_unit_cfg[unit]:
code_generator = per_unit_cfg[unit]['code_generator']
else:
# Default to target_link
code_generator = CodeGenerators.target_link
return code_generator
def _get_code_generators(self):
per_unit_cfg = self.get_per_unit_cfg()
code_generators = set()
for _, config in per_unit_cfg.items():
if 'code_generator' in config:
code_generators.add(config['code_generator'])
if not code_generators:
# Default to target_link
self.info('All code has an old config, defaulting to target_link')
return {CodeGenerators.target_link}
return code_generators
def _get_base_types_headers(self):
general_includes = ''
if CodeGenerators.embedded_coder in self.code_generators:
general_includes += '#include "rtwtypes.h"\n'
if CodeGenerators.target_link in self.code_generators:
general_includes += '#include "tl_basetypes.h"\n'
return general_includes