From cdbde879df36144aae906ab2b945a07e8446f5f8 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 16 Jun 2023 11:07:20 -0400 Subject: [PATCH] filebeat,packetbeat,winlogbeat - Register input metrics diagnostic hook Register an elastic-agent diagnostics hook to return the input metrics (encoded to JSON) in agent diagnostic dumps. --- CHANGELOG.next.asciidoc | 3 +++ filebeat/beater/filebeat.go | 12 +++++++++++ libbeat/monitoring/inputmon/httphandler.go | 13 ++++++++---- libbeat/monitoring/inputmon/input.go | 7 +++++++ libbeat/monitoring/inputmon/input_test.go | 23 ++++++++++++++++++++++ packetbeat/beater/packetbeat.go | 12 +++++++++++ winlogbeat/beater/winlogbeat.go | 12 +++++++++++ 7 files changed, 78 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eff5482954a7..7e9279040ee3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -311,6 +311,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add metrics for gcp-pubsub input. {pull}35614[35614] - [GCS] Added scheduler debug logs and improved the context passing mechanism by removing them from struct params and passing them as function arguments. {pull}35674[35674] - Allow non-AWS endpoints for awss3 input. {issue}35496[35496] {pull}35520[35520] +- Under elastic-agent the input metrics will now be included in agent diagnostics dumps. {pull}35798[35798] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] @@ -349,6 +350,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Added `packetbeat.interfaces.fanout_group` to allow a Packetbeat sniffer to join an AF_PACKET fanout group. {issue}35451[35451] {pull}35453[35453] - Add AF_PACKET metrics. {issue}35428[35428] {pull}35489[35489] +- Under elastic-agent the input metrics will now be included in agent diagnostics dumps. {pull}35798[35798] *Winlogbeat* @@ -360,6 +362,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Set `host.os.type` and `host.os.family` to "windows" if not already set. {pull}35435[35435] - Handle empty DNS answer data in QueryResults for the Sysmon Pipeline {pull}35207[35207] +- Under elastic-agent the input metrics will now be included in agent diagnostics dumps. {pull}35798[35798] *Elastic Log Driver* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 737c2aa9344b..532d8132110f 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -128,6 +128,18 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea } } + if b.Manager != nil { + b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.", + "input_metrics.json", "application/json", func() []byte { + data, err := inputmon.MetricSnapshotJSON() + if err != nil { + logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err) + return []byte(err.Error()) + } + return data + }) + } + // Add inputs created by the modules config.Inputs = append(config.Inputs, moduleInputs...) diff --git a/libbeat/monitoring/inputmon/httphandler.go b/libbeat/monitoring/inputmon/httphandler.go index 46368ee5f79b..d4a8d9e4499d 100644 --- a/libbeat/monitoring/inputmon/httphandler.go +++ b/libbeat/monitoring/inputmon/httphandler.go @@ -64,7 +64,14 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) { return } - metrics := monitoring.CollectStructSnapshot(h.registry, monitoring.Full, false) + filtered := filteredSnapshot(h.registry, requestedType) + + w.Header().Set(contentType, applicationJSON) + serveJSON(w, filtered, requestedPretty) +} + +func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string]any { + metrics := monitoring.CollectStructSnapshot(r, monitoring.Full, false) filtered := make([]map[string]any, 0, len(metrics)) for _, ifc := range metrics { @@ -84,9 +91,7 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) { filtered = append(filtered, m) } - - w.Header().Set(contentType, applicationJSON) - serveJSON(w, filtered, requestedPretty) + return filtered } func serveJSON(w http.ResponseWriter, value any, pretty bool) { diff --git a/libbeat/monitoring/inputmon/input.go b/libbeat/monitoring/inputmon/input.go index 39d51d94cbe8..7814f79234f9 100644 --- a/libbeat/monitoring/inputmon/input.go +++ b/libbeat/monitoring/inputmon/input.go @@ -18,6 +18,7 @@ package inputmon import ( + "encoding/json" "strings" "github.com/google/uuid" @@ -78,3 +79,9 @@ func sanitizeID(id string) string { func globalRegistry() *monitoring.Registry { return monitoring.GetNamespace("dataset").GetRegistry() } + +// MetricSnapshotJSON returns a snapshot of the input metric values from the +// global 'dataset' monitoring namespace encoded as a JSON array (pretty formatted). +func MetricSnapshotJSON() ([]byte, error) { + return json.MarshalIndent(filteredSnapshot(globalRegistry(), ""), "", " ") +} diff --git a/libbeat/monitoring/inputmon/input_test.go b/libbeat/monitoring/inputmon/input_test.go index 011113fb5436..37377c6cb7c0 100644 --- a/libbeat/monitoring/inputmon/input_test.go +++ b/libbeat/monitoring/inputmon/input_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -79,3 +80,25 @@ func TestNewInputMonitor(t *testing.T) { }) } } + +func TestMetricSnapshotJSON(t *testing.T) { + globalRegistry().Clear() + defer globalRegistry().Clear() + + r, cancel := NewInputRegistry("test", "my-id", nil) + defer cancel() + monitoring.NewInt(r, "foo_total").Set(100) + + jsonBytes, err := MetricSnapshotJSON() + require.NoError(t, err) + + const expected = `[ + { + "foo_total": 100, + "id": "my-id", + "input": "test" + } +]` + + assert.Equal(t, expected, string(jsonBytes)) +} diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index fbc0e1c1fb90..29e61d7f8337 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -133,6 +133,18 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } } + if b.Manager != nil { + b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.", + "input_metrics.json", "application/json", func() []byte { + data, err := inputmon.MetricSnapshotJSON() + if err != nil { + logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err) + return []byte(err.Error()) + } + return data + }) + } + if !b.Manager.Enabled() { return pb.runStatic(b, pb.factory) } diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index 89e2144d4c38..78e7f24ff0be 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -162,6 +162,18 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error { } } + if b.Manager != nil { + b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.", + "input_metrics.json", "application/json", func() []byte { + data, err := inputmon.MetricSnapshotJSON() + if err != nil { + logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err) + return []byte(err.Error()) + } + return data + }) + } + var wg sync.WaitGroup for _, log := range eb.eventLogs { state := persistedState[log.source.Name()]