Skip to content
This repository has been archived by the owner on Sep 24, 2019. It is now read-only.

Commit

Permalink
Support for remote path in reindex api
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Dolzhenko committed Jun 13, 2018
1 parent 1f6e874 commit 213af31
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 56 deletions.
10 changes: 5 additions & 5 deletions docs/reference/docs/reindex.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,11 @@ POST _reindex
// TEST[s/"username": "user",//]
// TEST[s/"password": "pass"//]

The `host` parameter must contain a scheme, host, and port (e.g.
`https://otherhost:9200`). The `username` and `password` parameters are
optional, and when they are present `_reindex` will connect to the remote
Elasticsearch node using basic auth. Be sure to use `https` when using
basic auth or the password will be sent in plain text.
The `host` parameter must contain a scheme, host, port (e.g.
`https://otherhost:9200`) and optional path (e.g. `https://otherhost:9200/proxy`).
The `username` and `password` parameters are optional, and when they are present `_reindex`
will connect to the remote Elasticsearch node using basic auth. Be sure to use `https` when
using basic auth or the password will be sent in plain text.

Remote hosts have to be explicitly whitelisted in elasticsearch.yaml using the
`reindex.remote.whitelist` property. It can be set to a comma delimited list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*/
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexAction> {
static final ObjectParser<ReindexRequest, Void> PARSER = new ObjectParser<>("reindex");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)(?<pathPrefix>(/.*)?)");

static {
ObjectParser.Parser<ReindexRequest, Void> sourceParser = (parser, request, context) -> {
Expand Down Expand Up @@ -139,10 +139,12 @@ static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException
String hostInRequest = requireNonNull(extractString(remote, "host"), "[host] must be specified to reindex from a remote cluster");
Matcher hostMatcher = HOST_PATTERN.matcher(hostInRequest);
if (false == hostMatcher.matches()) {
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port] but was [" + hostInRequest + "]");
throw new IllegalArgumentException("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was ["
+ hostInRequest + "]");
}
String scheme = hostMatcher.group("scheme");
String host = hostMatcher.group("host");
String pathPrefix = hostMatcher.group("pathPrefix");
int port = Integer.parseInt(hostMatcher.group("port"));
Map<String, String> headers = extractStringStringMap(remote, "headers");
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
Expand All @@ -151,7 +153,8 @@ static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException
throw new IllegalArgumentException(
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
}
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers, socketTimeout, connectTimeout);
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
username, password, headers, socketTimeout, connectTimeout);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
Expand Down Expand Up @@ -206,34 +207,39 @@ static RestClient buildRestClient(RemoteInfo remoteInfo, long taskId, List<Threa
for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
}
return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders)
.setRequestConfigCallback(c -> {
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
return c;
})
.setHttpClientConfigCallback(c -> {
// Enable basic auth if it is configured
if (remoteInfo.getUsername() != null) {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
remoteInfo.getPassword());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider);
}
// Stick the task id in the thread name so we can track down tasks from stack traces
AtomicInteger threads = new AtomicInteger();
c.setThreadFactory(r -> {
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
Thread t = new Thread(r, name);
threadCollector.add(t);
return t;
});
// Limit ourselves to one reactor thread because for now the search process is single threaded.
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
return c;
}).build();
final RestClientBuilder builder =
RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders)
.setRequestConfigCallback(c -> {
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
return c;
})
.setHttpClientConfigCallback(c -> {
// Enable basic auth if it is configured
if (remoteInfo.getUsername() != null) {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
remoteInfo.getPassword());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, creds);
c.setDefaultCredentialsProvider(credentialsProvider);
}
// Stick the task id in the thread name so we can track down tasks from stack traces
AtomicInteger threads = new AtomicInteger();
c.setThreadFactory(r -> {
String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
Thread t = new Thread(r, name);
threadCollector.add(t);
return t;
});
// Limit ourselves to one reactor thread because for now the search process is single threaded.
c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
return c;
});
if (Strings.hasLength(remoteInfo.getPathPrefix())) {
builder.setPathPrefix(remoteInfo.getPathPrefix());
}
return builder.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@

public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
public void testBuildRestClient() throws Exception {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap(),
for(final String path: new String[]{"", null, "path"}) {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, new BytesArray("ignored"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
try {
assertBusy(() -> assertThat(threads, hasSize(2)));
int i = 0;
for (Thread thread : threads) {
assertEquals("es-client-" + taskId + "-" + i, thread.getName());
i++;
long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>());
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);
try {
assertBusy(() -> assertThat(threads, hasSize(2)));
int i = 0;
for (Thread thread : threads) {
assertEquals("es-client-" + taskId + "-" + i, thread.getName());
i++;
}
} finally {
client.close();
}
} finally {
client.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,30 @@ public void testBuildRemoteInfoWithAllHostParts() throws IOException {
assertEquals("http", info.getScheme());
assertEquals("example.com", info.getHost());
assertEquals(9200, info.getPort());
assertEquals("", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default

info = buildRemoteInfoHostTestCase("https://other.example.com:9201");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertEquals("", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());

info = buildRemoteInfoHostTestCase("https://other.example.com:9201/proxy-path/");
assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort());
assertEquals("/proxy-path/", info.getPathPrefix());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());

final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> buildRemoteInfoHostTestCase("https"));
assertEquals("[host] must be of the form [scheme]://[host]:[port](/[pathPrefix])? but was [https]",
exception.getMessage());
}

public void testReindexFromRemoteRequestParsing() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@
import static java.util.Collections.emptyMap;

public class RemoteInfoTests extends ESTestCase {
private RemoteInfo newRemoteInfo(String scheme, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, new BytesArray("testquery"), username, password, emptyMap(),
private RemoteInfo newRemoteInfo(String scheme, String prefixPath, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, prefixPath, new BytesArray("testquery"), username, password, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}

public void testToString() {
assertEquals("host=testhost port=12344 query=testquery", newRemoteInfo("http", null, null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser", newRemoteInfo("http", "testuser", null).toString());
assertEquals("host=testhost port=12344 query=testquery",
newRemoteInfo("http", null, null, null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser",
newRemoteInfo("http", null, "testuser", null).toString());
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
newRemoteInfo("http", "testuser", "testpass").toString());
newRemoteInfo("http", null, "testuser", "testpass").toString());
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
newRemoteInfo("https", "testuser", "testpass").toString());
newRemoteInfo("https", null, "testuser", "testpass").toString());
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query=testquery username=testuser password=<<>>",
newRemoteInfo("https", "prxy", "testuser", "testpass").toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class RemoteInfo implements Writeable {
private final String scheme;
private final String host;
private final int port;
private final String pathPrefix;
private final BytesReference query;
private final String username;
private final String password;
Expand All @@ -62,10 +63,16 @@ public class RemoteInfo implements Writeable {
private final TimeValue connectTimeout;

public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password,
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
this(scheme, host, port, null, query, username, password, headers, socketTimeout, connectTimeout);
}

public RemoteInfo(String scheme, String host, int port, String pathPrefix, BytesReference query, String username, String password,
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
this.port = port;
this.pathPrefix = pathPrefix;
this.query = requireNonNull(query, "[query] must be specified to reindex from a remote cluster");
this.username = username;
this.password = password;
Expand Down Expand Up @@ -97,6 +104,11 @@ public RemoteInfo(StreamInput in) throws IOException {
socketTimeout = DEFAULT_SOCKET_TIMEOUT;
connectTimeout = DEFAULT_CONNECT_TIMEOUT;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
pathPrefix = in.readOptionalString();
} else {
pathPrefix = null;
}
}

@Override
Expand All @@ -116,6 +128,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(socketTimeout);
out.writeTimeValue(connectTimeout);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalString(pathPrefix);
}
}

public String getScheme() {
Expand All @@ -130,6 +145,11 @@ public int getPort() {
return port;
}

@Nullable
public String getPathPrefix() {
return pathPrefix;
}

public BytesReference getQuery() {
return query;
}
Expand Down Expand Up @@ -169,7 +189,11 @@ public String toString() {
// http is the default so it isn't worth taking up space if it is the scheme
b.append("scheme=").append(scheme).append(' ');
}
b.append("host=").append(host).append(" port=").append(port).append(" query=").append(query.utf8ToString());
b.append("host=").append(host).append(" port=").append(port);
if (pathPrefix != null) {
b.append(" pathPrefix=").append(pathPrefix);
}
b.append(" query=").append(query.utf8ToString());
if (username != null) {
b.append(" username=").append(username);
}
Expand Down

0 comments on commit 213af31

Please sign in to comment.