Skip to content

Commit

Permalink
Add retry to Kafka Consumer Create in source (#3399) (#3406)
Browse files Browse the repository at this point in the history
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
(cherry picked from commit 49bf461)

Co-authored-by: Jonah Calvo <caljonah@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] and JonahCalvo committed Oct 5, 2023
1 parent ef21108 commit 87be448
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -82,6 +83,8 @@
@SuppressWarnings("deprecation")
@DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class)
public class KafkaSource implements Source<Record<Event>> {
private static final String NO_RESOLVABLE_URLS_ERROR_MESSAGE = "No resolvable bootstrap urls given in bootstrap.servers";
private static final long RETRY_SLEEP_INTERVAL = 30000;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final KafkaSourceConfig sourceConfig;
private AtomicBoolean shutdownInProgress;
Expand Down Expand Up @@ -130,22 +133,25 @@ public void start(Buffer<Record<Event>> buffer) {
allTopicExecutorServices.add(executorService);

IntStream.range(0, numWorkers).forEach(index -> {
switch (schema) {
case JSON:
kafkaConsumer = new KafkaConsumer<String, JsonNode>(consumerProperties);
while (true) {
try {
kafkaConsumer = createKafkaConsumer(schema, consumerProperties);
break;
case AVRO:
kafkaConsumer = new KafkaConsumer<String, GenericRecord>(consumerProperties);
break;
case PLAINTEXT:
default:
glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
if (Objects.nonNull(glueDeserializer)) {
kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
} catch (ConfigException ce) {
if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
LOG.warn("Exception while creating Kafka consumer: ", ce);
LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);
try {
sleep(RETRY_SLEEP_INTERVAL);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
} else {
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
throw ce;
}
break;
}

}
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
allTopicConsumers.add(consumer);
Expand All @@ -165,6 +171,23 @@ public void start(Buffer<Record<Event>> buffer) {
});
}

public KafkaConsumer<?, ?> createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) {
switch (schema) {
case JSON:
return new KafkaConsumer<String, JsonNode>(consumerProperties);
case AVRO:
return new KafkaConsumer<String, GenericRecord>(consumerProperties);
case PLAINTEXT:
default:
glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
if (Objects.nonNull(glueDeserializer)) {
return new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
} else {
return new KafkaConsumer<String, String>(consumerProperties);
}
}
}

@Override
public void stop() {
shutdownInProgress.set(true);
Expand Down Expand Up @@ -485,4 +508,8 @@ private String getMaskedBootStrapDetails(String serverIP) {
}
return maskedString.append(serverIP.substring(maskedLength)).toString();
}

protected void sleep(final long millis) throws InterruptedException {
Thread.sleep(millis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.kafka.source;

import org.apache.kafka.common.config.ConfigException;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
Expand All @@ -25,7 +26,13 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -135,4 +142,18 @@ void test_kafkaSource_basicFunctionality() {
kafkaSource.start(buffer);
assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
}

@Test
void test_kafkaSource_retry_consumer_create() throws InterruptedException {
when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
kafkaSource = spy(createObjectUnderTest());
doNothing().when(kafkaSource).sleep(anyLong());

doThrow(new ConfigException("No resolvable bootstrap urls given in bootstrap.servers"))
.doCallRealMethod()
.when(kafkaSource)
.createKafkaConsumer(any(), any());
kafkaSource.start(buffer);
}
}

0 comments on commit 87be448

Please sign in to comment.