diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index b45b99d1816..f1238d61c81 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -18,12 +18,18 @@ package input import ( + "context" "fmt" + "strings" "sync" + "syscall" + "time" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" + "github.com/elastic/go-concert/timed" "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/journalbeat/checkpoint" @@ -158,6 +164,10 @@ func (i *Input) publishAll() { go func() { defer wg.Done() + suppressed := atomic.NewBool(false) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { select { case <-i.done: @@ -168,6 +178,10 @@ func (i *Input) publishAll() { event, err := r.Next() if event == nil { if err != nil { + if i.isErrSuppressed(ctx, err, suppressed) { + i.logger.Debugf("Error message suppressed: EBADMSG") + continue + } i.logger.Errorf("Error while reading event: %v", err) } continue @@ -191,6 +205,26 @@ func (i *Input) publishAll() { } } +// isErrSuppressed checks if the error is due to a corrupt journal. If yes, only the first error message +// is displayed and then it is suppressed for 5 seconds. +func (i *Input) isErrSuppressed(ctx context.Context, err error, suppressed *atomic.Bool) bool { + if strings.Contains(err.Error(), syscall.EBADMSG.Error()) { + if suppressed.Load() { + return true + } + + suppressed.Store(true) + go func(ctx context.Context, suppressed *atomic.Bool) { + if err := timed.Wait(ctx, 5*time.Second); err == nil { + suppressed.Store(false) + } + + }(ctx, suppressed) + } + + return false +} + // Stop stops all readers of the input. func (i *Input) Stop() { for _, r := range i.readers {