Skip to content

Commit

Permalink
Refactor #3
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed May 3, 2013
1 parent 5282672 commit 52700bc
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
WriteResult result = shardIndexOperation(request, indexRequest, item, clusterState, indexShard, true);
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
// add the response
responses[i] = result.response;
IndexResponse indexResponse = result.response();
responses[i] = new BulkItemResponse(item.id(), "index", indexResponse);
preVersions[i] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
Expand Down Expand Up @@ -190,7 +191,8 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
// add the response
responses[i] = shardDeleteOperation(deleteRequest, item, indexShard).response;
IndexResponse indexResponse = shardDeleteOperation(deleteRequest, indexShard).response();
responses[i] = new BulkItemResponse(item.id(), "delete", indexResponse);
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
Expand All @@ -214,22 +216,23 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
UpdateRequest updateRequest = (UpdateRequest) item.request();
int retryCount = 0;
do {
UpdateResult updateResult = shardUpdateOperation(clusterState, request, updateRequest, item, indexShard);
if (updateResult.writeResult != null) {
UpdateResult updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);
if (updateResult.success()) {
switch (updateResult.translate.operation()) {
case UPSERT:
case INDEX:
WriteResult result = updateResult.writeResult;
IndexRequest indexRequest = updateResult.request();
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
UpdateResponse updateResponse = new UpdateResponse(result.response.getIndex(), result.response.getType(), result.response.getId(), result.response.getVersion());
updateResponse.setMatches(((IndexResponse) result.response.getResponse()).getMatches());
IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion());
updateResponse.setMatches(indexResponse.getMatches());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, result.response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
responses[i] = new BulkItemResponse(result.response.getItemId(), "update", updateResponse);
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
preVersions[i] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
Expand All @@ -247,14 +250,18 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest);
break;
case DELETE:
BulkItemResponse response = updateResult.writeResult.response;
DeleteResponse response = updateResult.writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.translate.updatedSourceAsMap(), updateResult.translate.updateSourceContentType(), null));
responses[i] = new BulkItemResponse(response.getItemId(), "update", updateResponse);
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
// Replace the update request to the translated delete request to execute on the replica.
request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest);
break;
case NONE:
responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
request.items()[i] = null; // No need to go to the replica
break;
}
} else if (updateResult.failure()) {
Throwable t = updateResult.error;
Expand Down Expand Up @@ -321,21 +328,27 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP

static class WriteResult {

final BulkItemResponse response;
final Object response;
final long preVersion;
final Tuple<String, String> mappingToUpdate;
final Engine.IndexingOperation op;

WriteResult(BulkItemResponse response, long preVersion, Tuple<String, String> mappingToUpdate, Engine.IndexingOperation op) {
WriteResult(Object response, long preVersion, Tuple<String, String> mappingToUpdate, Engine.IndexingOperation op) {
this.response = response;
this.preVersion = preVersion;
this.mappingToUpdate = mappingToUpdate;
this.op = op;
}

@SuppressWarnings("unchecked")
<T> T response() {
return (T) response;
}

}

private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, BulkItemRequest item,
ClusterState clusterState, IndexShard indexShard, boolean processed) {
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, boolean processed) {

// validate, if routing is required, that we got routing
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type());
Expand Down Expand Up @@ -380,20 +393,17 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
op = null;
}

return new WriteResult(new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version)), preVersion,
mappingsToUpdate, op
);
IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version);
return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op);
}

private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, BulkItemRequest item, IndexShard indexShard) {
private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
deleteRequest.version(delete.version());
BulkItemResponse bulkItemResponse = new BulkItemResponse(item.id(), "delete",
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()));
return new WriteResult(bulkItemResponse, deleteRequest.version(), null, null);
DeleteResponse deleteResponse = new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound());
return new WriteResult(deleteResponse, deleteRequest.version(), null, null);
}

static class UpdateResult {
Expand Down Expand Up @@ -438,6 +448,10 @@ boolean failure() {
return error != null;
}

boolean success() {
return noopResult != null || writeResult != null;
}

@SuppressWarnings("unchecked")
<T extends ActionRequest> T request() {
return (T) actionRequest;
Expand All @@ -446,7 +460,7 @@ <T extends ActionRequest> T request() {

}

private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, BulkItemRequest item, IndexShard indexShard) {
private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
UpdateHelper.Result translate;
try {
translate = updateHelper.prepare(updateRequest, indexShard);
Expand All @@ -462,7 +476,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe
case INDEX:
IndexRequest indexRequest = translate.action();
try {
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, item, clusterState, indexShard, false);
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false);
return new UpdateResult(translate, indexRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
Expand All @@ -475,7 +489,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe
case DELETE:
DeleteRequest deleteRequest = translate.action();
try {
WriteResult result = shardDeleteOperation(deleteRequest, item, indexShard);
WriteResult result = shardDeleteOperation(deleteRequest, indexShard);
return new UpdateResult(translate, deleteRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,44 @@ public void testBulkUpdate_largerVolume() throws Exception {
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
}
}

builder = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
builder.add(
client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"none\"")
);
}
response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(false));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
assertThat(response.getItems()[i].getItemId(), equalTo(i));
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
}

builder = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
builder.add(
client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"delete\"")
);
}
response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(false));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
assertThat(response.getItems()[i].getItemId(), equalTo(i));
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(false));
}
}

}

0 comments on commit 52700bc

Please sign in to comment.