Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose CCR stats to monitoring #33617

Merged
merged 12 commits into from
Sep 12, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ followClusterTestCluster {
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.monitoring.collection.enabled', 'true'
extraConfigFile 'roles.yml', 'roles.yml'
setupCommand 'setupTestAdmin',
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;

public class FollowIndexSecurityIT extends ESRestTestCase {
Expand Down Expand Up @@ -80,6 +81,7 @@ public void testFollowIndex() throws Exception {
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertBusy(() -> verifyCcrMonitoring(allowedIndex));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Expand Down Expand Up @@ -203,4 +205,46 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
ensureYellow(".monitoring-*");

Request request = new Request("GET", "/.monitoring-*/_search");
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}");
Map<String, ?> response = toMap(adminClient().performRequest(request));

int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(numDocs, greaterThanOrEqualTo(1));

int numberOfOperationsReceived = 0;
int numberOfOperationsIndexed = 0;

List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
for (int i = 0; i < numDocs; i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
if (leaderIndex.endsWith(expectedLeaderIndex) == false) {
continue;
}

int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
int foundNumberOfOperationsIndexed =
(int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
}

assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
}

private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}

}
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/qa/multi-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.monitoring.collection.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class FollowIndexIT extends ESRestTestCase {

Expand Down Expand Up @@ -75,6 +76,7 @@ public void testFollowIndex() throws Exception {
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
}
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
assertBusy(() -> verifyCcrMonitoring(leaderIndexName));
}
}

Expand Down Expand Up @@ -104,6 +106,7 @@ public void testAutoFollowPatterns() throws Exception {
ensureYellow("logs-20190101");
verifyDocuments("logs-20190101", 5);
});
assertBusy(() -> verifyCcrMonitoring("logs-20190101"));
}

private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
Expand Down Expand Up @@ -155,6 +158,39 @@ private static void verifyDocuments(String index, int expectedNumDocs) throws IO
}
}

private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
ensureYellow(".monitoring-*");

Request request = new Request("GET", "/.monitoring-*/_search");
request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}");
Map<String, ?> response = toMap(client().performRequest(request));

int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(numDocs, greaterThanOrEqualTo(1));

int numberOfOperationsReceived = 0;
int numberOfOperationsIndexed = 0;

List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
for (int i = 0; i < numDocs; i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
if (leaderIndex.endsWith(expectedLeaderIndex) == false) {
continue;
}

int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
int foundNumberOfOperationsIndexed =
(int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
}

assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
}

private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,17 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
Expand All @@ -66,6 +62,10 @@
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -76,8 +76,8 @@
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING;
import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING;
import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING;

/**
* Container class for CCR functionality.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
import java.util.List;
Expand All @@ -22,11 +23,6 @@ private CcrSettings() {

}

/**
* Setting for controlling whether or not CCR is enabled.
*/
static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);

/**
* Index setting for a following index.
*/
Expand All @@ -46,7 +42,7 @@ private CcrSettings() {
*/
static List<Setting<?>> getSettings() {
return Arrays.asList(
CCR_ENABLED_SETTING,
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@

public class TransportCcrStatsAction extends TransportTasksAction<
ShardFollowNodeTask,
CcrStatsAction.TasksRequest,
CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> {
CcrStatsAction.StatsRequest,
CcrStatsAction.StatsResponses, CcrStatsAction.StatsResponse> {

private final IndexNameExpressionResolver resolver;
private final CcrLicenseChecker ccrLicenseChecker;
Expand All @@ -54,8 +54,8 @@ public TransportCcrStatsAction(
clusterService,
transportService,
actionFilters,
CcrStatsAction.TasksRequest::new,
CcrStatsAction.TasksResponse::new,
CcrStatsAction.StatsRequest::new,
CcrStatsAction.StatsResponses::new,
Ccr.CCR_THREAD_POOL_NAME);
this.resolver = Objects.requireNonNull(resolver);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
Expand All @@ -64,8 +64,8 @@ public TransportCcrStatsAction(
@Override
protected void doExecute(
final Task task,
final CcrStatsAction.TasksRequest request,
final ActionListener<CcrStatsAction.TasksResponse> listener) {
final CcrStatsAction.StatsRequest request,
final ActionListener<CcrStatsAction.StatsResponses> listener) {
if (ccrLicenseChecker.isCcrAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
Expand All @@ -74,21 +74,21 @@ protected void doExecute(
}

@Override
protected CcrStatsAction.TasksResponse newResponse(
final CcrStatsAction.TasksRequest request,
final List<CcrStatsAction.TaskResponse> taskResponses,
protected CcrStatsAction.StatsResponses newResponse(
final CcrStatsAction.StatsRequest request,
final List<CcrStatsAction.StatsResponse> statsRespons,
final List<TaskOperationFailure> taskOperationFailures,
final List<FailedNodeException> failedNodeExceptions) {
return new CcrStatsAction.TasksResponse(taskOperationFailures, failedNodeExceptions, taskResponses);
return new CcrStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons);
}

@Override
protected CcrStatsAction.TaskResponse readTaskResponse(final StreamInput in) throws IOException {
return new CcrStatsAction.TaskResponse(in);
protected CcrStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException {
return new CcrStatsAction.StatsResponse(in);
}

@Override
protected void processTasks(final CcrStatsAction.TasksRequest request, final Consumer<ShardFollowNodeTask> operation) {
protected void processTasks(final CcrStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
for (final Task task : taskManager.getTasks().values()) {
Expand All @@ -103,10 +103,10 @@ protected void processTasks(final CcrStatsAction.TasksRequest request, final Con

@Override
protected void taskOperation(
final CcrStatsAction.TasksRequest request,
final CcrStatsAction.StatsRequest request,
final ShardFollowNodeTask task,
final ActionListener<CcrStatsAction.TaskResponse> listener) {
listener.onResponse(new CcrStatsAction.TaskResponse(task.getFollowShardId(), task.getStatus()));
final ActionListener<CcrStatsAction.StatsResponse> listener) {
listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final CcrStatsAction.TasksRequest request = new CcrStatsAction.TasksRequest();
final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
request.setIndicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions()));
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ public void onFailure(final Exception e) {

public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener<CcrStatsAction.TasksResponse>() {
client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.StatsRequest(), new ActionListener<CcrStatsAction.StatsResponses>() {
@Override
public void onResponse(final CcrStatsAction.TasksResponse tasksResponse) {
public void onResponse(final CcrStatsAction.StatsResponses statsResponses) {
latch.countDown();
fail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ private XPackSettings() {
throw new IllegalStateException("Utility class should not be instantiated");
}


/**
* Setting for controlling whether or not CCR is enabled.
*/
public static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);

/** Setting for enabling or disabling security. Defaults to true. */
public static final Setting<Boolean> SECURITY_ENABLED = Setting.boolSetting("xpack.security.enabled", true, Setting.Property.NodeScope);

Expand Down
Loading