Skip to content

Commit

Permalink
lumberjack - refactor loop logic into method (#33071)
Browse files Browse the repository at this point in the history
Move the batch processing logic contained with the lumberjack server
into its own method.

The closure passed to newBatchACKTracker referenced the loop
variable whose value would change on each iteration. This resulted
in incorrect ACKing behavior.
  • Loading branch information
andrewkroh authored and chrisberkhout committed Jun 1, 2023
1 parent e8695d3 commit 76838f6
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions x-pack/filebeat/input/lumberjack/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/go-lumber/lj"
lumber "github.com/elastic/go-lumber/server"
)

Expand Down Expand Up @@ -68,35 +69,39 @@ func (s *server) Close() error {
func (s *server) Run() error {
// Process batches until the input is stopped.
for batch := range s.ljSvr.ReceiveChan() {
s.metrics.batchesReceivedTotal.Inc()
s.processBatch(batch)
}

if len(batch.Events) == 0 {
batch.ACK()
s.metrics.batchesACKedTotal.Inc()
continue
}
s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events)))

// Track all the Beat events associated to the Lumberjack batch so that
// the batch can be ACKed after the Beat events are delivered successfully.
start := time.Now()
acker := newBatchACKTracker(func() {
batch.ACK()
s.metrics.batchesACKedTotal.Inc()
s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
})

for _, ljEvent := range batch.Events {
acker.Add()
s.publish(makeEvent(batch.RemoteAddr, batch.TLS, ljEvent, acker))
}
return nil
}

// Mark the batch as "ready" after Beat events are generated for each
// Lumberjack event.
acker.Ready()
func (s *server) processBatch(batch *lj.Batch) {
s.metrics.batchesReceivedTotal.Inc()

if len(batch.Events) == 0 {
batch.ACK()
s.metrics.batchesACKedTotal.Inc()
return
}
s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events)))

// Track all the Beat events associated to the Lumberjack batch so that
// the batch can be ACKed after the Beat events are delivered successfully.
start := time.Now()
acker := newBatchACKTracker(func() {
batch.ACK()
s.metrics.batchesACKedTotal.Inc()
s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
})

return nil
for _, ljEvent := range batch.Events {
acker.Add()
s.publish(makeEvent(batch.RemoteAddr, batch.TLS, ljEvent, acker))
}

// Mark the batch as "ready" after Beat events are generated for each
// Lumberjack event.
acker.Ready()
}

func makeEvent(remoteAddr string, tlsState *tls.ConnectionState, lumberjackEvent interface{}, acker *batchACKTracker) beat.Event {
Expand Down

0 comments on commit 76838f6

Please sign in to comment.