Offset is committed after the rollback of the DB transaction #2775

Closed
RuslanHryn opened this issue Aug 14, 2023 · 5 comments


@RuslanHryn

RuslanHryn commented Aug 14, 2023

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Tested on v3.0.8; previous versions are affected as well.

Describe the bug

We are using JpaTransactionManager in ConcurrentKafkaListenerContainerFactory to start new DB transactions in consumers.
We've noticed that spring-kafka commits the offset even when the commit of the DB transaction fails.
This behavior is unexpected, because an error has occurred and the retries have already been attempted.
We therefore expect the offset not to be committed, in the same way as when the consumer throws an exception.

Retries are working fine

To Reproduce

Create a container factory and use JpaTransactionManager there

ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

factory.getContainerProperties().setTransactionManager(jpaTransactionManager);

and a consumer:

        // to propagate transaction from spring-kafka to be able to rollback it
        @Transactional
        @KafkaListener(topics = ROLLING_BACK_TRANSACTIONAL_TOPIC)
        public void rollingBackTransactionalReceiver(SampleMessage message) {
            // the data will not be saved into DB
            persist(message);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }

Expected behavior
The offset is not committed for the consumer group when the commit of the DB transaction fails.

@garyrussell
Contributor

garyrussell commented Aug 14, 2023

It makes no sense to not commit the offset - a successful commit of the next record in the partition will effectively commit it anyway.

That is the whole intent of providing a recoverer to the DefaultAfterRollbackProcessor (e.g. a DeadLetterPublishingRecoverer) which takes some action (such as storing it in a DLT) after retries are exhausted.

Setting a Jpa transaction manager on the container is not really recommended; this is for a KafkaAwareTransactionManager, to properly manage Kafka transactions.

You can simply use @Transactional on the listener method for a JPA-only transaction.

In that case, add the recoverer to the DefaultErrorHandler.
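A minimal sketch of the setup suggested above, assuming spring-kafka 3.0.x on the classpath: no transaction manager is set on the container, the listener method carries `@Transactional` for the JPA-only transaction (as in the reproducer), and a `DeadLetterPublishingRecoverer` is handed to the `DefaultErrorHandler`. The class name `ListenerConfig`, the injected `ConsumerFactory` and `KafkaTemplate` beans, and the back-off values are illustrative assumptions and do not come from the issue.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ListenerConfig { // hypothetical configuration class

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTemplate<Object, Object> template) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Deliberately no setTransactionManager(...) call on the container properties:
        // the JPA transaction is started by @Transactional on the listener method.

        // After retries are exhausted, the recoverer publishes the failed record
        // to a dead-letter topic.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        // Assumed back-off: 1 second between attempts, 2 retries after the first failure.
        factory.setCommonErrorHandler(
                new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L)));
        return factory;
    }
}
```

With this arrangement the transaction commit happens inside the call to the listener method, so a failed commit should reach the error handler as an ordinary listener exception.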

@orange-buffalo

Hi @garyrussell, thanks for your prompt reply. Some questions to build a better understanding here.

(1)

Setting a Jpa transaction manager on the container is not really recommended; this is for a KafkaAwareTransactionManager, to properly manage Kafka transactions.

If that is the case, why does ContainerProperties#setTransactionManager accept a PlatformTransactionManager? It is really confusing when an API declares support for the base transaction manager type but expects only a specific subtype. Wouldn't it be better to accept KafkaAwareTransactionManager, to declare the intention explicitly and avoid confusing library users?

(2)
I understand that committing the next record also commits the previous (failed) offset; that makes sense to me. However, this is inconsistent with a regular exception thrown in the consumer, where the offset of the failing message is not committed (assuming it is the last message in the partition). Our senior engineers spent quite some time trying to understand why the system behaves differently in the rollback scenario, until we dug deep into the Spring Kafka code. It would benefit library users if all error scenarios behaved the same way; would you agree?

@garyrussell
Contributor

  1. Yes, on reflection, it should only accept the narrower type; we can deprecate the old setter.
  2. Again, when retries are exhausted, the recoverer is called and the failed record is deemed to have been recovered; the default recoverer just logs the failed record, but usually something more sophisticated, such as the DeadLetterPublishingRecoverer, is used. The behavior is the same whether or not transactions are used. However, it can be disabled when there is no transaction manager. See:

/**
 * Return true if the offset should be committed for a handled error (no exception
 * thrown).
 * @return true to commit.
 */
default boolean isAckAfterHandle() {
    return true;
}

/**
 * Set to false to prevent the container from committing the offset of a recovered
 * record (when the error handler does not itself throw an exception).
 * @param ack false to not commit.
 */
default void setAckAfterHandle(boolean ack) {

If you want more control over when offsets are committed, use one of the manual AckModes.
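As a rough illustration of the two options mentioned here, the sketch below turns off `ackAfterHandle` on a `DefaultErrorHandler` and switches the container to `AckMode.MANUAL`, so that the listener decides when to acknowledge. It assumes spring-kafka 3.0.x on the classpath; the class `ManualAckExample`, the topic name `example-topic`, and the `process` helper are hypothetical.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.Acknowledgment;

public class ManualAckExample { // hypothetical class

    // Call once while building the container factory.
    static void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
        DefaultErrorHandler errorHandler = new DefaultErrorHandler();
        // Do not commit the offset of a record that the error handler recovered.
        errorHandler.setAckAfterHandle(false);
        factory.setCommonErrorHandler(errorHandler);

        // The listener acknowledges records itself via the Acknowledgment argument.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    }

    @KafkaListener(topics = "example-topic") // assumed topic name
    public void listen(String payload, Acknowledgment ack) {
        process(payload);
        // Acknowledge only after processing completed without an exception.
        ack.acknowledge();
    }

    private void process(String payload) {
        // placeholder for business logic
    }
}
```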

@orange-buffalo

For point 2, we observe different behaviour (same configuration) depending on where the exception happens:

  • If listener code fails, offset is not committed.
  • If transaction commit fails, offset is committed.

Other aspects behave the same (retries, error handler, etc.); the only difference is whether the offset is committed. It is this difference that we find confusing and hard to explain without knowing the implementation details of Spring Kafka.
Whether the offset is committed is secondary (as you rightly mentioned, it will be committed at some point anyway by the next record); the inconsistent system behaviour is what we believe to be the issue.
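The point that a later commit covers an earlier, uncommitted offset can be illustrated with a toy model. This is a sketch in plain Java with no Kafka dependency; `OffsetModel` and its methods are hypothetical names. It models only one fact: a consumer group stores a single committed position per partition, namely the offset of the next record to read.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetModel {

    // One committed position per partition: the offset of the next record to consume.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Committing after processing the record at `offset` stores offset + 1.
    public void commitAfter(int partition, long offset) {
        committed.put(partition, offset + 1);
    }

    // Position a restarted consumer would resume from (0 if nothing was committed).
    public long resumePosition(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetModel group = new OffsetModel();

        group.commitAfter(0, 4);                      // record 4 succeeded
        // record 5 fails and its offset is deliberately not committed
        System.out.println(group.resumePosition(0));  // prints 5: record 5 would be re-read

        group.commitAfter(0, 6);                      // record 6 succeeds
        System.out.println(group.resumePosition(0));  // prints 7: record 5 is now skipped
    }
}
```

In this model, withholding the commit for the failed record only matters while it is the last record in the partition, which matches the caveat raised earlier in the thread.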

@garyrussell
Contributor

If that's the case then it is, indeed, a bug. But, if you switch to using @Transactional instead of injecting a foreign TM into the container, you will see consistent results because, from the container's perspective, the commit is done within the scope of the call to the listener.

Wzy19930507 added a commit to Wzy19930507/spring-kafka that referenced this issue Feb 23, 2024
…nsactionManager

Use `KafkaAwareTransactionManager` to properly manage Kafka transactions.

* Deprecate the `transactionManager` property and add `kafkaAwareTransactionManager` to `ContainerProperties`.
* Modify the transaction unit tests to use `setKafkaAwareTransactionManager`.

Resolves spring-projects#2775