From 52700bc8b5c9aec32eae6390e39c9a4408b3b0d5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 May 2013 13:13:53 +0200 Subject: [PATCH] Refactor #3 --- .../action/bulk/TransportShardBulkAction.java | 66 +++++++++++-------- .../test/integration/document/BulkTests.java | 38 +++++++++++ 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index cd8c7900465d9..27890f8107b1c 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -151,9 +151,10 @@ protected PrimaryResponse shardOperationOnP if (item.request() instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) item.request(); try { - WriteResult result = shardIndexOperation(request, indexRequest, item, clusterState, indexShard, true); + WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true); // add the response - responses[i] = result.response; + IndexResponse indexResponse = result.response(); + responses[i] = new BulkItemResponse(item.id(), "index", indexResponse); preVersions[i] = result.preVersion; if (result.mappingToUpdate != null) { if (mappingsToUpdate == null) { @@ -190,7 +191,8 @@ protected PrimaryResponse shardOperationOnP DeleteRequest deleteRequest = (DeleteRequest) item.request(); try { // add the response - responses[i] = shardDeleteOperation(deleteRequest, item, indexShard).response; + IndexResponse indexResponse = shardDeleteOperation(deleteRequest, indexShard).response(); + responses[i] = new BulkItemResponse(item.id(), "delete", indexResponse); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { @@ -214,8 +216,8 @@ protected PrimaryResponse shardOperationOnP UpdateRequest updateRequest = (UpdateRequest) item.request(); int retryCount = 0; do { - UpdateResult updateResult = shardUpdateOperation(clusterState, request, updateRequest, item, indexShard); - if (updateResult.writeResult != null) { + UpdateResult updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard); + if (updateResult.success()) { switch (updateResult.translate.operation()) { case UPSERT: case INDEX: @@ -223,13 +225,14 @@ protected PrimaryResponse shardOperationOnP IndexRequest indexRequest = updateResult.request(); BytesReference indexSourceAsBytes = indexRequest.source(); // add the response - UpdateResponse updateResponse = new UpdateResponse(result.response.getIndex(), result.response.getType(), result.response.getId(), result.response.getVersion()); - updateResponse.setMatches(((IndexResponse) result.response.getResponse()).getMatches()); + IndexResponse indexResponse = result.response(); + UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion()); + updateResponse.setMatches(indexResponse.getMatches()); if (updateRequest.fields() != null && updateRequest.fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); - updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, result.response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); } - responses[i] = new BulkItemResponse(result.response.getItemId(), "update", updateResponse); + responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); preVersions[i] = result.preVersion; if (result.mappingToUpdate != null) { if (mappingsToUpdate == null) { @@ -247,14 +250,18 @@ protected PrimaryResponse shardOperationOnP request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest); break; case DELETE: - BulkItemResponse response = updateResult.writeResult.response; + DeleteResponse response = updateResult.writeResult.response(); DeleteRequest deleteRequest = updateResult.request(); updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.translate.updatedSourceAsMap(), updateResult.translate.updateSourceContentType(), null)); - responses[i] = new BulkItemResponse(response.getItemId(), "update", updateResponse); + responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); // Replace the update request to the translated delete request to execute on the replica. request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest); break; + case NONE: + responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult); + request.items()[i] = null; // No need to go to the replica + break; } } else if (updateResult.failure()) { Throwable t = updateResult.error; @@ -321,21 +328,27 @@ protected PrimaryResponse shardOperationOnP static class WriteResult { - final BulkItemResponse response; + final Object response; final long preVersion; final Tuple mappingToUpdate; final Engine.IndexingOperation op; - WriteResult(BulkItemResponse response, long preVersion, Tuple mappingToUpdate, Engine.IndexingOperation op) { + WriteResult(Object response, long preVersion, Tuple mappingToUpdate, Engine.IndexingOperation op) { this.response = response; this.preVersion = preVersion; this.mappingToUpdate = mappingToUpdate; this.op = op; } + + @SuppressWarnings("unchecked") + T response() { + return (T) response; + } + } - private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, BulkItemRequest item, - ClusterState clusterState, IndexShard indexShard, boolean processed) { + private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState, + IndexShard indexShard, boolean processed) { // validate, if routing is required, that we got routing MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type()); @@ -380,20 +393,17 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i op = null; } - return new WriteResult(new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), - new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version)), preVersion, - mappingsToUpdate, op - ); + IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version); + return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op); } - private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, BulkItemRequest item, IndexShard indexShard) { + private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) { Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); indexShard.delete(delete); // update the request with the version so it will go to the replicas deleteRequest.version(delete.version()); - BulkItemResponse bulkItemResponse = new BulkItemResponse(item.id(), "delete", - new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound())); - return new WriteResult(bulkItemResponse, deleteRequest.version(), null, null); + DeleteResponse deleteResponse = new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()); + return new WriteResult(deleteResponse, deleteRequest.version(), null, null); } static class UpdateResult { @@ -438,6 +448,10 @@ boolean failure() { return error != null; } + boolean success() { + return noopResult != null || writeResult != null; + } + @SuppressWarnings("unchecked") T request() { return (T) actionRequest; @@ -446,7 +460,7 @@ T request() { } - private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, BulkItemRequest item, IndexShard indexShard) { + private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) { UpdateHelper.Result translate; try { translate = updateHelper.prepare(updateRequest, indexShard); @@ -462,7 +476,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe case INDEX: IndexRequest indexRequest = translate.action(); try { - WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, item, clusterState, indexShard, false); + WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false); return new UpdateResult(translate, indexRequest, result); } catch (Throwable t) { t = ExceptionsHelper.unwrapCause(t); @@ -475,7 +489,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe case DELETE: DeleteRequest deleteRequest = translate.action(); try { - WriteResult result = shardDeleteOperation(deleteRequest, item, indexShard); + WriteResult result = shardDeleteOperation(deleteRequest, indexShard); return new UpdateResult(translate, deleteRequest, result); } catch (Throwable t) { t = ExceptionsHelper.unwrapCause(t); diff --git a/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java b/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java index f068bf236b2c7..b6232fc61c3ce 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java @@ -217,6 +217,44 @@ public void testBulkUpdate_largerVolume() throws Exception { assertThat(response.getItems()[i].getOpType(), equalTo("update")); } } + + builder = client.prepareBulk(); + for (int i = 0; i < numDocs; i++) { + builder.add( + client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"none\"") + ); + } + response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(false)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + assertThat(response.getItems()[i].getItemId(), equalTo(i)); + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + } + + builder = client.prepareBulk(); + for (int i = 0; i < numDocs; i++) { + builder.add( + client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"delete\"") + ); + } + response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(false)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + assertThat(response.getItems()[i].getItemId(), equalTo(i)); + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(false)); + } } }