Merge "Telegraf: Add core selection"

This commit is contained in:
Zuul 2023-12-04 17:46:53 +00:00 committed by Gerrit Code Review
commit 81a379ee0d
4 changed files with 479 additions and 1 deletions

View File

@ -2,7 +2,7 @@ BUILDER=script
LABEL=telegraf
SOURCE_REPO=https://github.com/influxdata/telegraf.git
SOURCE_REF=v1.27.3
SOURCE_PATCHES="../files/telegraf-0001-Patch-for-docker-image.patch ../files/telegraf-0002-fix-inputs.intel_powerstat-reduce-msr-read-latency-o.patch ../files/telegraf-0003-feat-intel.powerstat-Estimate-isolated-core-frequenc.patch ../files/telegraf-0004-fix-inputs.intel_powerstat-Add-MSR-read-timeout.patch"
SOURCE_PATCHES="../files/telegraf-0001-Patch-for-docker-image.patch ../files/telegraf-0002-fix-inputs.intel_powerstat-reduce-msr-read-latency-o.patch ../files/telegraf-0003-feat-intel.powerstat-Estimate-isolated-core-frequenc.patch ../files/telegraf-0004-fix-inputs.intel_powerstat-Add-MSR-read-timeout.patch ../files/telegraf-0005-feat-intel.powerstat-Add-core-selection.patch"
COMMAND=bash
SCRIPT=build.sh
ARGS=telegraf

View File

@ -0,0 +1,449 @@
From 703d4e262ea1285da8d7be2e2d425bba621ae446 Mon Sep 17 00:00:00 2001
From: Caio Cesar Ferreira <Caio.CesarFerreira@windriver.com>
Date: Fri, 27 Oct 2023 16:47:00 -0300
Subject: [PATCH] feat(inputs.intel_powerstat): Add core selection
This commit lets the user select the cores from which metrics
will be collected.
Signed-off-by: Caio Cesar Ferreira <Caio.CesarFerreira@windriver.com>
---
.../inputs/intel_powerstat/intel_powerstat.go | 61 ++++++----
.../intel_powerstat/intel_powerstat_test.go | 4 +-
plugins/inputs/intel_powerstat/msr.go | 113 +++++++++++++++++-
.../inputs/intel_powerstat/msr_mock_test.go | 15 +++
plugins/inputs/intel_powerstat/msr_test.go | 3 +-
plugins/inputs/intel_powerstat/sample.conf | 5 +
6 files changed, 173 insertions(+), 28 deletions(-)
diff --git a/plugins/inputs/intel_powerstat/intel_powerstat.go b/plugins/inputs/intel_powerstat/intel_powerstat.go
index 98dabcf0c..a6e88ccaf 100644
--- a/plugins/inputs/intel_powerstat/intel_powerstat.go
+++ b/plugins/inputs/intel_powerstat/intel_powerstat.go
@@ -47,6 +47,7 @@ type PowerStat struct {
PackageMetrics []string `toml:"package_metrics"`
ReadMethod string `toml:"read_method"`
ReadTimeout int64 `toml:"msr_read_timeout_ms"`
+ Cores []string `toml:"cores"`
Log telegraf.Logger `toml:"-"`
fs fileService
@@ -87,7 +88,7 @@ func (p *PowerStat) Init() error {
return err
}
- if !contains(knownReadMethods, p.ReadMethod) {
+ if !Contains(knownReadMethods, p.ReadMethod) {
return fmt.Errorf("provided read method %q is not valid", p.ReadMethod)
}
@@ -106,7 +107,7 @@ func (p *PowerStat) initMSR() {
// Initialize MSR service only when there is at least one metric enabled
if p.cpuFrequency || p.cpuBusyFrequency || p.cpuTemperature || p.cpuC0StateResidency || p.cpuC1StateResidency ||
p.cpuC6StateResidency || p.cpuBusyCycles || p.packageTurboLimit || p.packageUncoreFrequency || p.packageCPUBaseFrequency {
- p.msr = newMsrServiceWithFs(p.Log, p.fs, p.ReadMethod, p.ReadTimeout)
+ p.msr = newMsrServiceWithFs(p.Log, p.fs, p.ReadMethod, p.ReadTimeout, p.Cores)
}
}
@@ -323,13 +324,25 @@ func (p *PowerStat) addCurrentDramPowerConsumption(socketID string, acc telegraf
func (p *PowerStat) addPerCoreMetrics(acc telegraf.Accumulator) {
var wg sync.WaitGroup
- wg.Add(len(p.msr.getCPUCoresData()))
- for cpuID := range p.msr.getCPUCoresData() {
- go p.addMetricsForSingleCore(cpuID, acc, &wg)
+ if len(p.msr.getReadableCPUCores()) > 0 {
+ wg.Add(len(p.msr.getReadableCPUCores()))
+
+ for _, cpuID := range p.msr.getReadableCPUCores() {
+ go p.addMetricsForSingleCore(cpuID, acc, &wg)
+ }
+
+ wg.Wait()
+ } else {
+ wg.Add(len(p.msr.getCPUCoresData()))
+
+ for cpuID := range p.msr.getCPUCoresData() {
+ go p.addMetricsForSingleCore(cpuID, acc, &wg)
+ }
+
+ wg.Wait()
}
- wg.Wait()
}
func (p *PowerStat) addMetricsForSingleCore(cpuID string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
@@ -736,11 +749,11 @@ func (p *PowerStat) getBusClock(cpuID string) float64 {
busClock133 := []int64{0x1E, 0x1F, 0x1A, 0x2E, 0x25, 0x2C, 0x2F, 0x4C}
busClockCalculate := []int64{0x37, 0x4D}
- if contains(convertIntegerArrayToStringArray(busClock100), model) {
+ if Contains(convertIntegerArrayToStringArray(busClock100), model) {
return 100.0
- } else if contains(convertIntegerArrayToStringArray(busClock133), model) {
+ } else if Contains(convertIntegerArrayToStringArray(busClock133), model) {
return 133.0
- } else if contains(convertIntegerArrayToStringArray(busClockCalculate), model) {
+ } else if Contains(convertIntegerArrayToStringArray(busClockCalculate), model) {
return p.getSilvermontBusClock(cpuID)
}
@@ -775,23 +788,23 @@ func (p *PowerStat) parsePackageMetricsConfig() {
return
}
- if contains(p.PackageMetrics, packageTurboLimit) {
+ if Contains(p.PackageMetrics, packageTurboLimit) {
p.packageTurboLimit = true
}
- if contains(p.PackageMetrics, packageCurrentPowerConsumption) {
+ if Contains(p.PackageMetrics, packageCurrentPowerConsumption) {
p.packageCurrentPowerConsumption = true
}
- if contains(p.PackageMetrics, packageCurrentDramPowerConsumption) {
+ if Contains(p.PackageMetrics, packageCurrentDramPowerConsumption) {
p.packageCurrentDramPowerConsumption = true
}
- if contains(p.PackageMetrics, packageThermalDesignPower) {
+ if Contains(p.PackageMetrics, packageThermalDesignPower) {
p.packageThermalDesignPower = true
}
- if contains(p.PackageMetrics, packageUncoreFrequency) {
+ if Contains(p.PackageMetrics, packageUncoreFrequency) {
p.packageUncoreFrequency = true
}
- if contains(p.PackageMetrics, packageCPUBaseFrequency) {
+ if Contains(p.PackageMetrics, packageCPUBaseFrequency) {
p.packageCPUBaseFrequency = true
}
}
@@ -801,31 +814,31 @@ func (p *PowerStat) parseCPUMetricsConfig() {
return
}
- if contains(p.CPUMetrics, cpuFrequency) {
+ if Contains(p.CPUMetrics, cpuFrequency) {
p.cpuFrequency = true
}
- if contains(p.CPUMetrics, cpuC0StateResidency) {
+ if Contains(p.CPUMetrics, cpuC0StateResidency) {
p.cpuC0StateResidency = true
}
- if contains(p.CPUMetrics, cpuC1StateResidency) {
+ if Contains(p.CPUMetrics, cpuC1StateResidency) {
p.cpuC1StateResidency = true
}
- if contains(p.CPUMetrics, cpuC6StateResidency) {
+ if Contains(p.CPUMetrics, cpuC6StateResidency) {
p.cpuC6StateResidency = true
}
- if contains(p.CPUMetrics, cpuBusyCycles) {
+ if Contains(p.CPUMetrics, cpuBusyCycles) {
p.cpuBusyCycles = true
}
- if contains(p.CPUMetrics, cpuBusyFrequency) {
+ if Contains(p.CPUMetrics, cpuBusyFrequency) {
p.cpuBusyFrequency = true
}
- if contains(p.CPUMetrics, cpuTemperature) {
+ if Contains(p.CPUMetrics, cpuTemperature) {
p.cpuTemperature = true
}
}
@@ -852,7 +865,7 @@ func (p *PowerStat) verifyProcessor() error {
return fmt.Errorf("Intel processor not found, vendorId: %s", firstCPU.vendorID)
}
- if !contains(convertIntegerArrayToStringArray(allowedProcessorModelsForC1C6), firstCPU.model) {
+ if !Contains(convertIntegerArrayToStringArray(allowedProcessorModelsForC1C6), firstCPU.model) {
p.cpuC1StateResidency = false
p.cpuC6StateResidency = false
}
@@ -881,7 +894,7 @@ func (p *PowerStat) verifyProcessor() error {
return nil
}
-func contains[T comparable](s []T, e T) bool {
+func Contains[T comparable](s []T, e T) bool {
for _, v := range s {
if v == e {
return true
diff --git a/plugins/inputs/intel_powerstat/intel_powerstat_test.go b/plugins/inputs/intel_powerstat/intel_powerstat_test.go
index fc501a288..1118a011f 100644
--- a/plugins/inputs/intel_powerstat/intel_powerstat_test.go
+++ b/plugins/inputs/intel_powerstat/intel_powerstat_test.go
@@ -120,6 +120,7 @@ func TestGather(t *testing.T) {
On("retrieveAndCalculateData", mock.Anything).Return(nil).Times(len(raplDataMap)).
On("getConstraintMaxPowerWatts", mock.Anything).Return(546783852.3, nil)
mockServices.msr.On("getCPUCoresData").Return(preparedCPUData).
+ On("getReadableCPUCores").Return(coreIDs).
On("isMsrLoaded", mock.Anything).Return(true).
On("openAndReadMsr", mock.Anything).Return(nil).
On("retrieveCPUFrequencyForCore", mock.Anything).Return(1200000.2, nil)
@@ -640,6 +641,7 @@ func getPowerWithMockedServices() (*PowerStat, *MockServices) {
p.packageCurrentPowerConsumption = true
p.packageCurrentDramPowerConsumption = true
p.packageThermalDesignPower = true
+ p.Cores = []string{"0-3"}
return p, &mockServices
}
@@ -730,7 +732,7 @@ func TestGetBusClock(t *testing.T) {
p.cpuInfo = map[string]*cpuInfo{
tt.socketID: {cpuID: tt.socketID, physicalID: tt.socketID, model: strconv.FormatUint(tt.modelCPU, 10)},
}
- if contains(busClockCalculate, tt.modelCPU) {
+ if Contains(busClockCalculate, tt.modelCPU) {
mockServices.msr.On("readSingleMsr", mock.Anything, msrFSBFreqString).Return(tt.msrFSBFreqValue, tt.readSingleMsrErrFSB)
}
defer mockServices.msr.AssertExpectations(t)
diff --git a/plugins/inputs/intel_powerstat/msr.go b/plugins/inputs/intel_powerstat/msr.go
index e5bc8413a..b3c7c60a5 100644
--- a/plugins/inputs/intel_powerstat/msr.go
+++ b/plugins/inputs/intel_powerstat/msr.go
@@ -8,6 +8,7 @@ import (
"io"
"os"
"path/filepath"
+ "strconv"
"strings"
"time"
@@ -41,7 +42,6 @@ const (
platformInfo = 0xCE
fsbFreq = 0xCD
)
-
const (
msrTurboRatioLimitString = "MSR_TURBO_RATIO_LIMIT"
msrTurboRatioLimit1String = "MSR_TURBO_RATIO_LIMIT1"
@@ -52,9 +52,13 @@ const (
msrFSBFreqString = "MSR_FSB_FREQ"
)
+// Maximum number of core IDs or socket IDs (8192). Based on the maximum number of CPUs that the Linux kernel supports.
+const maxIDsSize = 1 << 13
+
// msrService is responsible for interactions with MSR.
type msrService interface {
getCPUCoresData() map[string]*msrData
+ getReadableCPUCores() []string
retrieveCPUFrequencyForCore(core string) (float64, error)
retrieveUncoreFrequency(socketID string, typeFreq string, kind string, die string) (float64, error)
openAndReadMsr(core string) error
@@ -64,6 +68,7 @@ type msrService interface {
type msrServiceImpl struct {
cpuCoresData map[string]*msrData
+ cpuCores []string
msrOffsets []int64
fs fileService
log telegraf.Logger
@@ -75,6 +80,10 @@ func (m *msrServiceImpl) getCPUCoresData() map[string]*msrData {
return m.cpuCoresData
}
+func (m *msrServiceImpl) getReadableCPUCores() []string {
+ return m.cpuCores
+}
+
func (m *msrServiceImpl) isMsrLoaded() bool {
for cpuID := range m.getCPUCoresData() {
err := m.openAndReadMsr(cpuID)
@@ -412,6 +421,11 @@ func (m *msrServiceImpl) setCPUCores() error {
for _, cpuPath := range cpuPaths {
core := strings.TrimPrefix(filepath.Base(cpuPath), cpuPrefix)
+ if m.cpuCores != nil {
+ if !Contains(m.cpuCores, core) {
+ continue
+ }
+ }
m.cpuCoresData[core] = &msrData{
mperf: 0,
aperf: 0,
@@ -433,12 +447,15 @@ func (m *msrServiceImpl) setCPUCores() error {
return nil
}
-func newMsrServiceWithFs(logger telegraf.Logger, fs fileService, readMethod string, readTimeout int64) *msrServiceImpl {
+func newMsrServiceWithFs(logger telegraf.Logger, fs fileService, readMethod string, readTimeout int64, cores []string) *msrServiceImpl {
+ parsedCores := parseCores(logger, cores)
+
msrService := &msrServiceImpl{
fs: fs,
log: logger,
readMethod: readMethod,
readTimeout: time.Duration(readTimeout) * time.Millisecond,
+ cpuCores: parsedCores,
}
err := msrService.setCPUCores()
if err != nil {
@@ -451,3 +468,95 @@ func newMsrServiceWithFs(logger telegraf.Logger, fs fileService, readMethod stri
throttleTemperatureLocation, temperatureLocation}
return msrService
}
+
+func parseCores(logger telegraf.Logger, cores []string) []string {
+ var ids []string
+ var duplicatedIDs []string
+ var err error
+
+ if len(cores) == 0 || cores == nil {
+ logger.Warn("an empty list of cores was provided. All possible cores will be configured")
+ return nil
+ }
+ ids, err = parseIDs(cores)
+ if err != nil {
+ return nil
+ }
+ ids, duplicatedIDs = removeDuplicateValues(ids)
+ ids = removeNotFoundIDs(ids, logger)
+
+ for _, duplication := range duplicatedIDs {
+ logger.Warnf("duplicated id number %s will be removed", duplication)
+ }
+
+ if err != nil {
+ logger.Warnf("Error while parsing list of cores: %w. All possible cores will be configured", err)
+ return nil
+ }
+
+ return ids
+}
+
+func parseIDs(allIDsStrings []string) ([]string, error) {
+ var result []string
+ for _, idsString := range allIDsStrings {
+ ids := strings.Split(idsString, ",")
+
+ for _, id := range ids {
+ id := strings.TrimSpace(id)
+ // a-b support
+ var start, end int
+ n, err := fmt.Sscanf(id, "%d-%d", &start, &end)
+ if err == nil && n == 2 {
+ if start >= end {
+ return nil, fmt.Errorf("`%d` is equal or greater than `%d`", start, end)
+ }
+ for ; start <= end; start++ {
+ if len(result)+1 > maxIDsSize {
+ return nil, fmt.Errorf("requested number of IDs exceeds max size `%d`", maxIDsSize)
+ }
+ result = append(result, strconv.Itoa(start))
+
+ }
+ continue
+ }
+ // Single value
+ if len(result)+1 > maxIDsSize {
+ return nil, fmt.Errorf("requested number of IDs exceeds max size `%d`", maxIDsSize)
+ }
+ result = append(result, id)
+ }
+ }
+ return result, nil
+}
+
+func removeDuplicateValues(stringSlice []string) (result []string, duplicates []string) {
+ keys := make(map[string]bool)
+
+ for _, entry := range stringSlice {
+ if _, value := keys[entry]; !value {
+ keys[entry] = true
+ result = append(result, entry)
+ } else {
+ duplicates = append(duplicates, entry)
+ }
+ }
+ return result, duplicates
+}
+
+func removeNotFoundIDs(cores []string, logger telegraf.Logger) []string {
+ corePath := "/sys/devices/system/cpu/cpu%s/"
+ var foundCores []string
+
+ for _, coresID := range cores {
+ path := fmt.Sprintf(corePath, coresID)
+ err := checkFile(path)
+ if err != nil {
+ logger.Warnf("Removing core id %s Error: %s", coresID, err)
+ continue
+ }
+ foundCores = append(foundCores, coresID)
+
+ }
+ return foundCores
+}
diff --git a/plugins/inputs/intel_powerstat/msr_mock_test.go b/plugins/inputs/intel_powerstat/msr_mock_test.go
index f4b99f0e6..c14da7db1 100644
--- a/plugins/inputs/intel_powerstat/msr_mock_test.go
+++ b/plugins/inputs/intel_powerstat/msr_mock_test.go
@@ -41,6 +41,21 @@ func (_m *mockMsrService) getCPUCoresData() map[string]*msrData {
return r0
}
+func (_m *mockMsrService) getReadableCPUCores() []string{
+ ret := _m.Called()
+
+ var r0 []string
+ if rf, ok := ret.Get(0).(func() []string); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]string)
+ }
+ }
+
+ return r0
+}
+
// openAndReadMsr provides a mock function with given fields: core
func (_m *mockMsrService) openAndReadMsr(core string) error {
ret := _m.Called(core)
diff --git a/plugins/inputs/intel_powerstat/msr_test.go b/plugins/inputs/intel_powerstat/msr_test.go
index 8c26ed9b3..b4164153b 100644
--- a/plugins/inputs/intel_powerstat/msr_test.go
+++ b/plugins/inputs/intel_powerstat/msr_test.go
@@ -178,13 +178,14 @@ func verifyCPUCoresData(cores []string, t *testing.T, msr *msrServiceImpl, expec
func getMsrServiceWithMockedFs() (*msrServiceImpl, *mockFileService) {
cores := []string{"cpu0", "cpu1", "cpu2", "cpu3"}
+ mockedCores := []string{"0-3"}
logger := testutil.Logger{Name: "PowerPluginTest"}
readMethod := "concurrent"
fsMock := &mockFileService{}
fsMock.On("getStringsMatchingPatternOnPath", mock.Anything).
Return(cores, nil).Once()
timeout := int64(100)
- msr := newMsrServiceWithFs(logger, fsMock, readMethod, timeout)
+ msr := newMsrServiceWithFs(logger, fsMock, readMethod, timeout, mockedCores)
return msr, fsMock
}
diff --git a/plugins/inputs/intel_powerstat/sample.conf b/plugins/inputs/intel_powerstat/sample.conf
index ffe877d0b..b0af4e877 100644
--- a/plugins/inputs/intel_powerstat/sample.conf
+++ b/plugins/inputs/intel_powerstat/sample.conf
@@ -29,3 +29,8 @@
# The user can set the timeout duration for MSR reading (in milliseconds)
# The default value is 100 ms.
# msr_read_timeout_ms = 100
+
+ # The user can select the cores from which metrics are collected.
+ # The default is empty, which reports every core in the system.
+ # Each entry may be a single ID, a comma-separated list, or a range, as in the example below.
+ # cores = ["0,3", "10", "20-30"]
--
2.25.1

View File

@ -0,0 +1,28 @@
From 20a5d2bd95a6c8e063de3d1f5521b10eaf30c0a2 Mon Sep 17 00:00:00 2001
From: Caio Cesar Ferreira <Caio.CesarFerreira@windriver.com>
Date: Mon, 30 Oct 2023 15:12:49 -0300
Subject: [PATCH] Add Core Selection
This commit adds the cores field to the values file.
This change is required for the core selection feature to work in telegraf.
Signed-off-by: Caio Cesar Ferreira <Caio.CesarFerreira@windriver.com>
---
charts/telegraf-ds/values.yaml | 1 +
1 file changed, 1 insertion(+)
diff --git a/charts/telegraf-ds/values.yaml b/charts/telegraf-ds/values.yaml
index fdd4f7f..069ca5d 100644
--- a/charts/telegraf-ds/values.yaml
+++ b/charts/telegraf-ds/values.yaml
@@ -151,6 +151,7 @@ config:
cpu_metrics: ["cpu_frequency","cpu_busy_frequency","cpu_temperature","cpu_c0_state_residency","cpu_c1_state_residency","cpu_c6_state_residency","cpu_busy_cycles"]
package_metrics: ["current_power_consumption","current_dram_power_consumption","thermal_design_power","cpu_base_frequency","uncore_frequency"]
read_method: "concurrent"
+ cores: []
intel_pmu:
event_definitions: ["/etc/telegraf/events_definition.json"]
core_events:
--
2.25.1

View File

@ -1 +1,2 @@
0001-Patch-telegraf-helm-chart.patch
0002-Patch-telegraf-add-core-selection.patch