From f4f184b0fe644fa5b9d64450644ffa8d2514e944 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Raphael=20R=C3=B6sch?=
Date: Wed, 28 Feb 2024 12:46:01 +0100
Subject: [PATCH] GH-3076: Set `batchFailed` state in `ListenerConsumer` even
 if no `commonErrorHandler` is configured

Fixes: #3076
---
 .../kafka/listener/KafkaMessageListenerContainer.java |  3 +-
 .../kafka/listener/TransactionalContainerTests.java   | 69 ++++++++++++++++++-
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index d02faa72db..f6be9d2bb7 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -163,6 +163,7 @@
  * @author Daniel Gentes
  * @author Soby Chacko
  * @author Wang Zhiyang
+ * @author Raphael Rösch
  */
 public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
 		extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -2276,13 +2277,13 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 			}
 		}
 		catch (RuntimeException e) {
+			this.batchFailed = true;
 			failureTimer(sample, null);
 			batchInterceptAfter(records, e);
 			if (this.commonErrorHandler == null) {
 				throw e;
 			}
 			try {
-				this.batchFailed = true;
 				invokeBatchErrorHandler(records, recordList, e);
 				commitOffsetsIfNeededAfterHandlingError(records);
 			}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index cd339423dd..e204fdef41 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -103,6 +103,7 @@
  * @author Artem Bilan
  * @author Wang Zhiyang
  * @author Soby Chacko
+ * @author Raphael Rösch
  *
  * @since 1.3
  *
@@ -110,7 +111,8 @@
 @EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
 		TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
 		TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
-		TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9},
+		TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
+		TransactionalContainerTests.topic10},
 		brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
 public class TransactionalContainerTests {
 
@@ -136,6 +138,8 @@ public class TransactionalContainerTests {
 
 	public static final String topic9 = "txTopic9";
 
+	public static final String topic10 = "txTopic10";
+
 	private static EmbeddedKafkaBroker embeddedKafka;
 
 	@BeforeAll
@@ -1082,4 +1086,67 @@ void testNoAfterRollbackWhenFenced() throws Exception {
 		assertThatIllegalStateException().isThrownBy(container::start);
 	}
 
+	@Test
+	void testArbpWithoutRecovery() throws InterruptedException {
+		// init producer
+		Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
+		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProperties);
+		pf.setTransactionIdPrefix("testArbpResetWithoutRecover.batchListener");
+		final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+		// init consumer
+		String group = "groupInARBP3";
+		Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(group, "false", embeddedKafka);
+		consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
+		ContainerProperties containerProps = new ContainerProperties(topic10);
+		containerProps.setPollTimeout(10_000);
+		containerProps.setBatchRecoverAfterRollback(false); // we want to test the behavior if recovery is disabled
+		final var successLatch = new AtomicReference<>(new CountDownLatch(2));
+		containerProps.setMessageListener(new BatchMessageListener<Integer, String>() {
+			private int attempt = 0;
+
+			@Override
+			public void onMessage(List<ConsumerRecord<Integer, String>> data) {
+				if (3 > attempt++) { // the first three attempts should fail
+					throw new BatchListenerFailedException("fail for test", data.get(0));
+				}
+				data.forEach(d -> successLatch.get().countDown());
+			}
+		});
+		// init container
+		KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pf);
+		containerProps.setKafkaAwareTransactionManager(tm);
+		KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.setBeanName("testArbpWithoutRecover");
+		DefaultAfterRollbackProcessor<Integer, String> afterRollbackProcessor = spy(
+				new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS))
+		);
+		container.setAfterRollbackProcessor(afterRollbackProcessor);
+		container.start();
+
+		// process first batch
+		template.executeInTransaction(t -> {
+			template.send(new ProducerRecord<>(topic10, 0, 0, "bar1"));
+			template.send(new ProducerRecord<>(topic10, 0, 0, "bar2"));
+			return null;
+		});
+		assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for first batch
+
+		// process second batch
+		successLatch.set(new CountDownLatch(2)); // reset latch
+		template.executeInTransaction(t -> {
+			template.send(new ProducerRecord<>(topic10, 0, 0, "bar4"));
+			template.send(new ProducerRecord<>(topic10, 0, 0, "bar5"));
+			return null;
+		});
+		assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for second batch
+
+		// assert three processBatch calls due to the failed attempts + one call to clearThreadState
+		verify(afterRollbackProcessor, times(3)).processBatch(any(), any(), any(), any(), any(), anyBoolean(), any());
+		verify(afterRollbackProcessor).clearThreadState();
+
+		container.stop();
+	}
+
 }
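
Context for reviewers, not part of the patch: a minimal sketch of the configuration shape this change affects, namely a transactional batch listener container that relies on an AfterRollbackProcessor and has no CommonErrorHandler configured, so the `batchFailed` flag set in the catch block above is the only signal that the batch failed. The broker address, topic, group id, serializers, transaction id prefix, and the `BatchFailedScenario`/`process` names are illustrative assumptions, not taken from the repository.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.util.backoff.FixedBackOff;

public class BatchFailedScenario {

	public static KafkaMessageListenerContainer<Integer, String> container() {
		var consumerProps = Map.<String, Object>of(
				ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",   // assumption
				ConsumerConfig.GROUP_ID_CONFIG, "demo-group",                // assumption
				ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class,
				ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
				ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
		var producerProps = Map.<String, Object>of(
				ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",   // assumption
				ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
				ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		var cf = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
		var pf = new DefaultKafkaProducerFactory<Integer, String>(producerProps);
		pf.setTransactionIdPrefix("demo.tx.");                               // assumption

		ContainerProperties props = new ContainerProperties("demo-topic");   // assumption
		props.setKafkaAwareTransactionManager(new KafkaTransactionManager<>(pf));
		// No recovery after rollback: failed batches are simply redelivered.
		props.setBatchRecoverAfterRollback(false);
		props.setMessageListener((BatchMessageListener<Integer, String>) records ->
				records.forEach(BatchFailedScenario::process));

		KafkaMessageListenerContainer<Integer, String> container =
				new KafkaMessageListenerContainer<>(cf, props);
		// Only an after-rollback processor handles batch failures here; no
		// CommonErrorHandler is set, which is the code path fixed by this patch.
		container.setAfterRollbackProcessor(
				new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));
		return container;
	}

	private static void process(ConsumerRecord<Integer, String> record) {
		// application logic
	}
}

With batchRecoverAfterRollback(false) and an unlimited back-off, a failed batch is rolled back and redelivered until the listener succeeds; the new testArbpWithoutRecovery test above exercises exactly this setup and verifies that processBatch and clearThreadState are invoked as expected.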