Skip to content

Commit

Permalink
Introduce compute listener (#110400) (#110556)
Browse files Browse the repository at this point in the history
Currently, if a child request fails, we automatically trigger cancellation
for ES|QL requests. This can result in TaskCancelledException being
collected by the RefCountingListener first, which then returns that
exception to the caller. For example, if we encounter a
CircuitBreakingException (429), we might incorrectly return a
TaskCancelledException (400) instead.

This change introduces the ComputeListener, a variant of
RefCountingListener, which selects the most appropriate exception to return
to the caller. I also integrated the following features into ComputeListener to
simplify ComputeService:

- Automatic cancellation of sub-tasks on failure.
- Collection of profiles from sub-tasks.
- Collection of response headers from sub-tasks.
  • Loading branch information
dnhatn authored Jul 5, 2024
1 parent b5eedf2 commit d55f984
Show file tree
Hide file tree
Showing 9 changed files with 658 additions and 228 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/110400.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 110400
summary: Introduce compute listener
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

/**
Expand All @@ -40,7 +38,7 @@ public abstract class AsyncOperator implements Operator {
private volatile SubscribableListener<Void> blockedFuture;

private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
private final AtomicReference<Exception> failure = new AtomicReference<>();
private final FailureCollector failureCollector = new FailureCollector();
private final DriverContext driverContext;

private final int maxOutstandingRequests;
Expand Down Expand Up @@ -77,7 +75,7 @@ public boolean needsInput() {

@Override
public void addInput(Page input) {
if (failure.get() != null) {
if (failureCollector.hasFailure()) {
input.releaseBlocks();
return;
}
Expand All @@ -90,7 +88,7 @@ public void addInput(Page input) {
onSeqNoCompleted(seqNo);
}, e -> {
releasePageOnAnyThread(input);
onFailure(e);
failureCollector.unwrapAndCollect(e);
onSeqNoCompleted(seqNo);
});
final long startNanos = System.nanoTime();
Expand Down Expand Up @@ -121,31 +119,12 @@ private void releasePageOnAnyThread(Page page) {

protected abstract void doClose();

private void onFailure(Exception e) {
failure.getAndUpdate(first -> {
if (first == null) {
return e;
}
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
return first;
}
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
return e;
}
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
first.addSuppressed(e);
}
return first;
});
}

private void onSeqNoCompleted(long seqNo) {
checkpoint.markSeqNoAsProcessed(seqNo);
if (checkpoint.getPersistedCheckpoint() < checkpoint.getProcessedCheckpoint()) {
notifyIfBlocked();
}
if (closed || failure.get() != null) {
if (closed || failureCollector.hasFailure()) {
discardPages();
}
}
Expand All @@ -164,7 +143,7 @@ private void notifyIfBlocked() {
}

private void checkFailure() {
Exception e = failure.get();
Exception e = failureCollector.getFailure();
if (e != null) {
discardPages();
throw ExceptionsHelper.convertToElastic(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* Run a set of drivers to completion.
Expand All @@ -35,8 +32,8 @@ public DriverRunner(ThreadContext threadContext) {
* Run all drivers to completion asynchronously.
*/
public void runToCompletion(List<Driver> drivers, ActionListener<Void> listener) {
AtomicReference<Exception> failure = new AtomicReference<>();
var responseHeadersCollector = new ResponseHeadersCollector(threadContext);
var failure = new FailureCollector();
CountDown counter = new CountDown(drivers.size());
for (int i = 0; i < drivers.size(); i++) {
Driver driver = drivers.get(i);
Expand All @@ -48,23 +45,7 @@ public void onResponse(Void unused) {

@Override
public void onFailure(Exception e) {
failure.getAndUpdate(first -> {
if (first == null) {
return e;
}
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
return first;
} else {
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
return e;
} else {
if (first != e) {
first.addSuppressed(e);
}
return first;
}
}
});
failure.unwrapAndCollect(e);
for (Driver d : drivers) {
if (driver != d) {
d.cancel("Driver [" + driver.sessionId() + "] was cancelled or failed");
Expand All @@ -77,7 +58,7 @@ private void done() {
responseHeadersCollector.collect();
if (counter.countDown()) {
responseHeadersCollector.finish();
Exception error = failure.get();
Exception error = failure.getFailure();
if (error != null) {
listener.onFailure(error);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
 * The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
 * To limit memory usage, this class collects only the first 10 exceptions in each category by default.
 * When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
 * over task-cancelled ones as they are more useful for diagnosing issues.
 */
public final class FailureCollector {
    // Exceptions whose cause chain contains a TaskCancelledException, and how many were reported in total.
    private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
    private final AtomicInteger cancelledExceptionsCount = new AtomicInteger();

    // All other exceptions, and how many were reported in total.
    private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
    private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger();

    // Upper bound on the exceptions retained per category and on the exceptions merged into the final failure.
    private final int maxExceptions;
    private volatile boolean hasFailure = false;
    // Built lazily by getFailure() while holding the monitor of this instance, then reused.
    private Exception finalFailure = null;

    /**
     * Creates a collector that retains at most 10 exceptions per category.
     */
    public FailureCollector() {
        this(10);
    }

    /**
     * Creates a collector that retains at most {@code maxExceptions} exceptions per category.
     *
     * @param maxExceptions the maximum number of exceptions to retain in each category; must be positive
     * @throws IllegalArgumentException if {@code maxExceptions} is zero or negative
     */
    public FailureCollector(int maxExceptions) {
        if (maxExceptions <= 0) {
            throw new IllegalArgumentException("maxExceptions must be at least one");
        }
        this.maxExceptions = maxExceptions;
    }

    /**
     * Records a failure. A {@link TransportException} is replaced by its cause when it has one;
     * the resulting exception is then filed as task-cancelled or non-task-cancelled. Exceptions
     * reported after a category has reached its limit are counted but not retained.
     *
     * @param originEx the exception to record
     */
    public void unwrapAndCollect(Exception originEx) {
        final Exception e = originEx instanceof TransportException ? unwrapTransportException(originEx) : originEx;
        if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
            if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
                cancelledExceptions.add(e);
            }
        } else {
            if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
                nonCancelledExceptions.add(e);
            }
        }
        // Raised only after the exception has been enqueued, so the flag never precedes the queue update.
        hasFailure = true;
    }

    /**
     * Strips the transport wrapper from an exception.
     *
     * @param transportEx the transport-level exception to unwrap
     * @return {@code transportEx} itself when it has no cause (so that its message and stack trace
     *         are not lost), the cause when the cause is an {@link Exception}, or the cause wrapped
     *         in an {@link ElasticsearchException} otherwise
     */
    private static Exception unwrapTransportException(Exception transportEx) {
        final Throwable cause = transportEx.getCause();
        if (cause == null) {
            // Nothing to unwrap: keep the original instead of fabricating an empty wrapper around null.
            return transportEx;
        }
        if (cause instanceof Exception causeEx) {
            return causeEx;
        }
        return new ElasticsearchException(cause);
    }

    /**
     * @return {@code true} if any failure has been collected, {@code false} otherwise
     */
    public boolean hasFailure() {
        return hasFailure;
    }

    /**
     * Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
     * The failure is built on the first call that observes a collected exception and is cached afterwards,
     * so exceptions collected later are not reflected in the returned exception.
     *
     * @return the accumulated failure, or {@code null} if no failure has been collected
     */
    public Exception getFailure() {
        if (hasFailure == false) {
            return null;
        }
        synchronized (this) {
            if (finalFailure == null) {
                finalFailure = buildFailure();
            }
            return finalFailure;
        }
    }

    /**
     * Merges the retained exceptions into a single exception: the first one encountered becomes the
     * result and the following distinct instances are attached to it as suppressed exceptions, up to
     * {@code maxExceptions} in total. Non-task-cancelled exceptions are visited before task-cancelled ones.
     * Must be called while holding the monitor of this instance.
     */
    private Exception buildFailure() {
        assert hasFailure;
        assert Thread.holdsLock(this);
        int total = 0;
        Exception first = null;
        for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) {
            for (Exception e : exceptions) {
                if (first == null) {
                    first = e;
                    total++;
                } else if (first != e) {
                    // The same instance may have been reported more than once; never suppress it on itself.
                    first.addSuppressed(e);
                    total++;
                }
                if (total >= maxExceptions) {
                    return first;
                }
            }
        }
        assert first != null;
        return first;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
Expand All @@ -37,7 +34,7 @@ public final class ExchangeSourceHandler {

private final PendingInstances outstandingSinks;
private final PendingInstances outstandingSources;
private final AtomicReference<Exception> failure = new AtomicReference<>();
private final FailureCollector failure = new FailureCollector();

public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
this.buffer = new ExchangeBuffer(maxBufferSize);
Expand All @@ -54,7 +51,7 @@ private class ExchangeSourceImpl implements ExchangeSource {
}

private void checkFailure() {
Exception e = failure.get();
Exception e = failure.getFailure();
if (e != null) {
throw ExceptionsHelper.convertToElastic(e);
}
Expand Down Expand Up @@ -172,7 +169,7 @@ void fetchPage() {
while (loopControl.isRunning()) {
loopControl.exiting();
// finish other sinks if one of them failed or source no longer need pages.
boolean toFinishSinks = buffer.noMoreInputs() || failure.get() != null;
boolean toFinishSinks = buffer.noMoreInputs() || failure.hasFailure();
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
Page page = resp.takePage();
if (page != null) {
Expand All @@ -199,26 +196,8 @@ void fetchPage() {
loopControl.exited();
}

void onSinkFailed(Exception originEx) {
final Exception e = originEx instanceof TransportException
? (originEx.getCause() instanceof Exception cause ? cause : new ElasticsearchException(originEx.getCause()))
: originEx;
failure.getAndUpdate(first -> {
if (first == null) {
return e;
}
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
return first;
}
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
return e;
}
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
first.addSuppressed(e);
}
return first;
});
void onSinkFailed(Exception e) {
failure.unwrapAndCollect(e);
buffer.waitForReading().onResponse(null); // resume the Driver if it is being blocked on reading
onSinkComplete();
}
Expand Down
Loading

0 comments on commit d55f984

Please sign in to comment.