From 528267290213593b0186fd1ad023ec770cc78b20 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 May 2013 12:35:14 +0200 Subject: [PATCH] Refactor #2 --- .../action/bulk/TransportShardBulkAction.java | 256 +++++++++++------- .../InstanceShardOperationRequest.java | 2 +- .../action/update/UpdateHelper.java | 3 + 3 files changed, 162 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index af2aa10b49d35..cd8c7900465d9 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; @@ -150,7 +151,7 @@ protected PrimaryResponse shardOperationOnP if (item.request() instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) item.request(); try { - IndexResult result = shardIndexOperation(request, indexRequest, item, clusterState, indexShard, true); + WriteResult result = shardIndexOperation(request, indexRequest, item, clusterState, indexShard, true); // add the response responses[i] = result.response; preVersions[i] = result.preVersion; @@ -189,7 +190,7 @@ protected PrimaryResponse shardOperationOnP DeleteRequest deleteRequest = (DeleteRequest) item.request(); try { // add the response - responses[i] = shardDeleteOperation(deleteRequest, item, indexShard); + responses[i] = shardDeleteOperation(deleteRequest, item, indexShard).response; } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { @@ -210,35 +211,17 @@ protected PrimaryResponse shardOperationOnP request.items()[i] = null; } } else if (item.request() instanceof UpdateRequest) { - // TODO: move to shardUpdateOperation UpdateRequest updateRequest = (UpdateRequest) item.request(); - updateRequest.shardId = shardRequest.shardId; // TODO: Try to make protected again... int retryCount = 0; do { - UpdateHelper.Result translate; - try { - translate = updateHelper.prepare(updateRequest); - } catch (ElasticSearchIllegalArgumentException e) { // Invalid script - responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(e))); - request.items()[i] = null; - break; - } catch (DocumentSourceMissingException e) { // No source available - responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(e))); - request.items()[i] = null; - break; - } catch (DocumentMissingException e) { // Document doesn't exists and upsert is missing - responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(e))); - request.items()[i] = null; - break; - } - switch (translate.operation()) { - case UPSERT: - case INDEX: - IndexRequest indexRequest = null; - try { - indexRequest = translate.action(); + UpdateResult updateResult = shardUpdateOperation(clusterState, request, updateRequest, item, indexShard); + if (updateResult.writeResult != null) { + switch (updateResult.translate.operation()) { + case UPSERT: + case INDEX: + WriteResult result = updateResult.writeResult; + IndexRequest indexRequest = updateResult.request(); BytesReference indexSourceAsBytes = indexRequest.source(); - IndexResult result = shardIndexOperation(request, indexRequest, item, clusterState, indexShard, false); // add the response UpdateResponse updateResponse = new UpdateResponse(result.response.getIndex(), result.response.getType(), result.response.getId(), result.response.getVersion()); updateResponse.setMatches(((IndexResponse) result.response.getResponse()).getMatches()); @@ -263,78 +246,57 @@ protected PrimaryResponse shardOperationOnP // Replace the update request to the translated index request to execute on the replica. request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest); break; - } catch (Throwable t) { - t = ExceptionsHelper.unwrapCause(t); - if (t instanceof VersionConflictEngineException || (t instanceof DocumentAlreadyExistsException && translate.operation() == UpdateHelper.Operation.UPSERT)) { - if (retryCount < updateRequest.retryOnConflict()) { - continue; - } - } - - // rethrow the failure if we are going to retry on primary and let parent failure to handle it - if (retryPrimaryException(t)) { - // restore updated versions... - for (int j = 0; j < i; j++) { - applyVersion(request.items()[j], preVersions[j]); - } - throw (ElasticSearchException) t; - } - if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) { - logger.trace("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); - } else { - logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); - } - responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), - new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t))); - // nullify the request so it won't execute on the replicas - request.items()[i] = null; - } - - break; - case DELETE: - retryCount = 0; - DeleteRequest deleteRequest = null; - try { - deleteRequest = translate.action(); - BulkItemResponse response = shardDeleteOperation(deleteRequest, item, indexShard); - UpdateResponse updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); - updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null)); + case DELETE: + BulkItemResponse response = updateResult.writeResult.response; + DeleteRequest deleteRequest = updateResult.request(); + updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.translate.updatedSourceAsMap(), updateResult.translate.updateSourceContentType(), null)); responses[i] = new BulkItemResponse(response.getItemId(), "update", updateResponse); // Replace the update request to the translated delete request to execute on the replica. request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest); break; - } catch (Throwable t) { - t = ExceptionsHelper.unwrapCause(t); - if (t instanceof VersionConflictEngineException) { - if (retryCount < updateRequest.retryOnConflict()) { - continue; - } - } - - // rethrow the failure if we are going to retry on primary and let parent failure to handle it - if (retryPrimaryException(t)) { - // restore updated versions... - for (int j = 0; j < i; j++) { - applyVersion(request.items()[j], preVersions[j]); - } - throw (ElasticSearchException) t; + } + } else if (updateResult.failure()) { + Throwable t = updateResult.error; + if (!updateResult.retry) { + // rethrow the failure if we are going to retry on primary and let parent failure to handle it + if (retryPrimaryException(t)) { + // restore updated versions... + for (int j = 0; j < i; j++) { + applyVersion(request.items()[j], preVersions[j]); } - if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) { - logger.trace("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); - } else { - logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); + throw (ElasticSearchException) t; + } + if (updateResult.translate == null) { + responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t))); + } else { + switch (updateResult.translate.operation()) { + case UPSERT: + case INDEX: + IndexRequest indexRequest = updateResult.request(); + if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) { + logger.trace("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); + } else { + logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); + } + responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), + new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t))); + break; + case DELETE: + DeleteRequest deleteRequest = updateResult.request(); + if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) { + logger.trace("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); + } else { + logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); + } + responses[i] = new BulkItemResponse(item.id(), "delete", + new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t))); + break; } - responses[i] = new BulkItemResponse(item.id(), "delete", - new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t))); - // nullify the request so it won't execute on the replicas - request.items()[i] = null; } - break; - case NONE: - responses[i] = new BulkItemResponse(item.id(), "update", (UpdateResponse) translate.action()); - break; - default: - throw new ElasticSearchIllegalStateException("Illegal update operation " + translate.operation()); + // nullify the request so it won't execute on the replicas + request.items()[i] = null; + } } } while (++retryCount < updateRequest.retryOnConflict()); } @@ -357,14 +319,14 @@ protected PrimaryResponse shardOperationOnP return new PrimaryResponse(shardRequest.request, response, ops); } - static class IndexResult { + static class WriteResult { final BulkItemResponse response; final long preVersion; final Tuple mappingToUpdate; final Engine.IndexingOperation op; - IndexResult(BulkItemResponse response, long preVersion, Tuple mappingToUpdate, Engine.IndexingOperation op) { + WriteResult(BulkItemResponse response, long preVersion, Tuple mappingToUpdate, Engine.IndexingOperation op) { this.response = response; this.preVersion = preVersion; this.mappingToUpdate = mappingToUpdate; @@ -372,7 +334,7 @@ static class IndexResult { } } - private IndexResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, BulkItemRequest item, + private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, BulkItemRequest item, ClusterState clusterState, IndexShard indexShard, boolean processed) { // validate, if routing is required, that we got routing @@ -418,19 +380,117 @@ private IndexResult shardIndexOperation(BulkShardRequest request, IndexRequest i op = null; } - return new IndexResult(new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), + return new WriteResult(new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version)), preVersion, mappingsToUpdate, op ); } - private BulkItemResponse shardDeleteOperation(DeleteRequest deleteRequest, BulkItemRequest item, IndexShard indexShard) { + private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, BulkItemRequest item, IndexShard indexShard) { Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); indexShard.delete(delete); // update the request with the version so it will go to the replicas deleteRequest.version(delete.version()); - return new BulkItemResponse(item.id(), "delete", + BulkItemResponse bulkItemResponse = new BulkItemResponse(item.id(), "delete", new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound())); + return new WriteResult(bulkItemResponse, deleteRequest.version(), null, null); + } + + static class UpdateResult { + + final UpdateHelper.Result translate; + final ActionRequest actionRequest; + final boolean retry; + final Throwable error; + final WriteResult writeResult; + final UpdateResponse noopResult; + + UpdateResult(UpdateHelper.Result translate, ActionRequest actionRequest, boolean retry, Throwable error, WriteResult writeResult) { + this.translate = translate; + this.actionRequest = actionRequest; + this.retry = retry; + this.error = error; + this.writeResult = writeResult; + this.noopResult = null; + } + + UpdateResult(UpdateHelper.Result translate, ActionRequest actionRequest, WriteResult writeResult) { + this.translate = translate; + this.actionRequest = actionRequest; + this.writeResult = writeResult; + this.retry = false; + this.error = null; + this.noopResult = null; + } + + public UpdateResult(UpdateHelper.Result translate, UpdateResponse updateResponse) { + this.translate = translate; + this.noopResult = updateResponse; + this.actionRequest = null; + this.writeResult = null; + this.retry = false; + this.error = null; + } + + + + boolean failure() { + return error != null; + } + + @SuppressWarnings("unchecked") + T request() { + return (T) actionRequest; + } + + + } + + private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, BulkItemRequest item, IndexShard indexShard) { + UpdateHelper.Result translate; + try { + translate = updateHelper.prepare(updateRequest, indexShard); + } catch (ElasticSearchIllegalArgumentException e) { // Invalid script + return new UpdateResult(null, null, false, e, null); + } catch (DocumentSourceMissingException e) { // No source available + return new UpdateResult(null, null, false, e, null); + } catch (DocumentMissingException e) { // Document doesn't exists and upsert is missing + return new UpdateResult(null, null, false, e, null); + } + switch (translate.operation()) { + case UPSERT: + case INDEX: + IndexRequest indexRequest = translate.action(); + try { + WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, item, clusterState, indexShard, false); + return new UpdateResult(translate, indexRequest, result); + } catch (Throwable t) { + t = ExceptionsHelper.unwrapCause(t); + boolean retry = false; + if (t instanceof VersionConflictEngineException || (t instanceof DocumentAlreadyExistsException && translate.operation() == UpdateHelper.Operation.UPSERT)) { + retry = true; + } + return new UpdateResult(translate, indexRequest, retry, t, null); + } + case DELETE: + DeleteRequest deleteRequest = translate.action(); + try { + WriteResult result = shardDeleteOperation(deleteRequest, item, indexShard); + return new UpdateResult(translate, deleteRequest, result); + } catch (Throwable t) { + t = ExceptionsHelper.unwrapCause(t); + boolean retry = false; + if (t instanceof VersionConflictEngineException) { + retry = true; + } + return new UpdateResult(translate, deleteRequest, retry, t, null); + } + case NONE: + UpdateResponse updateResponse = translate.action(); + return new UpdateResult(translate, updateResponse); + default: + throw new ElasticSearchIllegalStateException("Illegal update operation " + translate.operation()); + } } @Override diff --git a/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java b/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java index 1f5b3776d9e39..0bbfbac09490a 100644 --- a/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java @@ -40,7 +40,7 @@ public abstract class InstanceShardOperationRequest