Skip to content

Commit

Permalink
[Segment Replication] Add random replication strategy (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#12297)

* Add new method to pick a random replication strategy.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* replace usage of refresh with refreshandWaitForReplication()

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add replication strategy logging.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Signed-off-by: Aman Khare <amkhar@amazon.com>
  • Loading branch information
Rishikesh1159 authored and Aman Khare committed Mar 12, 2024
1 parent 7e3b3a6 commit f9f1999
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,17 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(PrivateCustomPlugin.class);
}

@Override
protected boolean useRandomReplicationStrategy() {
return true;
}

@Before
public void indexData() throws Exception {
index("foo", "bar", "1", XContentFactory.jsonBuilder().startObject().field("foo", "foo").endObject());
index("fuu", "buu", "1", XContentFactory.jsonBuilder().startObject().field("fuu", "fuu").endObject());
index("baz", "baz", "1", XContentFactory.jsonBuilder().startObject().field("baz", "baz").endObject());
refresh();
refreshAndWaitForReplication();
}

public void testRoutingTable() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public static Collection<Object[]> parameters() {
);
}

@Override
protected boolean useRandomReplicationStrategy() {
return true;
}

// One of the primary purposes of the query cache is to cache aggs results
public void testCacheAggs() throws Exception {
Client client = client();
Expand Down Expand Up @@ -180,7 +185,7 @@ public void testQueryRewrite() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -250,7 +255,7 @@ public void testQueryRewriteMissingValues() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -316,7 +321,7 @@ public void testQueryRewriteDates() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -389,7 +394,7 @@ public void testQueryRewriteDatesWithNow() throws Exception {
.setFlush(true)
.get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index-1", "index-2", "index-3");

assertCacheState(client, "index-1", 0, 0);
Expand Down Expand Up @@ -460,7 +465,7 @@ public void testCanCache() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -554,7 +559,7 @@ public void testCacheWithFilteredAlias() throws InterruptedException {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();

indexRandomForConcurrentSearch("index");

Expand Down Expand Up @@ -661,7 +666,7 @@ public void testCacheWithInvalidation() throws Exception {
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
refreshAndWaitForReplication();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.monitor.os.OsInfo;
import org.opensearch.node.NodeMocksPlugin;
Expand Down Expand Up @@ -208,6 +209,7 @@
import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.XContentTestUtils.convertToMap;
import static org.opensearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -1887,9 +1889,25 @@ protected Settings nodeSettings(int nodeOrdinal) {
builder.put(TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING.getKey(), true);
builder.put(TelemetrySettings.TRACER_ENABLED_SETTING.getKey(), true);
}

// Randomly set a replication strategy for the node. Replication Strategy can still be manually overridden by subclass if needed.
if (useRandomReplicationStrategy()) {
ReplicationType replicationType = randomBoolean() ? ReplicationType.DOCUMENT : ReplicationType.SEGMENT;
logger.info("Randomly using Replication Strategy as {}.", replicationType.toString());
builder.put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), replicationType);
}
return builder.build();
}

/**
* Used for selecting random replication strategy, either DOCUMENT or SEGMENT.
* This method must be overridden by subclass to use random replication strategy.
* Should be used only on test classes where replication strategy is not critical for tests.
*/
protected boolean useRandomReplicationStrategy() {
return false;
}

protected Path nodeConfigPath(int nodeOrdinal) {
return null;
}
Expand Down

0 comments on commit f9f1999

Please sign in to comment.