
KafkaMessageListenerContainer doesn't properly clear threadState of AfterRollbackProcessor #3076

Closed
rroesch1 opened this issue Feb 27, 2024 · 3 comments

Comments

@rroesch1
Contributor

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.1.1

Describe the bug

The KafkaMessageListenerContainer doesn't clear the threadState of the AfterRollbackProcessor after a successful processing attempt if a transactionManager is set.

To Reproduce

  • I have a KafkaMessageListenerContainer configured with a BatchMessageListener and a KafkaTransactionManager (batched exactly-once processing)
  • Additionally, I set a DefaultAfterRollbackProcessor with an ExponentialBackOff
  • I do not set any CommonErrorHandler (AbstractMessageListenerContainer#setCommonErrorHandler)

Example snippet:

    @Bean
    MessageListenerContainer messageListenerContainer(
            KafkaTemplate<byte[], byte[]> kafkaTemplate,
            ConsumerFactory<byte[], byte[]> consumerFactory,
            KafkaTransactionManager<byte[], byte[]> transactionManager) {

        // initialize MyEventListener which is a BatchMessageListener
        var myEventListener = new MyEventListener(kafkaTemplate);
        // initialize container properties
        var props = new ContainerProperties(sourceTopics);
        props.setMessageListener(myEventListener);
        props.setTransactionManager(transactionManager); // enable transaction manager for exactly-once processing
        // initialize back-off strategy for retries
        var backOff = new ExponentialBackOff(1000, 2);
        backOff.setMaxInterval(300000L); // set back-off limit to 5min
        var afterRollbackProcessor = new DefaultAfterRollbackProcessor<>(backOff);
        // initialize MessageListenerContainer
        var container = new KafkaMessageListenerContainer<>(consumerFactory, props);
        container.setAfterRollbackProcessor(afterRollbackProcessor);
        return container;
    }

(Un-)expected behavior

  • KafkaMessageListenerContainer calls the AfterRollbackProcessor after each failed transaction (as expected)
  • the DefaultAfterRollbackProcessor uses the configured BackOff after each failed transaction (as expected)
  • after a successful processing attempt, AfterRollbackProcessor#clearThreadState is not called (that's unexpected to me)
  • thus the state of the BackOffExecution in DefaultAfterRollbackProcessor (which is "some kind of ThreadLocal state") is never reset (that's unexpected to me)
  • thus, on subsequent transaction rollbacks, DefaultAfterRollbackProcessor reuses the previously used BackOffExecution (that's unexpected to me)
  • this unexpected behavior only appears if commonErrorHandler is set to null (that's the default if transactionManager is set)
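To make the reported state leak concrete, here is a small plain-Java model of it. `ThreadBoundBackOff` and `BackOffStateDemo` are hypothetical classes written for illustration; they are not Spring's `DefaultAfterRollbackProcessor` or `ExponentialBackOff`, and they ignore details such as a maximum elapsed time. They only mimic the behavior described above: a per-thread back-off execution is created lazily on the first rollback and is reset only by `clearThreadState()`. The parameters (1000 ms initial interval, multiplier 2, 5 min cap) follow the configuration snippet.

```java
/**
 * Hypothetical model of per-thread back-off state.
 * NOT Spring's DefaultAfterRollbackProcessor; illustration only.
 */
class ThreadBoundBackOff {

    private final long initialInterval;
    private final double multiplier;
    private final long maxInterval;

    // Last interval handed out on the calling thread; null means "no failed attempt yet".
    private final ThreadLocal<Long> lastInterval = new ThreadLocal<>();

    ThreadBoundBackOff(long initialInterval, double multiplier, long maxInterval) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
    }

    /** Called after each rollback: returns the next interval, creating state on first use. */
    long nextBackOff() {
        Long previous = this.lastInterval.get();
        long next = (previous == null)
                ? this.initialInterval
                : Math.min((long) (previous * this.multiplier), this.maxInterval);
        this.lastInterval.set(next);
        return next;
    }

    /** Discards this thread's state so the next failure starts from the initial interval. */
    void clearThreadState() {
        this.lastInterval.remove();
    }
}

class BackOffStateDemo {

    /** Two rollbacks, one success, then another rollback; returns the last interval used. */
    static long lastIntervalAfterSuccess(boolean clearOnSuccess) {
        ThreadBoundBackOff backOff = new ThreadBoundBackOff(1000L, 2.0, 300_000L);
        backOff.nextBackOff(); // 1st rollback: 1000 ms
        backOff.nextBackOff(); // 2nd rollback: 2000 ms
        if (clearOnSuccess) {
            backOff.clearThreadState(); // what a successful attempt is expected to trigger
        }
        return backOff.nextBackOff(); // rollback after the successful attempt
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + lastIntervalAfterSuccess(false)); // 4000
        System.out.println("with clear:    " + lastIntervalAfterSuccess(true));  // 1000
    }
}
```

Without the clear, the first rollback after a success continues the old sequence (4000 ms) instead of restarting at 1000 ms, which is the reuse of the previous BackOffExecution reported above.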
@sobychacko
Contributor

Thanks for the report. It would be great if you could set up a small sample application where we can quickly reproduce this issue.

@artembilan
Member

In batch mode, the logic in doInvokeBatchListener() looks like this:

    if (this.batchFailed) {
        this.batchFailed = false;
        if (this.commonErrorHandler != null) {
            this.commonErrorHandler.clearThreadState();
        }
        getAfterRollbackProcessor().clearThreadState();
    }

So, we clear the thread state before calling AfterRollbackProcessor.processBatch().

It looks like similar logic is missing from the invokeRecordListenerInTx() branch.

A simple sample that reproduces the issue and confirms our thinking would be appreciated.
From there, we could craft a unit test to rely on during the fix.

Thanks

@rroesch1
Contributor Author

@artembilan You pointed out the correct code segment. The problem is that batchFailed is always false if commonErrorHandler == null.
Therefore, the getAfterRollbackProcessor().clearThreadState() statement in doInvokeBatchListener is unreachable.
See the catch block in doInvokeBatchListener (I added some comments):

            try {
                invokeBatchOnMessage(records, recordList);
                batchInterceptAfter(records, null);
                successTimer(sample, null);
                if (this.batchFailed) {
                    this.batchFailed = false;
                    if (this.commonErrorHandler != null) {
                        this.commonErrorHandler.clearThreadState();
                    }
                    getAfterRollbackProcessor().clearThreadState(); // this statement is only reachable if `batchFailed==true`
                }
                if (!this.autoCommit && !this.isRecordAck) {
                    processCommits();
                }
            }
            catch (RuntimeException e) {
                failureTimer(sample, null);
                batchInterceptAfter(records, e);
                if (this.commonErrorHandler == null) {
                    throw e;
                }
                try {
                    this.batchFailed = true; // this statement is unreachable if commonErrorHandler == null
                    invokeBatchErrorHandler(records, recordList, e);
                    commitOffsetsIfNeededAfterHandlingError(records);
                }
                // …
            }

The commonErrorHandler is null because transactionManager is not null.
See determineCommonErrorHandler:

        @Nullable
        private CommonErrorHandler determineCommonErrorHandler() {
            CommonErrorHandler common = getCommonErrorHandler();
            if (common == null && this.transactionManager == null) {
                common = new DefaultErrorHandler();
            }
            return common;
        }
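The two snippets above can be condensed into a small, hypothetical model of the pre-fix control flow. `PreFixConsumer` and `ErrorHandlerStub` are illustrative names, not spring-kafka types; the transaction manager is reduced to a boolean, and the rollback plus AfterRollbackProcessor call that follows the rethrow is only indicated by a comment. It is a sketch under those assumptions, not the container's code.

```java
/** Hypothetical stand-in for CommonErrorHandler; only the method used here. */
interface ErrorHandlerStub {
    void clearThreadState();
}

/** Simplified model of the pre-fix doInvokeBatchListener() flow (not the real class). */
class PreFixConsumer {

    private final ErrorHandlerStub commonErrorHandler;
    private boolean batchFailed;
    int afterRollbackClears; // counts simulated AfterRollbackProcessor#clearThreadState() calls

    PreFixConsumer(boolean transactional, ErrorHandlerStub configured) {
        // Mirrors determineCommonErrorHandler(): a default handler is only created
        // when none is configured AND there is no transaction manager.
        ErrorHandlerStub common = configured;
        if (common == null && !transactional) {
            common = () -> { }; // stands in for new DefaultErrorHandler()
        }
        this.commonErrorHandler = common;
    }

    void invoke(Runnable listener) {
        try {
            listener.run();
            if (this.batchFailed) {
                this.batchFailed = false;
                if (this.commonErrorHandler != null) {
                    this.commonErrorHandler.clearThreadState();
                }
                this.afterRollbackClears++;
            }
        }
        catch (RuntimeException e) {
            if (this.commonErrorHandler == null) {
                throw e; // the container would roll back and call the AfterRollbackProcessor
            }
            this.batchFailed = true; // pre-fix position: skipped when there is no handler
            // the batch error handler would be invoked here
        }
    }

    /** One failed attempt followed by one successful attempt; returns the clear count. */
    static int clearsAfterFailThenSucceed(boolean transactional) {
        PreFixConsumer consumer = new PreFixConsumer(transactional, null);
        try {
            consumer.invoke(() -> { throw new IllegalStateException("simulated listener failure"); });
        }
        catch (IllegalStateException expected) {
            // only reached in the transactional case, where the exception is rethrown
        }
        consumer.invoke(() -> { });
        return consumer.afterRollbackClears;
    }

    public static void main(String[] args) {
        System.out.println("transactional:     " + clearsAfterFailThenSucceed(true));  // 0
        System.out.println("non-transactional: " + clearsAfterFailThenSucceed(false)); // 1
    }
}
```

In the transactional configuration the exception escapes before `batchFailed` is set, so the following success never reaches the clear call.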

rroesch1 added a commit to rroesch1/spring-kafka that referenced this issue Feb 28, 2024
… even if no `commonErrorHandler` is configured

Fixes: spring-projects#3076
@artembilan artembilan added this to the 3.2.0-M2 milestone Feb 28, 2024
rroesch1 added a commit to rroesch1/spring-kafka that referenced this issue Mar 5, 2024
… even if no `commonErrorHandler` is configured

Fixes: spring-projects#3076
artembilan pushed a commit that referenced this issue Mar 5, 2024
…rorHandler`

Fixes: #3076

The `ListenerConsumer` sets `batchFailed` to `true` only if `commonErrorHandler` is provided

* Move `this.batchFailed = true;` in the `doInvokeBatchListener()` before any error handling.

**Auto-cherry-pick to `3.0.x`**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
spring-builds pushed a commit that referenced this issue Mar 5, 2024
…rorHandler`

Fixes: #3076

The `ListenerConsumer` sets `batchFailed` to `true` only if `commonErrorHandler` is provided

* Move `this.batchFailed = true;` in the `doInvokeBatchListener()` before any error handling.

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

(cherry picked from commit e4d9994)
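The fix moves `this.batchFailed = true;` in `doInvokeBatchListener()` before any error handling. A minimal, hypothetical model of that corrected flow is sketched below; `FixedConsumer` and `ErrorHandlerStub` are illustrative names, not spring-kafka types, the transaction manager is reduced to a boolean, and the rollback plus AfterRollbackProcessor call after the rethrow is only indicated by a comment. It shows the control flow under those assumptions, not the actual patch.

```java
/** Hypothetical stand-in for CommonErrorHandler; only the method used here. */
interface ErrorHandlerStub {
    void clearThreadState();
}

/** Simplified model of the fixed doInvokeBatchListener() flow (not the real class). */
class FixedConsumer {

    private final ErrorHandlerStub commonErrorHandler;
    private boolean batchFailed;
    int afterRollbackClears; // counts simulated AfterRollbackProcessor#clearThreadState() calls

    FixedConsumer(boolean transactional, ErrorHandlerStub configured) {
        // Mirrors determineCommonErrorHandler(): no default handler when transactional.
        ErrorHandlerStub common = configured;
        if (common == null && !transactional) {
            common = () -> { }; // stands in for new DefaultErrorHandler()
        }
        this.commonErrorHandler = common;
    }

    void invoke(Runnable listener) {
        try {
            listener.run();
            if (this.batchFailed) {
                this.batchFailed = false;
                if (this.commonErrorHandler != null) {
                    this.commonErrorHandler.clearThreadState();
                }
                this.afterRollbackClears++;
            }
        }
        catch (RuntimeException e) {
            this.batchFailed = true; // fixed position: recorded before any error handling
            if (this.commonErrorHandler == null) {
                throw e; // the container would roll back and call the AfterRollbackProcessor
            }
            // the batch error handler would be invoked here
        }
    }

    /** Runs one attempt per flag (true = listener throws) and returns the clear count. */
    static int clears(boolean transactional, boolean... attemptFails) {
        FixedConsumer consumer = new FixedConsumer(transactional, null);
        Runnable failing = () -> { throw new IllegalStateException("simulated listener failure"); };
        Runnable succeeding = () -> { };
        for (boolean fail : attemptFails) {
            try {
                consumer.invoke(fail ? failing : succeeding);
            }
            catch (IllegalStateException expected) {
                // rethrown only when there is no common error handler (transactional case)
            }
        }
        return consumer.afterRollbackClears;
    }

    public static void main(String[] args) {
        System.out.println(clears(true, true, false));              // fail, succeed: 1
        System.out.println(clears(true, true, true, false, false)); // two fails, two successes: 1
    }
}
```

With the flag recorded first, a failure followed by a success clears the thread state once in the transactional configuration as well, and further successes do not clear it again until another failure occurs.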