From 41ea2019db44440341a6ef8885237de6cb222b89 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 14 Mar 2024 14:40:22 -0700 Subject: [PATCH] add an integration test for remote address Signed-off-by: Chenyang Ji --- .../QueryInsightsPluginTransportIT.java | 23 +++++++++++++++---- .../core/listener/QueryInsightsListener.java | 2 +- .../listener/QueryInsightsListenerTests.java | 10 +++++--- .../action/search/SearchRequestContext.java | 5 ++-- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java index 04e715444f50a..93e03143a9c17 100644 --- a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -14,19 +14,26 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Assert; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -49,6 +56,8 @@ public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { private final int TOTAL_NUMBER_OF_NODES = 2; private final int TOTAL_SEARCH_REQUESTS = 5; + private final String remoteAddress = "1.2.3.4"; + private final int remotePort = 1234; @Override protected Collection> nodePlugins() { @@ -143,7 +152,7 @@ public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionExc /** * Test get top queries when feature enabled */ - public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException, UnknownHostException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -168,10 +177,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { } // making search requests to get top queries for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { - SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + SearchRequestBuilder requestBuilder = internalCluster().client(randomFrom(nodes)) .prepareSearch() - .setQuery(QueryBuilders.matchAllQuery()) - .get(); + .setQuery(QueryBuilders.matchAllQuery()); + requestBuilder.request().remoteAddress(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort)); + SearchResponse searchResponse = requestBuilder.get(); assertEquals(searchResponse.getFailedShards(), 0); } // Sleep to wait for queue drained to top queries store @@ -181,6 +191,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { Assert.assertEquals(0, response.failures().size()); Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + for (TopQueries nodeRecords : response.getNodes()) { + for (SearchQueryRecord record : nodeRecords.getTopQueriesRecord()) { + Assert.assertEquals(remoteAddress + ":" + remotePort, record.getAttributes().get(Attribute.REMOTE_ADDRESS)); + } + } internalCluster().stopAllNodes(); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index ca9cd279191b2..672e32fefce2b 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -138,7 +138,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); - attributes.put(Attribute.REMOTE_ADDRESS, searchRequestContext.getRequestRemoteAddress()); + attributes.put(Attribute.REMOTE_ADDRESS, String.valueOf(searchRequestContext.getRequestRemoteAddress())); SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index c8e110b1fed9b..591313440c221 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.Attribute; @@ -27,6 +28,8 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -55,10 +58,11 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final Settings.Builder settingsBuilder = Settings.builder(); private final Settings settings = settingsBuilder.build(); private final String remoteAddress = "1.2.3.4"; + private final int remotePort = 1234; private ClusterService clusterService; @Before - public void setup() { + public void setup() throws UnknownHostException { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); @@ -66,7 +70,7 @@ public void setup() { clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); - when(searchRequestContext.getRequestRemoteAddress()).thenReturn(remoteAddress); + when(searchRequestContext.getRequestRemoteAddress()).thenReturn(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort)); } public void testOnRequestEnd() { @@ -107,7 +111,7 @@ public void testOnRequestEnd() { assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS)); assertEquals(indices, attrs.get(Attribute.INDICES)); assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP)); - assertEquals(remoteAddress, attrs.get(Attribute.REMOTE_ADDRESS)); + assertEquals(remoteAddress + ":" + remotePort, attrs.get(Attribute.REMOTE_ADDRESS)); } public void testConcurrentOnRequestEnd() throws InterruptedException { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index be5239ca57448..4a89c548f97ca 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.InternalApi; +import org.opensearch.core.common.transport.TransportAddress; import java.util.EnumMap; import java.util.HashMap; @@ -51,8 +52,8 @@ public Map phaseTookMap() { return phaseTookMap; } - public String getRequestRemoteAddress() { - return searchRequest.remoteAddress().toString(); + public TransportAddress getRequestRemoteAddress() { + return searchRequest.remoteAddress(); } SearchResponse.PhaseTook getPhaseTook() {