Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Index API to High Level Rest Client #23040

Merged
merged 5 commits into from
Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,21 @@
package org.elasticsearch.client;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

Expand All @@ -34,6 +46,8 @@

final class Request {

private static final String DELIMITER = "/";

final String method;
final String endpoint;
final Map<String, String> params;
Expand All @@ -60,56 +74,182 @@ static Request ping() {
}

static Request exists(GetRequest getRequest) {
return new Request("HEAD", getEndpoint(getRequest), getParams(getRequest), null);
Request request = get(getRequest);
return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null);
}

static Request get(GetRequest getRequest) {
return new Request("GET", getEndpoint(getRequest), getParams(getRequest), null);
String endpoint = endpoint(getRequest.index(), getRequest.type(), getRequest.id());

Params parameters = Params.builder();
parameters.withPreference(getRequest.preference());
parameters.withRouting(getRequest.routing());
parameters.withParent(getRequest.parent());
parameters.withRefresh(getRequest.refresh());
parameters.withRealtime(getRequest.realtime());
parameters.withStoredFields(getRequest.storedFields());
parameters.withVersion(getRequest.version());
parameters.withVersionType(getRequest.versionType());
parameters.withFetchSourceContext(getRequest.fetchSourceContext());

return new Request(HttpGet.METHOD_NAME, endpoint, parameters.getParams(), null);
}

private static Map<String, String> getParams(GetRequest getRequest) {
Map<String, String> params = new HashMap<>();
putParam("preference", getRequest.preference(), params);
putParam("routing", getRequest.routing(), params);
putParam("parent", getRequest.parent(), params);
if (getRequest.refresh()) {
params.put("refresh", Boolean.TRUE.toString());
static Request index(IndexRequest indexRequest) {
String method = Strings.hasLength(indexRequest.id()) ? HttpPut.METHOD_NAME : HttpPost.METHOD_NAME;

boolean isCreate = (indexRequest.opType() == DocWriteRequest.OpType.CREATE);
String endpoint = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), isCreate ? "_create" : null);

Params parameters = Params.builder();
parameters.withRouting(indexRequest.routing());
parameters.withParent(indexRequest.parent());
parameters.withTimeout(indexRequest.timeout());
parameters.withVersion(indexRequest.version());
parameters.withVersionType(indexRequest.versionType());
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());

BytesRef source = indexRequest.source().toBytesRef();
ContentType contentType = ContentType.create(indexRequest.getContentType().mediaType());
HttpEntity entity = new ByteArrayEntity(source.bytes, source.offset, source.length, contentType);

return new Request(method, endpoint, parameters.getParams(), entity);
}

/**
* Utility method to build request's endpoint.
*/
static String endpoint(String... parts) {
if (parts == null || parts.length == 0) {
return DELIMITER;
}
if (getRequest.realtime() == false) {
params.put("realtime", Boolean.FALSE.toString());

StringJoiner joiner = new StringJoiner(DELIMITER, DELIMITER, "");
for (String part : parts) {
if (part != null) {
joiner.add(part);
}
}
if (getRequest.storedFields() != null && getRequest.storedFields().length > 0) {
params.put("stored_fields", String.join(",", getRequest.storedFields()));
return joiner.toString();
}

/**
* Utility class to build request's parameters map and centralize all parameter names.
*/
static class Params {
private final Map<String, String> params = new HashMap<>();

private Params() {
}
if (getRequest.version() != Versions.MATCH_ANY) {
params.put("version", Long.toString(getRequest.version()));

Params putParam(String key, String value) {
if (Strings.hasLength(value)) {
if (params.putIfAbsent(key, value) != null) {
throw new IllegalArgumentException("Request parameter [" + key + "] is already registered");
}
}
return this;
}
if (getRequest.versionType() != VersionType.INTERNAL) {
params.put("version_type", getRequest.versionType().name().toLowerCase(Locale.ROOT));

Params putParam(String key, TimeValue value) {
if (value != null) {
return putParam(key, value.getStringRep());
}
return this;
}
if (getRequest.fetchSourceContext() != null) {
FetchSourceContext fetchSourceContext = getRequest.fetchSourceContext();
if (fetchSourceContext.fetchSource() == false) {
params.put("_source", Boolean.FALSE.toString());

Params withFetchSourceContext(FetchSourceContext fetchSourceContext) {
if (fetchSourceContext != null) {
if (fetchSourceContext.fetchSource() == false) {
putParam("_source", Boolean.FALSE.toString());
}
if (fetchSourceContext.includes() != null && fetchSourceContext.includes().length > 0) {
putParam("_source_include", String.join(",", fetchSourceContext.includes()));
}
if (fetchSourceContext.excludes() != null && fetchSourceContext.excludes().length > 0) {
putParam("_source_exclude", String.join(",", fetchSourceContext.excludes()));
}
}
if (fetchSourceContext.includes() != null && fetchSourceContext.includes().length > 0) {
params.put("_source_include", String.join(",", fetchSourceContext.includes()));
return this;
}

Params withParent(String parent) {
return putParam("parent", parent);
}

Params withPipeline(String pipeline) {
return putParam("pipeline", pipeline);
}

Params withPreference(String preference) {
return putParam("preference", preference);
}

Params withRealtime(boolean realtime) {
if (realtime == false) {
return putParam("realtime", Boolean.FALSE.toString());
}
if (fetchSourceContext.excludes() != null && fetchSourceContext.excludes().length > 0) {
params.put("_source_exclude", String.join(",", fetchSourceContext.excludes()));
return this;
}

Params withRefresh(boolean refresh) {
if (refresh) {
return withRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
return this;
}
return Collections.unmodifiableMap(params);
}

private static String getEndpoint(GetRequest getRequest) {
StringJoiner pathJoiner = new StringJoiner("/", "/", "");
return pathJoiner.add(getRequest.index()).add(getRequest.type()).add(getRequest.id()).toString();
}
Params withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
putParam("refresh", refreshPolicy.getValue());
}
return this;
}

Params withRouting(String routing) {
return putParam("routing", routing);
}

Params withStoredFields(String[] storedFields) {
if (storedFields != null && storedFields.length > 0) {
return putParam("stored_fields", String.join(",", storedFields));
}
return this;
}

Params withTimeout(TimeValue timeout) {
return putParam("timeout", timeout);
}

Params withVersion(long version) {
if (version != Versions.MATCH_ANY) {
return putParam("version", Long.toString(version));
}
return this;
}

Params withVersionType(VersionType versionType) {
if (versionType != VersionType.INTERNAL) {
return putParam("version_type", versionType.name().toLowerCase(Locale.ROOT));
}
return this;
}

Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
if (activeShardCount != null && activeShardCount != ActiveShardCount.DEFAULT) {
return putParam("wait_for_active_shards", activeShardCount.toString().toLowerCase(Locale.ROOT));
}
return this;
}

Map<String, String> getParams() {
return Collections.unmodifiableMap(params);
}

private static void putParam(String key, String value, Map<String, String> params) {
if (Strings.hasLength(value)) {
params.put(key, value);
static Params builder() {
return new Params();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -42,6 +44,9 @@
import java.util.Set;
import java.util.function.Function;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

/**
* High level REST client that wraps an instance of the low level {@link RestClient} and allows to build requests and read responses.
* The provided {@link RestClient} is externally built and closed.
Expand All @@ -59,37 +64,61 @@ public RestHighLevelClient(RestClient client) {
*/
public boolean ping(Header... headers) throws IOException {
return performRequest(new MainRequest(), (request) -> Request.ping(), RestHighLevelClient::convertExistsResponse,
Collections.emptySet(), headers);
emptySet(), headers);
}

/**
* Retrieves a document by id using the get api
* Retrieves a document by id using the Get API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on elastic.co</a>
*/
public GetResponse get(GetRequest getRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(getRequest, Request::get, GetResponse::fromXContent, Collections.singleton(404), headers);
return performRequestAndParseEntity(getRequest, Request::get, GetResponse::fromXContent, singleton(404), headers);
}

/**
* Asynchronously retrieves a document by id using the get api
* Asynchronously retrieves a document by id using the Get API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on elastic.co</a>
*/
public void getAsync(GetRequest getRequest, ActionListener<GetResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(getRequest, Request::get, GetResponse::fromXContent, listener,
Collections.singleton(404), headers);
performRequestAsyncAndParseEntity(getRequest, Request::get, GetResponse::fromXContent, listener, singleton(404), headers);
}

/**
* Checks for the existence of a document. Returns true if it exists, false otherwise
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on elastic.co</a>
*/
public boolean exists(GetRequest getRequest, Header... headers) throws IOException {
return performRequest(getRequest, Request::exists, RestHighLevelClient::convertExistsResponse, Collections.emptySet(), headers);
return performRequest(getRequest, Request::exists, RestHighLevelClient::convertExistsResponse, emptySet(), headers);
}

/**
* Asynchronously checks for the existence of a document. Returns true if it exists, false otherwise
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on elastic.co</a>
*/
public void existsAsync(GetRequest getRequest, ActionListener<Boolean> listener, Header... headers) {
performRequestAsync(getRequest, Request::exists, RestHighLevelClient::convertExistsResponse, listener,
Collections.emptySet(), headers);
performRequestAsync(getRequest, Request::exists, RestHighLevelClient::convertExistsResponse, listener, emptySet(), headers);
}

/**
* Index a document using the Index API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html">Index API on elastic.co</a>
*/
public IndexResponse index(IndexRequest indexRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(indexRequest, Request::index, IndexResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously index a document using the Index API
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html">Index API on elastic.co</a>
*/
public void indexAsync(IndexRequest indexRequest, ActionListener<IndexResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(indexRequest, Request::index, IndexResponse::fromXContent, listener, emptySet(), headers);
}

private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request, Function<Req, Request> requestConverter,
Expand Down
Loading