Skip to content

Commit

Permalink
add an integration test for remote address
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Mar 14, 2024
1 parent ec91f15 commit 41ea201
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -49,6 +56,8 @@ public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase {

private final int TOTAL_NUMBER_OF_NODES = 2;
private final int TOTAL_SEARCH_REQUESTS = 5;
private final String remoteAddress = "1.2.3.4";
private final int remotePort = 1234;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -143,7 +152,7 @@ public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionExc
/**
* Test get top queries when feature enabled
*/
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException, UnknownHostException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand All @@ -168,10 +177,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
SearchRequestBuilder requestBuilder = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
.setQuery(QueryBuilders.matchAllQuery());
requestBuilder.request().remoteAddress(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort));
SearchResponse searchResponse = requestBuilder.get();
assertEquals(searchResponse.getFailedShards(), 0);
}
// Sleep to wait for queue drained to top queries store
Expand All @@ -181,6 +191,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());
for (TopQueries nodeRecords : response.getNodes()) {
for (SearchQueryRecord record : nodeRecords.getTopQueriesRecord()) {
Assert.assertEquals(remoteAddress + ":" + remotePort, record.getAttributes().get(Attribute.REMOTE_ADDRESS));
}
}

internalCluster().stopAllNodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.REMOTE_ADDRESS, searchRequestContext.getRequestRemoteAddress());
attributes.put(Attribute.REMOTE_ADDRESS, String.valueOf(searchRequestContext.getRequestRemoteAddress()));
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.Attribute;
Expand All @@ -27,6 +28,8 @@
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -55,18 +58,19 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final String remoteAddress = "1.2.3.4";
private final int remotePort = 1234;
private ClusterService clusterService;

@Before
public void setup() {
public void setup() throws UnknownHostException {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
when(searchRequestContext.getRequestRemoteAddress()).thenReturn(remoteAddress);
when(searchRequestContext.getRequestRemoteAddress()).thenReturn(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort));
}

public void testOnRequestEnd() {
Expand Down Expand Up @@ -107,7 +111,7 @@ public void testOnRequestEnd() {
assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS));
assertEquals(indices, attrs.get(Attribute.INDICES));
assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP));
assertEquals(remoteAddress, attrs.get(Attribute.REMOTE_ADDRESS));
assertEquals(remoteAddress + ":" + remotePort, attrs.get(Attribute.REMOTE_ADDRESS));
}

public void testConcurrentOnRequestEnd() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.transport.TransportAddress;

import java.util.EnumMap;
import java.util.HashMap;
Expand Down Expand Up @@ -51,8 +52,8 @@ public Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

public String getRequestRemoteAddress() {
return searchRequest.remoteAddress().toString();
public TransportAddress getRequestRemoteAddress() {
return searchRequest.remoteAddress();
}

SearchResponse.PhaseTook getPhaseTook() {
Expand Down

0 comments on commit 41ea201

Please sign in to comment.