diff --git a/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java b/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java index c31bdee2..e7929fe8 100644 --- a/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java +++ b/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; @@ -66,7 +67,6 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.ErrorMessage; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -102,7 +102,10 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMessageHandler implements Lifecycle { - private static final long DEFAULT_SEND_TIMEOUT = 10000; + /** + * Buffer added to ensure our timeout is longer than Kafka's. + */ + private static final int TIMEOUT_BUFFER = 5000; private final Map> replyTopicsAndPartitions = new HashMap<>(); @@ -116,6 +119,8 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMes private final AtomicBoolean running = new AtomicBoolean(); + private final long deliveryTimeoutMsProperty; + private EvaluationContext evaluationContext; private Expression topicExpression; @@ -131,7 +136,7 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMes private boolean sync; - private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT); + private Expression sendTimeoutExpression; private KafkaHeaderMapper headerMapper; @@ -151,7 +156,7 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMes private ProducerRecordCreator producerRecordCreator = (message, topic, partition, timestamp, key, value, headers) -> - new ProducerRecord<>(topic, partition, timestamp, key, value, headers); + new ProducerRecord<>(topic, partition, timestamp, key, value, headers); private volatile byte[] singleReplyTopic; @@ -176,6 +181,27 @@ public KafkaProducerMessageHandler(final KafkaTemplate kafkaTemplate) { logger.warn("The KafkaTemplate is transactional; this gateway will only work if the consumer is " + "configured to read uncommitted records"); } + determineSendTimeout(); + this.deliveryTimeoutMsProperty = + this.sendTimeoutExpression.getValue(Long.class) // NOSONAR - never null after determineSendTimeout() + - TIMEOUT_BUFFER; + } + + private void determineSendTimeout() { + Map props = this.kafkaTemplate.getProducerFactory().getConfigurationProperties(); + Object dt = props.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); + if (dt == null) { + dt = ProducerConfig.configDef().defaultValues().get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); + } + if (dt instanceof Long) { + setSendTimeout(((Long) dt) + TIMEOUT_BUFFER); + } + else if (dt instanceof Integer) { + setSendTimeout(Long.valueOf((Integer) dt) + TIMEOUT_BUFFER); + } + else if (dt instanceof String) { + setSendTimeout(Long.parseLong((String) dt) + TIMEOUT_BUFFER); + } } public void setTopicExpression(Expression topicExpression) { @@ -244,24 +270,25 @@ public void setSync(boolean sync) { /** * Specify a timeout in milliseconds for how long this - * {@link KafkaProducerMessageHandler} should wait wait for send operation - * results. Defaults to 10 seconds. The timeout is applied only in {@link #sync} mode. - * Also applies when sending to the success or failure channels. - * @param sendTimeout the timeout to wait for result fo send operation. + * {@link KafkaProducerMessageHandler} should wait wait for send operation results. + * Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout + * is applied Also applies when sending to the success or failure channels. + * @param sendTimeout the timeout to wait for result for a send operation. * @since 2.0.1 */ @Override - public void setSendTimeout(long sendTimeout) { + public final void setSendTimeout(long sendTimeout) { super.setSendTimeout(sendTimeout); setSendTimeoutExpression(new ValueExpression<>(sendTimeout)); } /** * Specify a SpEL expression to evaluate a timeout in milliseconds for how long this - * {@link KafkaProducerMessageHandler} should wait wait for send operation - * results. Defaults to 10 seconds. The timeout is applied only in {@link #sync} mode. + * {@link KafkaProducerMessageHandler} should wait wait for send operation results. + * Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout + * is applied only in {@link #sync} mode. * @param sendTimeoutExpression the {@link Expression} for timeout to wait for result - * fo send operation. + * for a send operation. * @since 2.1.1 */ public void setSendTimeoutExpression(Expression sendTimeoutExpression) { @@ -270,7 +297,8 @@ public void setSendTimeoutExpression(Expression sendTimeoutExpression) { } /** - * Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent + * Set the failure channel. After a send failure, an + * {@link org.springframework.messaging.support.ErrorMessage} will be sent * to this channel with a payload of a {@link KafkaSendFailureException} with the * failed message and cause. * @param sendFailureChannel the failure channel. @@ -281,7 +309,8 @@ public void setSendFailureChannel(MessageChannel sendFailureChannel) { } /** - * Set the failure channel name. After a send failure, an {@link ErrorMessage} will be + * Set the failure channel name. After a send failure, an + * {@link org.springframework.messaging.support.ErrorMessage} will be * sent to this channel name with a payload of a {@link KafkaSendFailureException} * with the failed message and cause. * @param sendFailureChannelName the failure channel name. @@ -392,10 +421,8 @@ public void start() { @Override public void stop() { - if (this.running.compareAndSet(true, false)) { - if (!this.transactional || this.allowNonTransactional) { - this.kafkaTemplate.flush(); - } + if (this.running.compareAndSet(true, false) && (!this.transactional || this.allowNonTransactional)) { + this.kafkaTemplate.flush(); } } @@ -404,11 +431,12 @@ public boolean isRunning() { return this.running.get(); } - @SuppressWarnings("unchecked") + @SuppressWarnings("unchecked") // NOSONAR - complexity @Override protected Object handleRequestMessage(final Message message) { final ProducerRecord producerRecord; - boolean flush = this.flushExpression.getValue(this.evaluationContext, message, Boolean.class); + boolean flush = + Boolean.TRUE.equals(this.flushExpression.getValue(this.evaluationContext, message, Boolean.class)); boolean preBuilt = message.getPayload() instanceof ProducerRecord; if (preBuilt) { producerRecord = (ProducerRecord) message.getPayload(); @@ -430,9 +458,7 @@ protected Object handleRequestMessage(final Message message) { if (this.transactional && TransactionSynchronizationManager.getResource(this.kafkaTemplate.getProducerFactory()) == null && !this.allowNonTransactional) { - sendFuture = this.kafkaTemplate.executeInTransaction(template -> { - return template.send(producerRecord); - }); + sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord)); } else { sendFuture = this.kafkaTemplate.send(producerRecord); @@ -446,7 +472,7 @@ protected Object handleRequestMessage(final Message message) { throw new MessageHandlingException(message, e); } catch (ExecutionException e) { - throw new MessageHandlingException(message, e.getCause()); + throw new MessageHandlingException(message, e.getCause()); // NOSONAR } if (flush) { this.kafkaTemplate.flush(); @@ -488,12 +514,11 @@ private ProducerRecord createProducerRecord(final Message message) { headers = new RecordHeaders(); this.headerMapper.fromHeaders(messageHeaders, headers); } - final ProducerRecord producerRecord = this.producerRecordCreator.create(message, topic, partitionId, - timestamp, (K) messageKey, payload, headers); - return producerRecord; + return this.producerRecordCreator.create(message, topic, partitionId, timestamp, (K) messageKey, payload, + headers); } - private byte[] getReplyTopic(final Message message) { + private byte[] getReplyTopic(Message message) { // NOSONAR if (this.replyTopicsAndPartitions.isEmpty()) { determineValidReplyTopicsAndPartitions(); } @@ -569,8 +594,9 @@ public void processSendResult(final Message message, final ProducerRecord> future, MessageChannel metadataChannel) throws InterruptedException, ExecutionException { - if (getSendFailureChannel() != null || metadataChannel != null) { - future.addCallback(new ListenableFutureCallback>() { + final MessageChannel failureChannel = getSendFailureChannel(); + if (failureChannel != null || metadataChannel != null) { + future.addCallback(new ListenableFutureCallback>() { // NOSONAR @Override public void onSuccess(SendResult result) { @@ -583,8 +609,8 @@ public void onSuccess(SendResult result) { @Override public void onFailure(Throwable ex) { - if (getSendFailureChannel() != null) { - KafkaProducerMessageHandler.this.messagingTemplate.send(getSendFailureChannel(), + if (failureChannel != null) { + KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel, KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage( new KafkaSendFailureException(message, producerRecord, ex), null)); } @@ -593,8 +619,15 @@ public void onFailure(Throwable ex) { }); } - if (this.sync) { + if (this.sync || this.isGateway) { Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class); + if (sendTimeout != null && sendTimeout <= this.deliveryTimeoutMsProperty) { + this.logger.debug("'sendTimeout' increased to " + + (this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER) + + "ms; it must be greater than the 'delivery.timeout.ms' Kafka producer " + + "property to avoid false failures"); + sendTimeout = this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER; + } if (sendTimeout == null || sendTimeout < 0) { future.get(); } @@ -623,7 +656,7 @@ private final class ConvertingReplyFuture extends SettableListenableFuture future) { - future.addCallback(new ListenableFutureCallback>() { + future.addCallback(new ListenableFutureCallback>() { // NOSONAR @Override public void onSuccess(ConsumerRecord result) { diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml index f6484ace..6f6a9ac9 100644 --- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml +++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml @@ -3,9 +3,9 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd - http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> + xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/integration/kafka https://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd + http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd"> - - - + - diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java index d1eb64be..a406e034 100644 --- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java +++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java @@ -17,15 +17,24 @@ package org.springframework.integration.kafka.config.xml; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.test.util.TestUtils; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -73,8 +82,20 @@ public void testProps() { .isSameAs(this.context.getBean("customHeaderMapper")); } + @Component public static class EMS extends DefaultErrorMessageStrategy { + @SuppressWarnings("rawtypes") + @Bean + public KafkaTemplate template() { + ProducerFactory pf = mock(ProducerFactory.class); + Map props = new HashMap<>(); + given(pf.getConfigurationProperties()).willReturn(props); + KafkaTemplate template = mock(KafkaTemplate.class); + given(template.getProducerFactory()).willReturn(pf); + return template; + } + } } diff --git a/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java b/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java index 058a31f2..93f88210 100644 --- a/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java @@ -54,6 +54,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -147,11 +148,14 @@ static void tearDown() { @Test void testOutbound() { - DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>( - KafkaTestUtils.producerProps(embeddedKafka)); + Map producerProps = KafkaTestUtils.producerProps(embeddedKafka); + producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 40_000); + DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerProps); KafkaTemplate template = new KafkaTemplate<>(producerFactory); KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(template); handler.setBeanFactory(mock(BeanFactory.class)); + handler.setSendTimeout(50_000); + handler.setSync(true); handler.afterPropertiesSet(); Message message = MessageBuilder.withPayload("foo")