Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add host inventory metrics to system module #20415

Merged
merged 18 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `scope` setting for elasticsearch module, allowing it to monitor an Elasticsearch cluster behind a load-balancing proxy. {issue}18539[18539] {pull}18547[18547]
- Add host inventory metrics to azure compute_vm metricset. {pull}20641[20641]
- Add host inventory metrics to googlecloud compute metricset. {pull}20391[20391]
- Add host inventory metrics to system module. {pull}20415[20415]

*Packetbeat*

Expand Down
4 changes: 3 additions & 1 deletion metricbeat/module/system/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}

event := common.MapStr{"cores": cpu.NumCores}

hostFields := common.MapStr{}
for _, metric := range m.config.Metrics {
switch strings.ToLower(metric) {
case percentages:
Expand All @@ -95,6 +95,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
event.Put("softirq.norm.pct", normalizedPct.SoftIRQ)
event.Put("steal.norm.pct", normalizedPct.Steal)
event.Put("total.norm.pct", normalizedPct.Total)
hostFields.Put("host.cpu.pct", normalizedPct.Total)
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other metricsets this PR adds host data to just constructs a MapStr in Event(). I wonder if we want to do that instead for the sake of consistency?

case ticks:
ticks := sample.Ticks()
event.Put("user.ticks", ticks.User)
Expand All @@ -109,6 +110,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}

r.Event(mb.Event{
RootFields: hostFields,
MetricSetFields: event,
})

Expand Down
32 changes: 32 additions & 0 deletions metricbeat/module/system/diskio/diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ type MetricSet struct {
mb.BaseMetricSet
statistics *DiskIOStat
includeDevices []string
prevCounters diskCounter
}

// diskCounter stores previous disk counter values for calculating gauges in next collection
type diskCounter struct {
prevDiskReadBytes uint64
prevDiskWriteBytes uint64
}

// New is a mb.MetricSetFactory that returns a new MetricSet.
Expand All @@ -54,6 +61,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
BaseMetricSet: base,
statistics: NewDiskIOStat(),
includeDevices: config.IncludeDevices,
prevCounters: diskCounter{},
}, nil
}

Expand All @@ -70,6 +78,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Store the last cpu counter when finished
defer m.statistics.CloseSampling()

var diskReadBytes, diskWriteBytes uint64
for _, counters := range stats {
event := common.MapStr{
"name": counters.Name,
Expand All @@ -87,6 +96,11 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
"time": counters.IoTime,
},
}

// accumulate values from all interfaces
diskReadBytes += counters.ReadBytes
diskWriteBytes += counters.WriteBytes

var extraMetrics DiskIOMetric
err := m.statistics.CalIOStatistics(&extraMetrics, counters)
if err == nil {
Expand Down Expand Up @@ -135,5 +149,23 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}
}

if m.prevCounters != (diskCounter{}) {
// convert network metrics from counters to gauges
r.Event(mb.Event{
RootFields: common.MapStr{
"host": common.MapStr{
"disk": common.MapStr{
"read.bytes": diskReadBytes - m.prevCounters.prevDiskReadBytes,
"write.bytes": diskWriteBytes - m.prevCounters.prevDiskWriteBytes,
},
},
},
})
}

// update prevCounters
m.prevCounters.prevDiskReadBytes = diskReadBytes
m.prevCounters.prevDiskWriteBytes = diskWriteBytes

return nil
}
47 changes: 46 additions & 1 deletion metricbeat/module/system/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ func init() {
// MetricSet for fetching system network IO metrics.
type MetricSet struct {
mb.BaseMetricSet
interfaces map[string]struct{}
interfaces map[string]struct{}
prevCounters networkCounter
}

// networkCounter stores previous network counter values for calculating gauges in next collection
type networkCounter struct {
prevNetworkInBytes uint64
prevNetworkInPackets uint64
prevNetworkOutBytes uint64
prevNetworkOutPackets uint64
}

// New is a mb.MetricSetFactory that returns a new MetricSet.
Expand All @@ -69,6 +78,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
interfaces: interfaceSet,
prevCounters: networkCounter{},
}, nil
}

Expand All @@ -79,6 +89,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return errors.Wrap(err, "network io counters")
}

var networkInBytes, networkOutBytes, networkInPackets, networkOutPackets uint64

for _, counters := range stats {
if m.interfaces != nil {
// Select stats by interface name.
Expand All @@ -91,11 +103,44 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isOpen := r.Event(mb.Event{
MetricSetFields: ioCountersToMapStr(counters),
})

// accumulate values from all interfaces
networkInBytes += counters.BytesRecv
networkOutBytes += counters.BytesSent
networkInPackets += counters.PacketsRecv
networkOutPackets += counters.PacketsSent

if !isOpen {
return nil
}
}

if m.prevCounters != (networkCounter{}) {
// convert network metrics from counters to gauges
r.Event(mb.Event{
RootFields: common.MapStr{
"host": common.MapStr{
"network": common.MapStr{
"in": common.MapStr{
"bytes": networkInBytes - m.prevCounters.prevNetworkInBytes,
"packets": networkInPackets - m.prevCounters.prevNetworkInPackets,
},
"out": common.MapStr{
"bytes": networkOutBytes - m.prevCounters.prevNetworkOutBytes,
"packets": networkOutPackets - m.prevCounters.prevNetworkOutPackets,
},
},
},
},
})
}

// update prevCounters
m.prevCounters.prevNetworkInBytes = networkInBytes
m.prevCounters.prevNetworkInPackets = networkInPackets
m.prevCounters.prevNetworkOutBytes = networkOutBytes
m.prevCounters.prevNetworkOutPackets = networkOutPackets

return nil
}

Expand Down
38 changes: 30 additions & 8 deletions metricbeat/module/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
SYSTEM_NETWORK_FIELDS = ["name", "out.bytes", "in.bytes", "out.packets",
"in.packets", "in.error", "out.error", "in.dropped", "out.dropped"]

SYSTEM_CPU_HOST_FIELDS = ["pct"]

SYSTEM_NETWORK_HOST_FIELDS = ["in.bytes", "out.bytes", "in.packets", "out.packets"]

SYSTEM_DISK_HOST_FIELDS = ["read.bytes", "write.bytes"]

# cmdline is also part of the system process fields, but it may not be present
# for some kernel level processes. fd is also part of the system process, but
# is not available on all OSes and requires root to read for all processes.
Expand Down Expand Up @@ -76,8 +82,12 @@ def test_cpu(self):
evt = output[0]
self.assert_fields_are_documented(evt)

cpu = evt["system"]["cpu"]
self.assertCountEqual(self.de_dot(SYSTEM_CPU_FIELDS), cpu.keys())
if "system" in evt:
cpu = evt["system"]["cpu"]
self.assertCountEqual(self.de_dot(SYSTEM_CPU_FIELDS), cpu.keys())
else:
host_cpu = evt["host"]["cpu"]
self.assertCountEqual(self.de_dot(SYSTEM_CPU_HOST_FIELDS), host_cpu.keys())

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_cpu_ticks_option(self):
Expand Down Expand Up @@ -198,8 +208,12 @@ def test_diskio(self):
for evt in output:
self.assert_fields_are_documented(evt)
if 'error' not in evt:
diskio = evt["system"]["diskio"]
self.assertCountEqual(self.de_dot(SYSTEM_DISKIO_FIELDS), diskio.keys())
if "system" in evt:
diskio = evt["system"]["diskio"]
self.assertCountEqual(self.de_dot(SYSTEM_DISKIO_FIELDS), diskio.keys())
elif "host" in evt:
host_disk = evt["host"]["disk"]
self.assertCountEqual(SYSTEM_DISK_HOST_FIELDS, host_disk.keys())

@unittest.skipUnless(re.match("(?i)linux", sys.platform), "os")
def test_diskio_linux(self):
Expand All @@ -221,8 +235,12 @@ def test_diskio_linux(self):

for evt in output:
self.assert_fields_are_documented(evt)
diskio = evt["system"]["diskio"]
self.assertCountEqual(self.de_dot(SYSTEM_DISKIO_FIELDS_LINUX), diskio.keys())
if "system" in evt:
diskio = evt["system"]["diskio"]
self.assertCountEqual(self.de_dot(SYSTEM_DISKIO_FIELDS_LINUX), diskio.keys())
else:
host_disk = evt["host"]["disk"]
self.assertCountEqual(SYSTEM_DISK_HOST_FIELDS, host_disk.keys())

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_filesystem(self):
Expand Down Expand Up @@ -330,8 +348,12 @@ def test_network(self):

for evt in output:
self.assert_fields_are_documented(evt)
network = evt["system"]["network"]
self.assertCountEqual(self.de_dot(SYSTEM_NETWORK_FIELDS), network.keys())
if "system" in evt:
network = evt["system"]["network"]
self.assertCountEqual(self.de_dot(SYSTEM_NETWORK_FIELDS), network.keys())
else:
host_network = evt["host"]["network"]
self.assertCountEqual(self.de_dot(SYSTEM_NETWORK_HOST_FIELDS), host_network.keys())

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd", sys.platform), "os")
def test_process_summary(self):
Expand Down