diff --git a/x-pack/filebeat/input/lumberjack/server.go b/x-pack/filebeat/input/lumberjack/server.go index 96d0366e2b5..f12d988cb5b 100644 --- a/x-pack/filebeat/input/lumberjack/server.go +++ b/x-pack/filebeat/input/lumberjack/server.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/transport/tlscommon" + "github.com/elastic/go-lumber/lj" lumber "github.com/elastic/go-lumber/server" ) @@ -68,35 +69,39 @@ func (s *server) Close() error { func (s *server) Run() error { // Process batches until the input is stopped. for batch := range s.ljSvr.ReceiveChan() { - s.metrics.batchesReceivedTotal.Inc() + s.processBatch(batch) + } - if len(batch.Events) == 0 { - batch.ACK() - s.metrics.batchesACKedTotal.Inc() - continue - } - s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events))) - - // Track all the Beat events associated to the Lumberjack batch so that - // the batch can be ACKed after the Beat events are delivered successfully. - start := time.Now() - acker := newBatchACKTracker(func() { - batch.ACK() - s.metrics.batchesACKedTotal.Inc() - s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) - }) - - for _, ljEvent := range batch.Events { - acker.Add() - s.publish(makeEvent(batch.RemoteAddr, batch.TLS, ljEvent, acker)) - } + return nil +} - // Mark the batch as "ready" after Beat events are generated for each - // Lumberjack event. - acker.Ready() +func (s *server) processBatch(batch *lj.Batch) { + s.metrics.batchesReceivedTotal.Inc() + + if len(batch.Events) == 0 { + batch.ACK() + s.metrics.batchesACKedTotal.Inc() + return } + s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events))) + + // Track all the Beat events associated to the Lumberjack batch so that + // the batch can be ACKed after the Beat events are delivered successfully. + start := time.Now() + acker := newBatchACKTracker(func() { + batch.ACK() + s.metrics.batchesACKedTotal.Inc() + s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) + }) - return nil + for _, ljEvent := range batch.Events { + acker.Add() + s.publish(makeEvent(batch.RemoteAddr, batch.TLS, ljEvent, acker)) + } + + // Mark the batch as "ready" after Beat events are generated for each + // Lumberjack event. + acker.Ready() } func makeEvent(remoteAddr string, tlsState *tls.ConnectionState, lumberjackEvent interface{}, acker *batchACKTracker) beat.Event {