From 03fad8bcbcbc4f0b57adb1e9287b7d662645e4c0 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Tue, 7 Nov 2023 15:41:11 +0100 Subject: [PATCH] Fix Bulk Requests for older OD versions The `require_alias` parameter for bulk requests was only introduced with ES 7.10. Since DataPrepper needs to be compatible down to 6.8, the parameter should not be used in earlier OD versions. This change will apply the parameter only when OpenSearch is detected as target. Signed-off-by: Karsten Schnitter --- .../sink/opensearch/OpenSearchSink.java | 2 +- .../index/AbstractIndexManager.java | 21 ++++++++++++++----- .../sink/opensearch/index/IndexManager.java | 2 +- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index bd43d95b84..e7f12313c3 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -217,7 +217,7 @@ private void doInitializeInternal() throws IOException { } indexManager.setupIndex(); - boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias); + final Boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias); final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression(); final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index c2f5d2ff64..86974bec13 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -53,6 +53,7 @@ public abstract class AbstractIndexManager implements IndexManager { private final TemplateStrategy templateStrategy; protected String indexPrefix; private Boolean isIndexAlias; + private boolean isIndexAliasChecked; private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class); @@ -184,11 +185,21 @@ public static ZonedDateTime getCurrentUtcTime() { } @Override - public boolean isIndexAlias(final String dynamicIndexAlias) throws IOException { - if (isIndexAlias == null) { - ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build(); - BooleanResponse response = openSearchClient.indices().existsAlias(request); - isIndexAlias = response.value() && checkISMEnabled(); + public Boolean isIndexAlias(final String dynamicIndexAlias) throws IOException { + if (isIndexAliasChecked == false) { + try { + // Try to get the OpenSearch version. This fails on older OpenDistro versions, that do not support + // `require_alias` as a bulk API parameter. All OpenSearch versions do, as this was introduced in + // ES 7.10. + openSearchClient.info(); + ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build(); + BooleanResponse response = openSearchClient.indices().existsAlias(request); + isIndexAlias = response.value() && checkISMEnabled(); + } catch (RuntimeException ex) { + isIndexAlias = null; + } finally { + isIndexAliasChecked = true; + } } return isIndexAlias; } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java index 030474db55..1e271b0e14 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java @@ -6,5 +6,5 @@ public interface IndexManager{ void setupIndex() throws IOException; String getIndexName(final String indexAlias) throws IOException; - boolean isIndexAlias(final String indexAlias) throws IOException; + Boolean isIndexAlias(final String indexAlias) throws IOException; }