KafkaMessageListenerContainer doesn't properly clear threadState of AfterRollbackProcessor #3076
Comments
Thanks for the report. It would be great if you could set up a small sample application where we can quickly reproduce this issue.
For the batch mode the logic there is like this in the
So, we clear the thread state before calling
Looks like similar logic is missed for the
A simple sample with a reproduction to confirm our thinking would be appreciated. Thanks
@artembilan You pointed out the correct code segment. The problem is, that

    try {
        invokeBatchOnMessage(records, recordList);
        batchInterceptAfter(records, null);
        successTimer(sample, null);
        if (this.batchFailed) {
            this.batchFailed = false;
            if (this.commonErrorHandler != null) {
                this.commonErrorHandler.clearThreadState();
            }
            getAfterRollbackProcessor().clearThreadState(); // this statement is only reachable if `batchFailed==true`
        }
        if (!this.autoCommit && !this.isRecordAck) {
            processCommits();
        }
    }
    catch (RuntimeException e) {
        failureTimer(sample, null);
        batchInterceptAfter(records, e);
        if (this.commonErrorHandler == null) {
            throw e;
        }
        try {
            this.batchFailed = true; // this statement is unreachable if commonErrorHandler == null
            invokeBatchErrorHandler(records, recordList, e);
            commitOffsetsIfNeededAfterHandlingError(records);
        }
        // …
    }

The

    @Nullable
    private CommonErrorHandler determineCommonErrorHandler() {
        CommonErrorHandler common = getCommonErrorHandler();
        if (common == null && this.transactionManager == null) {
            common = new DefaultErrorHandler();
        }
        return common;
    }
… even if no `commonErrorHandler` is configured Fixes: spring-projects#3076
…rorHandler`
Fixes: #3076
The `ListenerConsumer` sets `batchFailed` to `true` only if `commonErrorHandler` is provided
* Move `this.batchFailed = true;` in the `doInvokeBatchListener()` before any error handling.
**Auto-cherry-pick to `3.0.x`**
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
# spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
…rorHandler`
Fixes: #3076
The `ListenerConsumer` sets `batchFailed` to `true` only if `commonErrorHandler` is provided
* Move `this.batchFailed = true;` in the `doInvokeBatchListener()` before any error handling.
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
# spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
(cherry picked from commit e4d9994)
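To make the control flow discussed in this thread easier to follow, here is a minimal, self-contained sketch in plain Java. It is not the spring-kafka implementation: `BatchFailedModel`, `ConsumerModel`, `RollbackProcessorStub` and the `setFlagBeforeHandling` switch are hypothetical names introduced only for illustration. The sketch assumes that a missing error handler simply rethrows the listener exception, and that setting the flag before any error handling is what the commits above describe.

```java
/**
 * Simplified, hypothetical model of the batch listener control flow.
 * NOT the spring-kafka code; all class and field names are illustrative.
 */
class BatchFailedModel {

    /** Stand-in for an AfterRollbackProcessor; only counts clearThreadState() calls. */
    static final class RollbackProcessorStub {
        int clearCalls;

        void clearThreadState() {
            clearCalls++;
        }
    }

    /** Stand-in for the listener consumer that tracks the batchFailed flag. */
    static final class ConsumerModel {
        final boolean hasCommonErrorHandler;
        final boolean setFlagBeforeHandling; // true models the reordering described in the commits
        final RollbackProcessorStub processor = new RollbackProcessorStub();
        boolean batchFailed;

        ConsumerModel(boolean hasCommonErrorHandler, boolean setFlagBeforeHandling) {
            this.hasCommonErrorHandler = hasCommonErrorHandler;
            this.setFlagBeforeHandling = setFlagBeforeHandling;
        }

        /** Processes one batch; listenerFails simulates the listener throwing. */
        void invokeBatch(boolean listenerFails) {
            try {
                if (listenerFails) {
                    throw new IllegalStateException("listener failed");
                }
                if (batchFailed) {
                    // Only reached if an earlier failure was recorded in the flag.
                    batchFailed = false;
                    processor.clearThreadState();
                }
            }
            catch (RuntimeException e) {
                if (setFlagBeforeHandling) {
                    batchFailed = true; // record the failure before any error handling
                }
                if (!hasCommonErrorHandler) {
                    throw e; // without a handler the exception propagates (rollback path)
                }
                batchFailed = true; // original position: unreachable without a handler
            }
        }
    }

    /** Runs one failing batch followed by one successful batch and reports the clear calls. */
    static int clearCallsAfterFailThenSuccess(boolean hasHandler, boolean setFlagBeforeHandling) {
        ConsumerModel consumer = new ConsumerModel(hasHandler, setFlagBeforeHandling);
        try {
            consumer.invokeBatch(true);
        }
        catch (RuntimeException expected) {
            // the transaction would be rolled back here
        }
        consumer.invokeBatch(false);
        return consumer.processor.clearCalls;
    }

    public static void main(String[] args) {
        System.out.println("no handler, original order: " + clearCallsAfterFailThenSuccess(false, false));
        System.out.println("no handler, flag set first: " + clearCallsAfterFailThenSuccess(false, true));
        System.out.println("with handler, original order: " + clearCallsAfterFailThenSuccess(true, false));
    }
}
```

In this model, the successful batch clears the thread state only when the preceding failure was recorded in the flag, which is why the position of the assignment matters when no handler is present.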
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.1.1
Describe the bug
The `KafkaMessageListenerContainer` doesn't clear the threadState of `AfterRollbackProcessor` after a successful processing attempt if `transactionManager` is set.
To Reproduce
* `KafkaMessageListenerContainer` configured with a `BatchMessageListener` and `KafkaTransactionManager` (batched exactly-once processing)
* `DefaultAfterRollbackProcessor` with `ExponentialBackOff`
* no `commonErrorHandler` set (`AbstractMessageListenerContainer#setCommonErrorHandler`)
Example snippet:
(Un-)expected behavior
* `KafkaMessageListenerContainer` calls the `AfterRollbackProcessor` after each failed transaction (as expected)
* `DefaultAfterRollbackProcessor` uses the configured `BackOff` after each failed transaction (as expected)
* `AfterRollbackProcessor#clearThreadState` is not being called (that's unexpected to me)
* `BackOffExecution` in `DefaultAfterRollbackProcessor` (which is "some kind of `ThreadLocal` state") is never reset (that's unexpected to me)
* `DefaultAfterRollbackProcessor` reuses the previously used `BackOffExecution` (that's unexpected to me)
* `commonErrorHandler` is set to `null` (that's the default if `transactionManager` is set)
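The effect of a back-off execution that is never reset can be illustrated with a small plain-Java sketch. Everything here is hypothetical and simplified: `BackOffStateModel`, `ExponentialExecution` and `RollbackProcessorModel` are invented for the example, the delay simply doubles on every call, and the real `DefaultAfterRollbackProcessor` and `ExponentialBackOff` differ in detail (multiplier, maximum interval, actual sleeping).

```java
/**
 * Hypothetical model of per-thread back-off state that is only reset by
 * clearThreadState(). NOT the spring-kafka or Spring Framework code.
 */
class BackOffStateModel {

    /** Simplified back-off execution: returns the current delay, then doubles it. */
    static final class ExponentialExecution {
        private long nextDelayMs;

        ExponentialExecution(long initialDelayMs) {
            this.nextDelayMs = initialDelayMs;
        }

        long nextBackOff() {
            long current = nextDelayMs;
            nextDelayMs *= 2;
            return current;
        }
    }

    /** Stand-in for a rollback processor that keeps its execution in a ThreadLocal. */
    static final class RollbackProcessorModel {
        private final long initialDelayMs;
        private final ThreadLocal<ExponentialExecution> execution = new ThreadLocal<>();

        RollbackProcessorModel(long initialDelayMs) {
            this.initialDelayMs = initialDelayMs;
        }

        /** Called after a failed transaction; returns the delay that would be applied. */
        long process() {
            ExponentialExecution current = execution.get();
            if (current == null) {
                current = new ExponentialExecution(initialDelayMs);
                execution.set(current);
            }
            return current.nextBackOff();
        }

        /** Drops the per-thread execution so the next failure starts from the initial delay. */
        void clearThreadState() {
            execution.remove();
        }
    }

    /** Two failures, then a success, then an unrelated failure; returns the last delay. */
    static long delayAfterRecovery(boolean clearOnSuccess) {
        RollbackProcessorModel processor = new RollbackProcessorModel(100L);
        processor.process(); // first failure: 100 ms
        processor.process(); // second failure: 200 ms
        if (clearOnSuccess) {
            processor.clearThreadState(); // what a successful batch is expected to trigger
        }
        return processor.process(); // later, unrelated failure
    }

    public static void main(String[] args) {
        System.out.println("state never cleared: " + delayAfterRecovery(false) + " ms");
        System.out.println("state cleared on success: " + delayAfterRecovery(true) + " ms");
    }
}
```

In this model, a failure that occurs after a successful batch continues the old back-off sequence (400 ms) instead of starting again from the initial delay (100 ms) unless `clearThreadState()` was called in between.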