From bcb462eb1022616f668ea9fd744b8c4d5339510c Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 14 Aug 2024 13:45:41 -0400 Subject: [PATCH] GH-3409: Improve exceptions during retry Fixes: #3409 - During error handling, records in retry are throwing `KafkaException` after seeking that causes the exception to get logged which maybe misleading. It seems to indicate that the error handling process itself thew the exception, while the exception was thrown in order to prevent accidental committing of a not-yet recovered record. Change this exception during seeks in retry while handling error from `KafkaException` to a new framework-only used exception - `RecordInRetryException` that simply logs the message at INFO level. --- .../KafkaMessageListenerContainer.java | 8 ++++ .../listener/RecordInRetryException.java | 46 +++++++++++++++++++ .../kafka/listener/SeekUtils.java | 4 +- .../DefaultErrorHandlerRecordTests.java | 10 ++-- .../listener/FailedBatchProcessorTests.java | 6 +-- .../listener/SeekToCurrentRecovererTests.java | 18 ++++---- 6 files changed, 73 insertions(+), 19 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7fe587fb2a..d0d3b8e9c9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2212,6 +2212,10 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor invokeBatchErrorHandler(records, recordList, e); commitOffsetsIfNeededAfterHandlingError(records); } + catch (RecordInRetryException rire) { + this.logger.info("Record in retry and not yet recovered"); + return rire; + } catch (KafkaException ke) { ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); return ke; @@ -2715,6 +2719,10 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco invokeErrorHandler(cRecord, iterator, e); commitOffsetsIfNeededAfterHandlingError(cRecord); } + catch (RecordInRetryException rire) { + this.logger.info("Record in retry and not yet recovered"); + return rire; + } catch (KafkaException ke) { ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); return ke; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java new file mode 100644 index 0000000000..0673b0f006 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInRetryException.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import javax.annotation.Nullable; + +import org.springframework.core.NestedRuntimeException; + +/** + * Internal {@link NestedRuntimeException} that is used as an exception thrown + * when the record is in retry and not yet recovered during error handling. + * This is to prevent the record from being prematurely committed in the middle of a retry. + * + * Intended only for framework use and thus the package-protected access. + * + * @author Soby Chacko + * @since 3.3.0 + */ +@SuppressWarnings("serial") +class RecordInRetryException extends NestedRuntimeException { + + /** + * Package protected constructor to create an instance with the provided properties. + * + * @param message logging message + * @param cause {@link Throwable} + */ + RecordInRetryException(String message, @Nullable Throwable cause) { + super(message, cause); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 4a4aa10419..332294357b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -32,7 +32,6 @@ import org.springframework.core.NestedRuntimeException; import org.springframework.core.log.LogAccessor; -import org.springframework.kafka.KafkaException; import org.springframework.kafka.KafkaException.Level; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.KafkaUtils; @@ -46,6 +45,7 @@ * @author Gary Russell * @author Francois Rosiere * @author Wang Zhiyang + * @author Soby Chacko * @since 2.2 * */ @@ -224,7 +224,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List record, Exception original, Exce List> records = Arrays.asList(record1, record2); IllegalStateException illegalState = new IllegalStateException(); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, consumer, mock(MessageListenerContainer.class))) .withCause(illegalState); handler.handleRemaining(new DeserializationException("intended", null, false, illegalState), records, @@ -214,7 +214,7 @@ void testEarlyExitBackOff() { MessageListenerContainer container = mock(MessageListenerContainer.class); given(container.isRunning()).willReturn(false); long t1 = System.currentTimeMillis(); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handleRemaining(illegalState, + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, consumer, container)); assertThat(System.currentTimeMillis() < t1 + 5_000); } @@ -230,7 +230,7 @@ void testNoEarlyExitBackOff() { MessageListenerContainer container = mock(MessageListenerContainer.class); given(container.isRunning()).willReturn(true); long t1 = System.currentTimeMillis(); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> handler.handleRemaining(illegalState, + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> handler.handleRemaining(illegalState, records, consumer, container)); assertThat(System.currentTimeMillis() >= t1 + 200); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java index 22f5c38d21..382b6eb944 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java @@ -43,13 +43,13 @@ import org.springframework.core.log.LogAccessor; import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper; -import org.springframework.kafka.KafkaException; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; /** * @author Gary Russell * @author Francois Rosiere + * @author Soby Chacko * @since 3.0.3 * */ @@ -118,10 +118,10 @@ void testExceptionDuringCommit() { willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any()); final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class); willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties(); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> testFBP.handle(new BatchListenerFailedException("topic", rec2), records, consumer, mockMLC, mock(Runnable.class)) - ).withMessage("Seek to current after exception"); + ).withMessage("Record in retry and not yet recovered"); } static class TestFBP extends FailedBatchProcessor { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java index abff94e5a6..1c68947dd1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java @@ -52,7 +52,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; @@ -70,6 +69,7 @@ /** * @author Gary Russell + * @author Soby Chacko * @since 2.2 * */ @@ -180,7 +180,7 @@ public void seekToCurrentErrorHandlerRecovers() { records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); @@ -227,14 +227,14 @@ public void recoveryFailed(ConsumerRecord record, Exception original, Exce records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L); eh.handleRemaining(new RuntimeException(), records, consumer, null); @@ -267,11 +267,11 @@ public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() { records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); - assertThatExceptionOfType(KafkaException.class).isThrownBy( + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy( () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); eh.handleRemaining(new RuntimeException(), records, consumer, null); // immediate re-attempt recovery @@ -308,7 +308,7 @@ private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) { OffsetCommitCallback commitCallback = (offsets, ex) -> { }; properties.setCommitCallback(commitCallback); given(container.getContainerProperties()).willReturn(properties); - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> eh.handleRemaining(new RuntimeException(), records, consumer, container)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verify(consumer).seek(new TopicPartition("foo", 1), 0L); @@ -340,7 +340,7 @@ public void testNeverRecover() { records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); for (int i = 0; i < 20; i++) { - assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + assertThatExceptionOfType(RecordInRetryException.class).isThrownBy(() -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); } verify(consumer, times(20)).seek(new TopicPartition("foo", 0), 0L);