Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Add reporting interface with error #10727

Merged
merged 14 commits into from
Mar 6, 2019
7 changes: 5 additions & 2 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,18 @@ func mustImplementFetcher(ms MetricSet) error {
ifcs = append(ifcs, "ReportingMetricSetV2")
}

if _, ok := ms.(ReportingMetricSetV2Error); ok {
ifcs = append(ifcs, "ReportingMetricSetV2Error")
}

if _, ok := ms.(PushMetricSetV2); ok {
ifcs = append(ifcs, "PushMetricSetV2")
}

switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"ReportingMetricSet, ReportingMetricSetV2, PushMetricSet, or "+
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+
"PushMetricSetV2)",
ms.Module().Name(), ms.Name())
case 1:
Expand Down
7 changes: 7 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ type ReportingMetricSetV2 interface {
Fetch(r ReporterV2)
}

// ReportingMetricSetV2Error is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events.
type ReportingMetricSetV2Error interface {
ruflin marked this conversation as resolved.
Show resolved Hide resolved
MetricSet
Fetch(r ReporterV2) error
}

// PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
Expand Down
9 changes: 8 additions & 1 deletion metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
case mb.PushMetricSetV2:
ms.Run(reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2:
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
msw.startPeriodicFetching(reporter)
default:
// Earlier startup stages prevent this from happening.
Expand Down Expand Up @@ -236,6 +236,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) {
case mb.ReportingMetricSetV2:
reporter.StartFetchTimer()
fetcher.Fetch(reporter.V2())
case mb.ReportingMetricSetV2Error:
ruflin marked this conversation as resolved.
Show resolved Hide resolved
reporter.StartFetchTimer()
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
ruflin marked this conversation as resolved.
Show resolved Hide resolved
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
Expand Down
27 changes: 26 additions & 1 deletion metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string)
return WriteEventsReporterV2Cond(f, t, path, nil)
}

// WriteEventsReporterV2Error fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, path string) error {
return WriteEventsReporterV2ErrorCond(f, t, path, nil)
}

// WriteEventsReporterV2Cond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error {
Expand All @@ -105,6 +111,25 @@ func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path str
return errs[0]
}

return writeEvent(events, f, t, path, cond)
}

// WriteEventsReporterV2ErrorCond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB, path string, cond func(common.MapStr) bool) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

events, errs := ReportingFetchV2Error(f)
if len(errs) > 0 {
return errs[0]
}

return writeEvent(events, f, t, path, cond)
}

func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error {
if len(events) == 0 {
return fmt.Errorf("no events were generated")
}
Expand Down Expand Up @@ -198,7 +223,7 @@ func SelectEvent(events []common.MapStr, cond func(e common.MapStr) bool) (commo
}

// SelectEventV2 selects the first event that matches an specific condition
func SelectEventV2(f mb.ReportingMetricSetV2, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) {
func SelectEventV2(f mb.MetricSet, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) {
if cond == nil && len(events) > 0 {
return events[0], nil
}
Expand Down
17 changes: 15 additions & 2 deletions metricbeat/mb/testing/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,21 @@ func runTest(t *testing.T, file string, module, metricSetName, url string) {
s := server(t, file, url)
defer s.Close()

metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL))
events, errs := ReportingFetchV2(metricSet)
metricSet := newMetricSet(t, getConfig(module, metricSetName, s.URL))

var events []mb.Event
var errs []error

switch v := metricSet.(type) {
case mb.ReportingMetricSetV2:
metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL))
events, errs = ReportingFetchV2(metricSet)
case mb.ReportingMetricSetV2Error:
metricSet := NewReportingMetricSetV2Error(t, getConfig(module, metricSetName, s.URL))
events, errs = ReportingFetchV2Error(metricSet)
default:
t.Fatalf("unknown type: %T", v)
}

// Gather errors to build also error events
for _, e := range errs {
Expand Down
24 changes: 24 additions & 0 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetri
return reportingMetricSetV2
}

// NewReportingMetricSetV2Error returns a new ReportingMetricSetV2 instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.ReportingMetricSetV2Error {
metricSet := newMetricSet(t, config)

reportingMetricSetV2Error, ok := metricSet.(mb.ReportingMetricSetV2Error)
if !ok {
t.Fatal("MetricSet does not implement ReportingMetricSetV2Error")
}

return reportingMetricSetV2Error
}

// CapturingReporterV2 is a reporter used for testing which stores all events and errors
type CapturingReporterV2 struct {
events []mb.Event
Expand Down Expand Up @@ -204,6 +217,17 @@ func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) {
return r.events, r.errs
}

// ReportingFetchV2Error runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, []error) {
r := &CapturingReporterV2{}
err := metricSet.Fetch(r)
if err != nil {
r.errs = append(r.errs, err)
}
return r.events, r.errs
}

// NewPushMetricSet instantiates a new PushMetricSet using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down
6 changes: 3 additions & 3 deletions metricbeat/module/php_fpm/process/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@ type phpFpmProcess struct {
LastRequestMemory int `json:"last request memory"`
}

func eventsMapping(r mb.ReporterV2, content []byte) {
func eventsMapping(r mb.ReporterV2, content []byte) error {
var status phpFpmStatus
err := json.Unmarshal(content, &status)
if err != nil {
r.Error(err)
return
return err
}
//remapping process details to match the naming format
for _, process := range status.Processes {
Expand Down Expand Up @@ -94,4 +93,5 @@ func eventsMapping(r mb.ReporterV2, content []byte) {
event.ModuleFields.Put("pool.name", status.Name)
r.Event(event)
}
return nil
}
10 changes: 4 additions & 6 deletions metricbeat/module/php_fpm/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
u, err := url.Parse(m.GetURI())
if err != nil {
report.Error(err)
return
return err
}
u, err = parse.SetQueryParams(u, "full")
if err == nil {
m.SetURI(u.String())
}
content, err := m.HTTP.FetchContent()
if err != nil {
report.Error(err)
return
return err
}
eventsMapping(report, content)
return eventsMapping(report, content)
}
4 changes: 2 additions & 2 deletions metricbeat/module/php_fpm/process/process_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "phpfpm")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
Expand Down