Skip to content

Commit

Permalink
Suppress too many errors (#26224)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2f9ae33)
  • Loading branch information
kvch authored and mergify-bot committed Jun 28, 2021
1 parent 284307e commit 9a37c3d
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
package input

import (
"context"
"fmt"
"strings"
"sync"
"syscall"
"time"

"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/elastic/beats/v7/journalbeat/checkpoint"
Expand Down Expand Up @@ -158,6 +164,10 @@ func (i *Input) publishAll() {
go func() {
defer wg.Done()

suppressed := atomic.NewBool(false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case <-i.done:
Expand All @@ -168,6 +178,10 @@ func (i *Input) publishAll() {
event, err := r.Next()
if event == nil {
if err != nil {
if i.isErrSuppressed(ctx, err, suppressed) {
i.logger.Debugf("Error message suppressed: EBADMSG")
continue
}
i.logger.Errorf("Error while reading event: %v", err)
}
continue
Expand All @@ -191,6 +205,26 @@ func (i *Input) publishAll() {
}
}

// isErrSuppressed checks if the error is due to a corrupt journal. If yes, only the first error message
// is displayed and then it is suppressed for 5 seconds.
func (i *Input) isErrSuppressed(ctx context.Context, err error, suppressed *atomic.Bool) bool {
if strings.Contains(err.Error(), syscall.EBADMSG.Error()) {
if suppressed.Load() {
return true
}

suppressed.Store(true)
go func(ctx context.Context, suppressed *atomic.Bool) {
if err := timed.Wait(ctx, 5*time.Second); err == nil {
suppressed.Store(false)
}

}(ctx, suppressed)
}

return false
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
for _, r := range i.readers {
Expand Down

0 comments on commit 9a37c3d

Please sign in to comment.