Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] lumberjack - refactor loop logic into method #33071

Merged

Conversation

andrewkroh
Copy link
Member

What does this PR do?

Move the batch processing logic contained with the lumberjack server into its own method.

The closure passed to newBatchACKTracker referenced the loop variable (line 84) whose value would change on each iteration. This resulted in incorrect ACKing behavior. This is a fix for a new unreleased input targeting v8.5.0 hence no changelog or backport.

for batch := range s.ljSvr.ReceiveChan() {
s.metrics.batchesReceivedTotal.Inc()
if len(batch.Events) == 0 {
batch.ACK()
s.metrics.batchesACKedTotal.Inc()
continue
}
s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events)))
// Track all the Beat events associated to the Lumberjack batch so that
// the batch can be ACKed after the Beat events are delivered successfully.
start := time.Now()
acker := newBatchACKTracker(func() {
batch.ACK()

Why is it important?

Any concurrent batch processing can result in incorrect behavior or panics.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

Unit testing didn't expose this since it requires multiple concurrent batches. My test strategy was to run three filebeats. One receiving data on 5044. And two others sending batches and exiting.

  • ./filebeat -e -c ./filebeat.lumberjack.yml > /dev/null
  • while [ true ]; do gtimeout 5 ./filebeat -e -c ./filebeat.logstash.yml --path.data=data1; rm -rf data; done
  • while [ true ]; do gtimeout 5 ./filebeat -e -c ./filebeat.logstash.yml --path.data=data2; rm -rf data; done

Related issues

Move the batch processing logic contained with the lumberjack server
into its own method.

The closure passed to newBatchACKTracker referenced the loop
variable whose value would change on each iteration. This resulted
in incorrect ACKing behavior.
@andrewkroh andrewkroh added the Filebeat Filebeat label Sep 13, 2022
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Sep 13, 2022
@mergify
Copy link
Contributor

mergify bot commented Sep 13, 2022

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @andrewkroh? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@elasticmachine
Copy link
Collaborator

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2022-09-13T19:54:58.714+0000

  • Duration: 73 min 11 sec

Test stats 🧪

Test Results
Failed 0
Passed 2237
Skipped 166
Total 2403

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Sep 13, 2022
@andrewkroh andrewkroh marked this pull request as ready for review September 13, 2022 23:42
@andrewkroh andrewkroh requested a review from a team as a code owner September 13, 2022 23:42
@elasticmachine
Copy link
Collaborator

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

@andrewkroh andrewkroh merged commit 804e2f6 into elastic:main Sep 14, 2022
@efd6
Copy link
Contributor

efd6 commented Sep 14, 2022

Can you paste an example panic here?

@andrewkroh
Copy link
Member Author

panic: close of closed channel

goroutine 23 [running]:
github.com/elastic/go-lumber/lj.(*Batch).ACK(...)
        github.com/elastic/go-lumber@v0.1.2-0.20220819171948-335fde24ea0f/lj/lj.go:55
github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack.(*server).Run.func1()
        github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack/server.go:84 +0x4c
github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack.(*batchACKTracker).ACK(0x0?)
        github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack/ack.go:60 +0xe8
github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack.newEventACKHandler.func1(0x0?, {0x14005788000?, 0x800, 0x0?})
        github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack/ack.go:73 +0x6c
github.com/elastic/beats/v7/libbeat/common/acker.(*eventDataACKer).onACK(0x140000719c0, 0x10062b65c?, 0x800)
        github.com/elastic/beats/v7/libbeat/common/acker/acker.go:257 +0x1a0
github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).ACKEvents(0x140010f9470, 0x800)
        github.com/elastic/beats/v7/libbeat/common/acker/acker.go:206 +0x344
github.com/elastic/beats/v7/libbeat/common/acker.(*clientOnlyACKer).ACKEvents(0x0?, 0x14000af81d8?)
        github.com/elastic/beats/v7/libbeat/common/acker/acker.go:329 +0xfc
github.com/elastic/beats/v7/libbeat/common/acker.ackerList.ACKEvents(...)
        github.com/elastic/beats/v7/libbeat/common/acker/acker.go:294
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK.func1()
        github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/eventloop.go:515 +0x30
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK(0x140001c6120, {0x0, 0x0}, 0x800)
        github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/eventloop.go:525 +0x16c
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig(0x14000516620)
        github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/ackloop.go:73 +0xa0
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).run(0x14000516620)
        github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/ackloop.go:52 +0x154
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func2()
        github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:222 +0x5c
created by github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue
        github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue/broker.go:220 +0x530

chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
Move the batch processing logic contained with the lumberjack server
into its own method.

The closure passed to newBatchACKTracker referenced the loop
variable whose value would change on each iteration. This resulted
in incorrect ACKing behavior.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Filebeat Filebeat
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants