Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INGEST: Create Index Before Pipeline Execute #32786

Merged
merged 9 commits into from
Aug 14, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,47 @@ teardown:
id: 3
pipeline: ""
body: {bytes_source_field: "1kb"}

---
"Test index with default pipeline from template":

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
indices.put_template:
name: test_template
body:
index_patterns: testindex
settings:
index:
default_pipeline: "my_pipeline"

- do:
index:
index: testindex
type: test
id: 4
body: {bytes_source_field: "2kb"}

- do:
get:
index: testindex
type: test
id: 4
- match: { _source.bytes_source_field: "2kb" }
- match: { _source.bytes_target_field: 2048 }
Original file line number Diff line number Diff line change
Expand Up @@ -127,37 +127,6 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
return;
}

final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

Expand Down Expand Up @@ -191,15 +160,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
}

Expand All @@ -215,7 +184,7 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
Expand All @@ -225,7 +194,47 @@ public void onFailure(Exception e) {
}
}
} else {
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}

private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
try {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
} else {
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
}
}

Expand Down