Skip to content

Commit

Permalink
Merge pull request brianfrankcooper#108 from phaneesh/master
Browse files Browse the repository at this point in the history
TransportClient support for connecting to remote elastic search clusters
  • Loading branch information
busbey committed May 29, 2015
2 parents 502fbb4 + c0c157a commit 8e55575
Showing 1 changed file with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import static org.elasticsearch.common.settings.ImmutableSettings.*;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.FilterBuilders.*;
Expand All @@ -37,10 +40,13 @@ public class ElasticSearchClient extends DB {

public static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster";
public static final String DEFAULT_INDEX_KEY = "es.ycsb";
public static final String DEFAULT_REMOTE_HOST = "localhost:9300";
private Node node;
private Client client;
private String indexKey;

private Boolean remoteMode;

/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
Expand All @@ -51,6 +57,8 @@ public void init() throws DBException {
Properties props = getProperties();
this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
String clusterName = props.getProperty("cluster.name", DEFAULT_CLUSTER_NAME);
//Check if transport client needs to be used (To connect to multiple elasticsearch nodes)
remoteMode = Boolean.parseBoolean(props.getProperty("elasticsearch.remote", "false"));
Boolean newdb = Boolean.parseBoolean(props.getProperty("elasticsearch.newdb", "false"));
Builder settings = settingsBuilder()
.put("node.local", "true")
Expand All @@ -68,10 +76,28 @@ public void init() throws DBException {
settings.put(props);
System.out.println("ElasticSearch starting node = " + settings.get("cluster.name"));
System.out.println("ElasticSearch node data path = " + settings.get("path.data"));
System.out.println("ElasticSearch Remote Mode = " +remoteMode);
//Remote mode support for connecting to remote elasticsearch cluster
if(remoteMode) {
settings.put("client.transport.sniff", true)
.put("client.transport.ignore_cluster_name", false)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s");
//Default it to localhost:9300
String nodeList[] = props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST).split(",");
System.out.println("ElasticSearch Remote Hosts = " +props.getProperty("elasticsearch.hosts.list", DEFAULT_REMOTE_HOST));
TransportClient tClient = new TransportClient(settings);
for(String h : nodeList) {
String node[] = h.split(":");
tClient.addTransportAddress(new InetSocketTransportAddress(node[0], Integer.parseInt(node[1])));
}
client = tClient;
} else { //Start node only if transport client mode is disabled
node = nodeBuilder().clusterName(clusterName).settings(settings).node();
node.start();
client = node.client();
}

node = nodeBuilder().clusterName(clusterName).settings(settings).node();
node.start();
client = node.client();

if (newdb) {
client.admin().indices().prepareDelete(indexKey).execute().actionGet();
Expand All @@ -86,10 +112,14 @@ public void init() throws DBException {

@Override
public void cleanup() throws DBException {
if (!node.isClosed()) {
if(!remoteMode) {
if (!node.isClosed()) {
client.close();
node.stop();
node.close();
}
} else {
client.close();
node.stop();
node.close();
}
}

Expand Down

0 comments on commit 8e55575

Please sign in to comment.