Skip to content

Commit

Permalink
dynamically generate CPU counts for metrics (#23154)
Browse files Browse the repository at this point in the history
* dynamically generate CPU counts for metrics

* dynamic-numcpu

* fix exported vars

* clean up CPU code

* fix CPU count in system/cpu

* update load

* add changelog

* fix tests
  • Loading branch information
fearful-symmetry committed Dec 18, 2020
1 parent 8ada7b1 commit 40aeeef
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]
- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455]
- Remove `NumCPU` as clients should update the CPU count on the fly in case of config changes in a VM. {pull}23154[23154]

==== Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func reportSystemCPUUsage(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
defer V.OnRegistryFinished()

monitoring.ReportInt(V, "cores", int64(process.NumCPU))
monitoring.ReportInt(V, "cores", int64(runtime.NumCPU()))
}

func reportRuntime(_ monitoring.Mode, V monitoring.Visitor) {
Expand Down
23 changes: 9 additions & 14 deletions libbeat/metric/system/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ import (
sigar "github.com/elastic/gosigar"
)

var (
// NumCores is the number of CPU cores in the system. Changes to operating
// system CPU allocation after process startup are not reflected.
NumCores = runtime.NumCPU()
)

// CPU Monitor

// Monitor is used to monitor the overall CPU usage of the system.
Expand Down Expand Up @@ -83,16 +77,16 @@ type Metrics struct {
}

// NormalizedPercentages returns CPU percentage usage information that is
// normalized by the number of CPU cores (NumCores). The values will range from
// normalized by the number of CPU cores. The values will range from
// 0 to 100%.
func (m *Metrics) NormalizedPercentages() Percentages {
return cpuPercentages(m.previousSample, m.currentSample, 1)
}

// Percentages returns CPU percentage usage information. The values range from
// 0 to 100% * NumCores.
// 0 to 100% * NumCPU.
func (m *Metrics) Percentages() Percentages {
return cpuPercentages(m.previousSample, m.currentSample, NumCores)
return cpuPercentages(m.previousSample, m.currentSample, runtime.NumCPU())
}

// cpuPercentages calculates the amount of CPU time used between the two given
Expand Down Expand Up @@ -215,7 +209,7 @@ type LoadAverages struct {
}

// Averages return the CPU load averages. These values should range from
// 0 to NumCores.
// 0 to NumCPU.
func (m *LoadMetrics) Averages() LoadAverages {
return LoadAverages{
OneMinute: common.Round(m.sample.One, common.DefaultDecimalPlacesCount),
Expand All @@ -224,12 +218,13 @@ func (m *LoadMetrics) Averages() LoadAverages {
}
}

// NormalizedAverages return the CPU load averages normalized by the NumCores.
// NormalizedAverages return the CPU load averages normalized by the NumCPU.
// These values should range from 0 to 1.
func (m *LoadMetrics) NormalizedAverages() LoadAverages {
cpus := runtime.NumCPU()
return LoadAverages{
OneMinute: common.Round(m.sample.One/float64(NumCores), common.DefaultDecimalPlacesCount),
FiveMinute: common.Round(m.sample.Five/float64(NumCores), common.DefaultDecimalPlacesCount),
FifteenMinute: common.Round(m.sample.Fifteen/float64(NumCores), common.DefaultDecimalPlacesCount),
OneMinute: common.Round(m.sample.One/float64(cpus), common.DefaultDecimalPlacesCount),
FiveMinute: common.Round(m.sample.Five/float64(cpus), common.DefaultDecimalPlacesCount),
FifteenMinute: common.Round(m.sample.Fifteen/float64(cpus), common.DefaultDecimalPlacesCount),
}
}
23 changes: 15 additions & 8 deletions libbeat/metric/system/cpu/cpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
"github.com/elastic/gosigar"
)

var (
// numCores is the number of CPU cores in the system. Changes to operating
// system CPU allocation after process startup are not reflected.
numCores = runtime.NumCPU()
)

func TestMonitorSample(t *testing.T) {
cpu := &Monitor{lastSample: &gosigar.Cpu{}}
s, err := cpu.Sample()
Expand All @@ -55,7 +61,7 @@ func TestMonitorSample(t *testing.T) {
}

func TestCoresMonitorSample(t *testing.T) {
cores := &CoresMonitor{lastSample: make([]gosigar.Cpu, NumCores)}
cores := &CoresMonitor{lastSample: make([]gosigar.Cpu, numCores)}
sample, err := cores.Sample()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -102,8 +108,8 @@ func TestMetricsRounding(t *testing.T) {
// TestMetricsPercentages tests that Metrics returns the correct
// percentages and normalized percentages.
func TestMetricsPercentages(t *testing.T) {
NumCores = 10
defer func() { NumCores = runtime.NumCPU() }()
numCores = 10
defer func() { numCores = runtime.NumCPU() }()

// This test simulates 30% user and 70% system (normalized), or 3% and 7%
// respectively when there are 10 CPUs.
Expand Down Expand Up @@ -132,9 +138,10 @@ func TestMetricsPercentages(t *testing.T) {
assert.EqualValues(t, .0, pct.Idle)
assert.EqualValues(t, 1., pct.Total)

pct = sample.Percentages()
assert.EqualValues(t, .3*float64(NumCores), pct.User)
assert.EqualValues(t, .7*float64(NumCores), pct.System)
assert.EqualValues(t, .0*float64(NumCores), pct.Idle)
assert.EqualValues(t, 1.*float64(NumCores), pct.Total)
//bypass the Metrics API so we can have a constant CPU value
pct = cpuPercentages(&s0, &s1, numCores)
assert.EqualValues(t, .3*float64(numCores), pct.User)
assert.EqualValues(t, .7*float64(numCores), pct.System)
assert.EqualValues(t, .0*float64(numCores), pct.Idle)
assert.EqualValues(t, 1.*float64(numCores), pct.Total)
}
6 changes: 3 additions & 3 deletions libbeat/metric/system/diskio/diskstat_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
package diskio

import (
"runtime"

"github.com/pkg/errors"
"github.com/shirou/gopsutil/disk"

"github.com/elastic/beats/v7/libbeat/metric/system/cpu"
)

// GetCLKTCK emulates the _SC_CLK_TCK syscall
Expand Down Expand Up @@ -63,7 +63,7 @@ func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, err
}

// calculate the delta ms between the CloseSampling and OpenSampling
deltams := 1000.0 * float64(stat.curCPU.Total()-stat.lastCPU.Total()) / float64(cpu.NumCores) / float64(GetCLKTCK())
deltams := 1000.0 * float64(stat.curCPU.Total()-stat.lastCPU.Total()) / float64(runtime.NumCPU()) / float64(GetCLKTCK())
if deltams <= 0 {
return IOMetric{}, errors.New("The delta cpu time between close sampling and open sampling is less or equal to 0")
}
Expand Down
5 changes: 1 addition & 4 deletions libbeat/metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ import (
sigar "github.com/elastic/gosigar"
)

// NumCPU is the number of CPUs of the host
var NumCPU = runtime.NumCPU()

// ProcsMap is a map where the keys are the names of processes and the value is the Process with that name
type ProcsMap map[int]*Process

Expand Down Expand Up @@ -365,7 +362,7 @@ func GetProcCPUPercentage(s0, s1 *Process) (normalizedPct, pct, totalPct float64
totalCPUDeltaMillis := int64(s1.Cpu.Total - s0.Cpu.Total)

pct := float64(totalCPUDeltaMillis) / float64(timeDeltaMillis)
normalizedPct := pct / float64(NumCPU)
normalizedPct := pct / float64(runtime.NumCPU())

return common.Round(normalizedPct, common.DefaultDecimalPlacesCount),
common.Round(pct, common.DefaultDecimalPlacesCount),
Expand Down
14 changes: 10 additions & 4 deletions libbeat/metric/system/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/elastic/gosigar"
)

// numCPU is the number of CPUs of the host
var numCPU = runtime.NumCPU()

func TestPids(t *testing.T) {
pids, err := Pids()

Expand Down Expand Up @@ -157,11 +160,14 @@ func TestProcCpuPercentage(t *testing.T) {
SampleTime: p1.SampleTime.Add(time.Second),
}

NumCPU = 48
defer func() { NumCPU = runtime.NumCPU() }()

totalPercentNormalized, totalPercent, totalValue := GetProcCPUPercentage(p1, p2)
assert.EqualValues(t, 0.0721, totalPercentNormalized)
//GetProcCPUPercentage wil return a number that varies based on the host, due to NumCPU()
// So "un-normalize" it, then re-normalized with a constant.
cpu := float64(runtime.NumCPU())
unNormalized := totalPercentNormalized * cpu
normalizedTest := common.Round(unNormalized/48, common.DefaultDecimalPlacesCount)

assert.EqualValues(t, 0.0721, normalizedTest)
assert.EqualValues(t, 3.459, totalPercent)
assert.EqualValues(t, 14841, totalValue)
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/system/cpu/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func getPlatformCPUMetrics(sample *cpu.Metrics, selectors []string, event common

// gather CPU metrics
func collectCPUMetrics(selectors []string, sample *cpu.Metrics) mb.Event {
event := common.MapStr{"cores": cpu.NumCores}
event := common.MapStr{"cores": runtime.NumCPU()}
getPlatformCPUMetrics(sample, selectors, event)

//generate the host fields here, since we don't want users disabling it.
Expand Down
4 changes: 3 additions & 1 deletion metricbeat/module/system/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package load

import (
"runtime"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -58,7 +60,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
normAvgs := load.NormalizedAverages()

event := common.MapStr{
"cores": cpu.NumCores,
"cores": runtime.NumCPU(),
"1": avgs.OneMinute,
"5": avgs.FiveMinute,
"15": avgs.FifteenMinute,
Expand Down

0 comments on commit 40aeeef

Please sign in to comment.