Skip to content

Commit

Permalink
Fix Bulk Requests for older OD versions
Browse files Browse the repository at this point in the history
The `require_alias` parameter for bulk requests was only introduced
with ES 7.10. Since DataPrepper needs to be compatible down to 6.8,
the parameter should not be used in earlier OD versions. This change
will apply the parameter only when OpenSearch is detected as target.

Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
  • Loading branch information
KarstenSchnitter committed Nov 7, 2023
1 parent 7bba080 commit 03fad8b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void doInitializeInternal() throws IOException {
}
indexManager.setupIndex();

boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias);
final Boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias);
final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression();
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public abstract class AbstractIndexManager implements IndexManager {
private final TemplateStrategy templateStrategy;
protected String indexPrefix;
private Boolean isIndexAlias;
private boolean isIndexAliasChecked;

private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class);

Expand Down Expand Up @@ -184,11 +185,21 @@ public static ZonedDateTime getCurrentUtcTime() {
}

@Override
public boolean isIndexAlias(final String dynamicIndexAlias) throws IOException {
if (isIndexAlias == null) {
ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build();
BooleanResponse response = openSearchClient.indices().existsAlias(request);
isIndexAlias = response.value() && checkISMEnabled();
public Boolean isIndexAlias(final String dynamicIndexAlias) throws IOException {
if (isIndexAliasChecked == false) {
try {
// Try to get the OpenSearch version. This fails on older OpenDistro versions, that do not support
// `require_alias` as a bulk API parameter. All OpenSearch versions do, as this was introduced in
// ES 7.10.
openSearchClient.info();
ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build();
BooleanResponse response = openSearchClient.indices().existsAlias(request);
isIndexAlias = response.value() && checkISMEnabled();
} catch (RuntimeException ex) {
isIndexAlias = null;
} finally {
isIndexAliasChecked = true;
}
}
return isIndexAlias;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public interface IndexManager{

void setupIndex() throws IOException;
String getIndexName(final String indexAlias) throws IOException;
boolean isIndexAlias(final String indexAlias) throws IOException;
Boolean isIndexAlias(final String indexAlias) throws IOException;
}

0 comments on commit 03fad8b

Please sign in to comment.