237 lines
9.0 KiB
Diff
237 lines
9.0 KiB
Diff
From 05d41aed1c5769b4ab2d7c79a5549548533c6cff Mon Sep 17 00:00:00 2001
|
|
From: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
|
|
Date: Mon, 19 Jun 2023 11:27:51 -0300
|
|
Subject: [PATCH 2/3] fix(inputs.intel_powerstat): reduce msr read latency on
|
|
preempt-rt kernels
|
|
|
|
Running the cyclictest utility from rt-tests [1] shows a latency increase
|
|
when telegraf is executed on a preempt-rt kernel with isolated cores.
|
|
This increase occurs because the MSRs are read concurrently by goroutines.
|
|
|
|
To fix this issue, the read_method parameter was added.
|
|
This parameter accepts two values, concurrent or sequential.
|
|
The concurrent method is the current behavior, using goroutines to read
|
|
each MSR value concurrently.
|
|
The sequential method reads each value sequentially. This reduces latency
|
|
overhead, but might cause loss of precision on metrics calculation.
|
|
|
|
NOTE: This issue was reported upstream:
|
|
https://github.com/influxdata/telegraf/issues/13828
|
|
|
|
[1] https://git.kernel.org/pub/scm/utils/rt-tests/rt-tests.git
|
|
|
|
Signed-off-by: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
|
|
---
|
|
.../inputs/intel_powerstat/intel_powerstat.go | 12 ++-
|
|
plugins/inputs/intel_powerstat/msr.go | 91 +++++++++++++++----
|
|
plugins/inputs/intel_powerstat/msr_test.go | 3 +-
|
|
3 files changed, 88 insertions(+), 18 deletions(-)
|
|
|
|
diff --git a/plugins/inputs/intel_powerstat/intel_powerstat.go b/plugins/inputs/intel_powerstat/intel_powerstat.go
|
|
index 25cdd96b7..e05f8a642 100644
|
|
--- a/plugins/inputs/intel_powerstat/intel_powerstat.go
|
|
+++ b/plugins/inputs/intel_powerstat/intel_powerstat.go
|
|
@@ -37,10 +37,15 @@ const (
|
|
percentageMultiplier = 100
|
|
)
|
|
|
|
+var (
|
|
+ knownReadMethods = []string{"concurrent", "sequential"}
|
|
+)
|
|
+
|
|
// PowerStat plugin enables monitoring of platform metrics (power, TDP) and Core metrics like temperature, power and utilization.
|
|
type PowerStat struct {
|
|
CPUMetrics []string `toml:"cpu_metrics"`
|
|
PackageMetrics []string `toml:"package_metrics"`
|
|
+ ReadMethod string `toml:"read_method"`
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
fs fileService
|
|
@@ -81,6 +86,10 @@ func (p *PowerStat) Init() error {
|
|
return err
|
|
}
|
|
|
|
+ if !contains(knownReadMethods, p.ReadMethod) {
|
|
+ return fmt.Errorf("provided read method %q is not valid", p.ReadMethod)
|
|
+ }
|
|
+
|
|
p.initMSR()
|
|
p.initRaplService()
|
|
|
|
@@ -96,7 +105,7 @@ func (p *PowerStat) initMSR() {
|
|
// Initialize MSR service only when there is at least one metric enabled
|
|
if p.cpuFrequency || p.cpuBusyFrequency || p.cpuTemperature || p.cpuC0StateResidency || p.cpuC1StateResidency ||
|
|
p.cpuC6StateResidency || p.cpuBusyCycles || p.packageTurboLimit || p.packageUncoreFrequency || p.packageCPUBaseFrequency {
|
|
- p.msr = newMsrServiceWithFs(p.Log, p.fs)
|
|
+ p.msr = newMsrServiceWithFs(p.Log, p.fs, p.ReadMethod)
|
|
}
|
|
}
|
|
|
|
@@ -903,6 +912,7 @@ func newPowerStat(fs fileService) *PowerStat {
|
|
skipFirstIteration: true,
|
|
fs: fs,
|
|
logOnce: make(map[string]error),
|
|
+ ReadMethod: "concurrent",
|
|
}
|
|
|
|
return p
|
|
diff --git a/plugins/inputs/intel_powerstat/msr.go b/plugins/inputs/intel_powerstat/msr.go
|
|
index 52690c4a1..2b5902f2c 100644
|
|
--- a/plugins/inputs/intel_powerstat/msr.go
|
|
+++ b/plugins/inputs/intel_powerstat/msr.go
|
|
@@ -62,6 +62,7 @@ type msrServiceImpl struct {
|
|
msrOffsets []int64
|
|
fs fileService
|
|
log telegraf.Logger
|
|
+ readMethod string
|
|
}
|
|
|
|
func (m *msrServiceImpl) getCPUCoresData() map[string]*msrData {
|
|
@@ -195,11 +196,39 @@ func (m *msrServiceImpl) readSingleMsr(core string, msr string) (uint64, error)
|
|
return value, nil
|
|
}
|
|
|
|
-func (m *msrServiceImpl) readDataFromMsr(core string, reader io.ReaderAt) error {
|
|
- g, ctx := errgroup.WithContext(context.Background())
|
|
+func (m *msrServiceImpl) sequentialMSRRead(reader io.ReaderAt) (msrData, error) {
|
|
+ // Create and populate a map that contains msr offsets along with their respective values
|
|
+ msrOffsets := make(map[int64]uint64)
|
|
+ var data msrData
|
|
+
|
|
+ for _, offset := range m.msrOffsets {
|
|
+ value, err := m.fs.readFileAtOffsetToUint64(reader, offset)
|
|
+ if err != nil {
|
|
+ return data, fmt.Errorf("error reading MSR value %x: %w", offset, err)
|
|
+ }
|
|
+ msrOffsets[offset] = value
|
|
+ }
|
|
|
|
+ throttleTemp := msrOffsets[throttleTemperatureLocation]
|
|
+ temp := msrOffsets[temperatureLocation]
|
|
+
|
|
+ data.c3 = msrOffsets[c3StateResidencyLocation]
|
|
+ data.c6 = msrOffsets[c6StateResidencyLocation]
|
|
+ data.c7 = msrOffsets[c7StateResidencyLocation]
|
|
+ data.mperf = msrOffsets[maximumFrequencyClockCountLocation]
|
|
+ data.aperf = msrOffsets[actualFrequencyClockCountLocation]
|
|
+ data.timeStampCounter = msrOffsets[timestampCounterLocation]
|
|
+ data.throttleTemp = int64((throttleTemp >> 16) & 0xFF) // MSR (1A2h) IA32_TEMPERATURE_TARGET bits 23:16.
|
|
+ data.temp = int64((temp >> 16) & 0x7F) // MSR (19Ch) IA32_THERM_STATUS bits 22:16: 7-bit field, same mask as the code this patch replaces.
|
|
+
|
|
+ return data, nil
|
|
+}
|
|
+
|
|
+func (m *msrServiceImpl) concurrentMSRRead(reader io.ReaderAt) (msrData, error) {
|
|
+ g, ctx := errgroup.WithContext(context.Background())
|
|
// Create and populate a map that contains msr offsets along with their respective channels
|
|
msrOffsetsWithChannels := make(map[int64]chan uint64)
|
|
+ var data msrData
|
|
for _, offset := range m.msrOffsets {
|
|
msrOffsetsWithChannels[offset] = make(chan uint64)
|
|
}
|
|
@@ -221,19 +250,48 @@ func (m *msrServiceImpl) readDataFromMsr(core string, reader io.ReaderAt) error
|
|
}(offset, channel)
|
|
}
|
|
|
|
- newC3 := <-msrOffsetsWithChannels[c3StateResidencyLocation]
|
|
- newC6 := <-msrOffsetsWithChannels[c6StateResidencyLocation]
|
|
- newC7 := <-msrOffsetsWithChannels[c7StateResidencyLocation]
|
|
- newMperf := <-msrOffsetsWithChannels[maximumFrequencyClockCountLocation]
|
|
- newAperf := <-msrOffsetsWithChannels[actualFrequencyClockCountLocation]
|
|
- newTsc := <-msrOffsetsWithChannels[timestampCounterLocation]
|
|
- newThrottleTemp := <-msrOffsetsWithChannels[throttleTemperatureLocation]
|
|
- newTemp := <-msrOffsetsWithChannels[temperatureLocation]
|
|
+ throttleTemp := <-msrOffsetsWithChannels[throttleTemperatureLocation]
|
|
+ temp := <-msrOffsetsWithChannels[temperatureLocation]
|
|
+
|
|
+ data.c3 = <-msrOffsetsWithChannels[c3StateResidencyLocation]
|
|
+ data.c6 = <-msrOffsetsWithChannels[c6StateResidencyLocation]
|
|
+ data.c7 = <-msrOffsetsWithChannels[c7StateResidencyLocation]
|
|
+ data.mperf = <-msrOffsetsWithChannels[maximumFrequencyClockCountLocation]
|
|
+ data.aperf = <-msrOffsetsWithChannels[actualFrequencyClockCountLocation]
|
|
+ data.timeStampCounter = <-msrOffsetsWithChannels[timestampCounterLocation]
|
|
+ data.throttleTemp = int64((throttleTemp >> 16) & 0xFF) // MSR (1A2h) IA32_TEMPERATURE_TARGET bits 23:16.
|
|
+ data.temp = int64((temp >> 16) & 0x7F) // MSR (19Ch) IA32_THERM_STATUS bits 22:16: 7-bit field, same mask as the code this patch replaces.
|
|
|
|
if err := g.Wait(); err != nil {
|
|
- return fmt.Errorf("received error during reading MSR values in goroutines: %w", err)
|
|
+ return data, fmt.Errorf("received error during reading MSR values in goroutines: %w", err)
|
|
+ }
|
|
+
|
|
+ return data, nil
|
|
+}
|
|
+
|
|
+func (m *msrServiceImpl) readDataFromMsr(core string, reader io.ReaderAt) error {
|
|
+ var data msrData
|
|
+ var err error
|
|
+ switch m.readMethod {
|
|
+ case "concurrent":
|
|
+ data, err = m.concurrentMSRRead(reader)
|
|
+ case "sequential":
|
|
+ data, err = m.sequentialMSRRead(reader)
|
|
}
|
|
|
|
+ if err != nil {
|
|
+ return err
|
|
+ }
|
|
+
|
|
+ newC3 := data.c3
|
|
+ newC6 := data.c6
|
|
+ newC7 := data.c7
|
|
+ newMperf := data.mperf
|
|
+ newAperf := data.aperf
|
|
+ newTsc := data.timeStampCounter
|
|
+ newThrottleTemp := data.throttleTemp
|
|
+ newTemp := data.temp
|
|
+
|
|
m.cpuCoresData[core].c3Delta = newC3 - m.cpuCoresData[core].c3
|
|
m.cpuCoresData[core].c6Delta = newC6 - m.cpuCoresData[core].c6
|
|
m.cpuCoresData[core].c7Delta = newC7 - m.cpuCoresData[core].c7
|
|
@@ -248,9 +306,9 @@ func (m *msrServiceImpl) readDataFromMsr(core string, reader io.ReaderAt) error
|
|
m.cpuCoresData[core].aperf = newAperf
|
|
m.cpuCoresData[core].timeStampCounter = newTsc
|
|
// MSR (1A2h) IA32_TEMPERATURE_TARGET bits 23:16.
|
|
- m.cpuCoresData[core].throttleTemp = int64((newThrottleTemp >> 16) & 0xFF)
|
|
+ m.cpuCoresData[core].throttleTemp = newThrottleTemp
|
|
// MSR (19Ch) IA32_THERM_STATUS bits 22:16.
|
|
- m.cpuCoresData[core].temp = int64((newTemp >> 16) & 0x7F)
|
|
+ m.cpuCoresData[core].temp = newTemp
|
|
|
|
return nil
|
|
}
|
|
@@ -309,10 +367,11 @@ func (m *msrServiceImpl) setCPUCores() error {
|
|
return nil
|
|
}
|
|
|
|
-func newMsrServiceWithFs(logger telegraf.Logger, fs fileService) *msrServiceImpl {
|
|
+func newMsrServiceWithFs(logger telegraf.Logger, fs fileService, readMethod string) *msrServiceImpl {
|
|
msrService := &msrServiceImpl{
|
|
- fs: fs,
|
|
- log: logger,
|
|
+ fs: fs,
|
|
+ log: logger,
|
|
+ readMethod: readMethod,
|
|
}
|
|
err := msrService.setCPUCores()
|
|
if err != nil {
|
|
diff --git a/plugins/inputs/intel_powerstat/msr_test.go b/plugins/inputs/intel_powerstat/msr_test.go
|
|
index 5090d10a6..708297a90 100644
|
|
--- a/plugins/inputs/intel_powerstat/msr_test.go
|
|
+++ b/plugins/inputs/intel_powerstat/msr_test.go
|
|
@@ -179,10 +179,11 @@ func verifyCPUCoresData(cores []string, t *testing.T, msr *msrServiceImpl, expec
|
|
func getMsrServiceWithMockedFs() (*msrServiceImpl, *mockFileService) {
|
|
cores := []string{"cpu0", "cpu1", "cpu2", "cpu3"}
|
|
logger := testutil.Logger{Name: "PowerPluginTest"}
|
|
+ readMethod := "concurrent"
|
|
fsMock := &mockFileService{}
|
|
fsMock.On("getStringsMatchingPatternOnPath", mock.Anything).
|
|
Return(cores, nil).Once()
|
|
- msr := newMsrServiceWithFs(logger, fsMock)
|
|
+ msr := newMsrServiceWithFs(logger, fsMock, readMethod)
|
|
|
|
return msr, fsMock
|
|
}
|
|
--
|
|
2.25.1
|
|
|