Skip to content

Commit

Permalink
Getting the stats of a transform can time out; this change lets the
Browse files Browse the repository at this point in the history
retrieval of checkpointing info time out if it takes too long. As stats are
usually requested for all transforms, this prevents a failure if just one
transform does not respond.
  • Loading branch information
Hendrik Muhs committed Sep 26, 2023
1 parent 1a48c59 commit 8782ab7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
Expand Down Expand Up @@ -63,6 +64,9 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans

private static final Logger logger = LogManager.getLogger(TransportGetTransformStatsAction.class);

// default timeout share to receive checkpoint info, with the default of 30s: 30s * 0.8 = 24s
private static final double CHECKPOINT_INFO_TIMEOUT_SHARE = 0.8;

private final TransformConfigManager transformConfigManager;
private final TransformCheckpointService transformCheckpointService;
private final Client client;
Expand Down Expand Up @@ -114,6 +118,7 @@ protected void taskOperation(CancellableTask actionTask, Request request, Transf
// Little extra insurance, make sure we only return transforms that aren't cancelled
ClusterState state = clusterService.state();
String nodeId = state.nodes().getLocalNode().getId();

if (task.isCancelled() == false) {
task.getCheckpointingInfo(
transformCheckpointService,
Expand All @@ -130,7 +135,10 @@ protected void taskOperation(CancellableTask actionTask, Request request, Transf
)
);
}
)
),
// at this point the transport has already spent some of the time budget in `doExecute`, and it is hard to tell what is left:
// recording the time spent would be complex and would cross machine boundaries, which is why we use a heuristic here
TimeValue.timeValueMillis((long) (request.getTimeout().millis() * CHECKPOINT_INFO_TIMEOUT_SHARE))
);
} else {
listener.onResponse(new Response(Collections.emptyList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
Expand Down Expand Up @@ -180,18 +183,28 @@ public TransformIndexerStats getStats() {

public void getCheckpointingInfo(
TransformCheckpointService transformsCheckpointService,
ActionListener<TransformCheckpointingInfo> listener
ActionListener<TransformCheckpointingInfo> listener,
TimeValue timeout
) {
ActionListener<TransformCheckpointingInfoBuilder> checkPointInfoListener = ActionListener.wrap(infoBuilder -> {
if (context.getChangesLastDetectedAt() != null) {
infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt());
}
if (context.getLastSearchTime() != null) {
infoBuilder.setLastSearchTime(context.getLastSearchTime());
}
listener.onResponse(infoBuilder.build());
}, listener::onFailure);
ActionListener<TransformCheckpointingInfoBuilder> checkPointInfoListener = ListenerTimeouts.wrapWithTimeout(
threadPool,
timeout,
threadPool.generic(),
ActionListener.wrap(infoBuilder -> {
if (context.getChangesLastDetectedAt() != null) {
infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt());
}
if (context.getLastSearchTime() != null) {
infoBuilder.setLastSearchTime(context.getLastSearchTime());
}
listener.onResponse(infoBuilder.build());
}, listener::onFailure),
(ignore) -> listener.onFailure(
new ElasticsearchTimeoutException(format("Timed out retrieving checkpointing info after [%s]", timeout))
)
);

// TODO: pass `timeout` to the lower layers
ClientTransformIndexer transformIndexer = getIndexer();
if (transformIndexer == null) {
transformsCheckpointService.getCheckpointingInfo(
Expand Down

0 comments on commit 8782ab7

Please sign in to comment.