Skip to content

Commit

Permalink
GH-3291: Inconsistency in DLT topic naming convention
Browse files Browse the repository at this point in the history
Fixes: #3291 

* Standardize the creation of DLT topics to use a consistent naming convention, preferably using a dash suffix, such as "-dlt". This change ensures compatibility and avoids conflicts when transitioning between different retry solutions.

* Fix checkstyle 

* Docs changes
  • Loading branch information
Watlas committed Jun 6, 2024
1 parent 10dc58d commit 5529cbb
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ The framework provides the `DeadLetterPublishingRecoverer`, which publishes the
The recoverer requires a `KafkaTemplate<Object, Object>`, which is used to send the record.
You can also, optionally, configure it with a `BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>`, which is called to resolve the destination topic and partition.

IMPORTANT: By default, the dead-letter record is sent to a topic named `<originalTopic>.DLT` (the original topic name suffixed with `.DLT`) and to the same partition as the original record.
IMPORTANT: By default, the dead-letter record is sent to a topic named `<originalTopic>-dlt` (the original topic name suffixed with `-dlt`) and to the same partition as the original record.
Therefore, when you use the default resolver, the dead-letter topic **must have at least as many partitions as the original topic.**

If the returned `TopicPartition` has a negative partition, the partition is not set in the `ProducerRecord`, so the partition is selected by Kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,13 @@
This section covers the changes made from version 3.2 to version 3.3.
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].


[[x33-dlt-topic-naming]]
=== DLT Topic Naming Convention

The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property.





Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
*
* @author Gary Russell
* @author Tomaz Fernandes
* @author Watlas R
* @since 2.2
*
*/
Expand All @@ -75,7 +76,7 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "-dlt", cr.partition());

private static final long FIVE = 5L;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,7 +84,7 @@
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(kraft = false, partitions = 1, topics = { "blc1", "blc2", "blc3", "blc4", "blc5", "blc6", "blc6.DLT" })
@EmbeddedKafka(kraft = false, partitions = 1, topics = { "blc1", "blc2", "blc3", "blc4", "blc5", "blc6", "blc6-dlt" })
public class BatchListenerConversionTests {

private static final String DEFAULT_TEST_GROUP_ID = "blc";
Expand Down Expand Up @@ -378,7 +378,7 @@ public void listen5(List<Foo> foos,
}
}

@KafkaListener(topics = "blc6.DLT", groupId = "blc6.DLT",
@KafkaListener(topics = "blc6-dlt", groupId = "blc6-dlt",
properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
":org.apache.kafka.common.serialization.StringDeserializer")
public void listen5Dlt(String in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ void testTxNoTx() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
Consumer consumer = mock(Consumer.class);
given(consumer.partitionsFor("foo.DLT", Duration.ofSeconds(5)))
given(consumer.partitionsFor("foo-dlt", Duration.ofSeconds(5)))
.willReturn(Collections.singletonList(new PartitionInfo("foo", 0, null, null, null)));
recoverer.accept(record, consumer, new RuntimeException());
verify(template, never()).executeInTransaction(any());
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
verify(template).send(captor.capture());
assertThat(captor.getValue().partition()).isEqualTo(0);
verify(consumer).partitionsFor("foo.DLT", Duration.ofSeconds(5));
verify(consumer).partitionsFor("foo-dlt", Duration.ofSeconds(5));

record = new ConsumerRecord<>("foo", 1, 0L, "bar", "baz");
recoverer.accept(record, consumer, new RuntimeException());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,11 +61,11 @@ public class DefaultErrorHandlerBatchIntegrationTests {

public static final String topic1 = "dehTopic1";

public static final String topic1DLT = "dehTopic1.DLT";
public static final String topic1DLT = "dehTopic1-dlt";

public static final String topic2 = "dehTopic2";

public static final String topic2DLT = "dehTopic2.DLT";
public static final String topic2DLT = "dehTopic2-dlt";

private static EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -133,7 +133,7 @@ public void recoveryAndDlt() throws Exception {
"baz", "qux", "fiz", "buz",
"qux", "fiz", "buz");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down Expand Up @@ -215,7 +215,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
"baz", "qux", "fiz", "buz",
"qux", "fiz", "buz");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch2.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch2-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic2DLT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,11 +63,11 @@ public class FallbackBatchErrorHandlerIntegrationTests {

public static final String topic1 = "retryTopic1";

public static final String topic1DLT = "retryTopic1.DLT";
public static final String topic1DLT = "retryTopic1-dlt";

public static final String topic2 = "retryTopic2";

public static final String topic2DLT = "retryTopic2.DLT";
public static final String topic2DLT = "retryTopic2-dlt";

private static EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -141,7 +141,7 @@ public void publishEvent(Object event) {
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("retryBatch");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down Expand Up @@ -219,7 +219,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("retryBatch2");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch2.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch2-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic2DLT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,7 +153,7 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("seekTestMaxFailures");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "seekTestMaxFailures.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "seekTestMaxFailures-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class TransactionalContainerTests {

public static final String topic3 = "txTopic3";

public static final String topic3DLT = "txTopic3.DLT";
public static final String topic3DLT = "txTopic3-dlt";

public static final String topic4 = "txTopic4";

Expand All @@ -134,7 +134,7 @@ public class TransactionalContainerTests {

public static final String topic8 = "txTopic8";

public static final String topic8DLT = "txTopic8.DLT";
public static final String topic8DLT = "txTopic8-dlt";

public static final String topic9 = "txTopic9";

Expand Down

0 comments on commit 5529cbb

Please sign in to comment.