Skip to content

Commit

Permalink
GH-3076: Set batchFailed state in ListenerConsumer even if no `co…
Browse files Browse the repository at this point in the history
…mmonErrorHandler` is configured

Fixes: #3076
  • Loading branch information
rroesch1 committed Mar 5, 2024
1 parent 265e55f commit f4f184b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
* @author Daniel Gentes
* @author Soby Chacko
* @author Wang Zhiyang
* @author Raphael Rösch
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -2276,13 +2277,13 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
}
}
catch (RuntimeException e) {
this.batchFailed = true;
failureTimer(sample, null);
batchInterceptAfter(records, e);
if (this.commonErrorHandler == null) {
throw e;
}
try {
this.batchFailed = true;
invokeBatchErrorHandler(records, recordList, e);
commitOffsetsIfNeededAfterHandlingError(records);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,16 @@
* @author Artem Bilan
* @author Wang Zhiyang
* @author Soby Chacko
* @author Raphael Rösch
*
* @since 1.3
*
*/
@EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9},
TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9,
TransactionalContainerTests.topic10},
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
public class TransactionalContainerTests {

Expand All @@ -136,6 +138,8 @@ public class TransactionalContainerTests {

public static final String topic9 = "txTopic9";

public static final String topic10 = "txTopic10";

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
Expand Down Expand Up @@ -1082,4 +1086,67 @@ void testNoAfterRollbackWhenFenced() throws Exception {
assertThatIllegalStateException().isThrownBy(container::start);
}


@Test
void testArbpWithoutRecovery() throws InterruptedException {
// init producer
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
pf.setTransactionIdPrefix("testArbpResetWithoutRecover.batchListener");
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
// init consumer
String group = "groupInARBP3";
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(group, "false", embeddedKafka);
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProps = new ContainerProperties(topic10);
containerProps.setPollTimeout(10_000);
containerProps.setBatchRecoverAfterRollback(false); // we want to test the behavior if recovery is disabled
final var successLatch = new AtomicReference<>(new CountDownLatch(2));
containerProps.setMessageListener(new BatchMessageListener<Integer, String>() {
private int attempt = 0;

@Override
public void onMessage(List<ConsumerRecord<Integer, String>> data) {
if (3 > attempt++) { // the first three attempts should fail
throw new BatchListenerFailedException("fail for test", data.get(0));
}
data.forEach(d -> successLatch.get().countDown());
}
});
// init container
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
containerProps.setKafkaAwareTransactionManager(tm);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testArbpWithoutRecover");
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor = spy(
new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS))
);
container.setAfterRollbackProcessor(afterRollbackProcessor);
container.start();

// process first batch
template.executeInTransaction(t -> {
template.send(new ProducerRecord<>(topic10, 0, 0, "bar1"));
template.send(new ProducerRecord<>(topic10, 0, 0, "bar2"));
return null;
});
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for first batch

// process second batch
successLatch.set(new CountDownLatch(2)); // reset latch
template.executeInTransaction(t -> {
template.send(new ProducerRecord<>(topic10, 0, 0, "bar4"));
template.send(new ProducerRecord<>(topic10, 0, 0, "bar5"));
return null;
});
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); // wait for second batch

// assert three processBatch calls due to the failed attempts + one call to clearThreadState
verify(afterRollbackProcessor, times(3)).processBatch(any(), any(), any(), any(), any(), anyBoolean(), any());
verify(afterRollbackProcessor).clearThreadState();

container.stop();
}

}

0 comments on commit f4f184b

Please sign in to comment.