Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete API for weighted round robin search routing #4400

Merged
merged 6 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- GET api for weighted shard routing([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Delete api for weighted shard routing([#4400](https://github.com/opensearch-project/OpenSearch/pull/4400/))
- Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360))
- Relax visibility of the HTTP_CHANNEL_KEY and HTTP_SERVER_CHANNEL_KEY to make it possible for the plugins to access associated Netty4HttpChannel / Netty4HttpServerChannel instance ([#4638](https://github.com/opensearch-project/OpenSearch/pull/4638))
- Load the deprecated master role in a dedicated method instead of in setAdditionalRoles() ([#4582](https://github.com/opensearch-project/OpenSearch/pull/4582))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ public void testApiNamingConventions() throws Exception {
"remote_store.restore",
"cluster.put_weighted_routing",
"cluster.get_weighted_routing",
"cluster.delete_weighted_routing",
"cluster.put_decommission_awareness",
"cluster.get_decommission_awareness",
"cluster.delete_decommission_awareness", };
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"cluster.delete_weighted_routing": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/delete",
"description": "Delete weighted shard routing weights"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/weights",
"methods": [
"DELETE"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.cluster.routing;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -284,4 +285,67 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testDeleteWeightedRouting_WeightsNotSet() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testDeleteWeightedRouting_WeightsAreSet() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.junit.Assert;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase {
@Override
protected int numberOfReplicas() {
return 2;
}

public void testSearchWithWRRShardRouting() throws IOException {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

logger.info("--> starting 6 nodes on different zones");
List<String> nodes = internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

String A_0 = nodes.get(0);
String B_0 = nodes.get(1);
String B_1 = nodes.get(2);
String A_1 = nodes.get(3);
String C_0 = nodes.get(4);
String C_1 = nodes.get(5);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 10).put("index.number_of_replicas", 2))
);
ensureGreen();
logger.info("--> creating indices for test");
for (int i = 0; i < 100; i++) {
client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
}

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

Set<String> hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 50; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}
// search should not go to nodes in zone c
assertThat(hitNodes.size(), lessThanOrEqualTo(4));
DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();
List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
for (DiscoveryNode node : dataNodes) {
if (node.getAttributes().get("zone").equals("c")) {
nodeIdsFromZoneWithWeightZero.add(node.getId());
}
}
for (String nodeId : nodeIdsFromZoneWithWeightZero) {
assertFalse(hitNodes.contains(nodeId));
}

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();
for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (stat.getNode().getAttributes().get("zone").equals("c")) {
assertEquals(0, searchStats.getQueryCount());
assertEquals(0, searchStats.getFetchCount());

} else {
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
}
}

logger.info("--> deleted shard routing weights for weighted round robin");

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertEquals(deleteResponse.isAcknowledged(), true);

hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 100; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}

// Check shard routing requests hit data nodes in zone c
for (String nodeId : nodeIdsFromZoneWithWeightZero) {
assertFalse(!hitNodes.contains(nodeId));
}
nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet();

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Assert.assertTrue(searchStats.getFetchCount() > 0L);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction;
Expand Down Expand Up @@ -302,6 +304,7 @@
import org.opensearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterDeleteWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
Expand Down Expand Up @@ -580,6 +583,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -766,8 +770,10 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());

registerHandler.accept(new RestClusterPutWeightedRoutingAction());
registerHandler.accept(new RestClusterGetWeightedRoutingAction());
registerHandler.accept(new RestClusterDeleteWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.delete;

import org.opensearch.action.ActionType;

/**
* Action to delete weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterDeleteWeightedRoutingAction extends ActionType<ClusterDeleteWeightedRoutingResponse> {
public static final ClusterDeleteWeightedRoutingAction INSTANCE = new ClusterDeleteWeightedRoutingAction();
public static final String NAME = "cluster:admin/routing/awareness/weights/delete";

private ClusterDeleteWeightedRoutingAction() {
super(NAME, ClusterDeleteWeightedRoutingResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.delete;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Request to delete weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest<ClusterDeleteWeightedRoutingRequest> {
public ClusterDeleteWeightedRoutingRequest() {}

public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public String toString() {
return "ClusterDeleteWeightedRoutingRequest";
}
}
Loading