Skip to content

Commit

Permalink
[7.x][Transform] implement throttling in indexer (#55011) (#56002)
Browse files Browse the repository at this point in the history
implement throttling in async-indexer used by rollup and transform. The added
docs_per_second parameter is used to calculate a delay before the next
search request is send. With re-throttle its possible to change the parameter
at runtime. When stopping a running job, its ensured that despite throttling
the indexer stops in reasonable time. This change contains the groundwork, but
does not expose the new functionality.

relates #54862
backport: #55011
  • Loading branch information
Hendrik Muhs authored Apr 30, 2020
1 parent 3c7c957 commit d3bcef2
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -38,15 +42,70 @@
public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats> {
private static final Logger logger = LogManager.getLogger(AsyncTwoPhaseIndexer.class.getName());

// max time to wait for during throttling
private static final TimeValue MAX_THROTTLE_WAIT_TIME = TimeValue.timeValueHours(1);
// min time to trigger delayed execution, this avoids scheduling tasks with super short amount of time
private static final TimeValue MIN_THROTTLE_WAIT_TIME = TimeValue.timeValueMillis(10);

private final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(
this::onSearchResponse,
this::finishWithSearchFailure
);

private final JobStats stats;

private final AtomicReference<IndexerState> state;
private final AtomicReference<JobPosition> position;
private final Executor executor;
private final ThreadPool threadPool;
private final String executorName;

// throttling implementation
private volatile float currentMaxDocsPerSecond;
private volatile long lastSearchStartTimeNanos = 0;
private volatile long lastDocCount = 0;
private volatile ScheduledRunnable scheduledNextSearch;

/**
* Task wrapper for throttled execution, we need this wrapper in order to cancel and re-issue scheduled searches
*/
class ScheduledRunnable {
private final ThreadPool threadPool;
private final String executorName;
private final Runnable command;
private Scheduler.ScheduledCancellable scheduled;

ScheduledRunnable(ThreadPool threadPool, String executorName, TimeValue delay, Runnable command) {
this.threadPool = threadPool;
this.executorName = executorName;

// with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the
// future is already running and cancel returns true
this.command = new RunOnce(command);
this.scheduled = threadPool.schedule(() -> { command.run(); }, delay, executorName);
}

protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference<IndexerState> initialState,
JobPosition initialPosition, JobStats jobStats) {
this.executor = executor;
public void reschedule(TimeValue delay) {
// note: cancel return true if the runnable is currently executing
if (scheduled.cancel()) {
if (delay.duration() > 0) {
scheduled = threadPool.schedule(() -> command.run(), delay, executorName);
} else {
threadPool.executor(executorName).execute(() -> command.run());
}
}
}

}

protected AsyncTwoPhaseIndexer(
ThreadPool threadPool,
String executorName,
AtomicReference<IndexerState> initialState,
JobPosition initialPosition,
JobStats jobStats
) {
this.threadPool = threadPool;
this.executorName = executorName;
this.state = initialState;
this.position = new AtomicReference<>(initialPosition);
this.stats = jobStats;
Expand Down Expand Up @@ -96,7 +155,7 @@ public synchronized IndexerState start() {
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
return state.updateAndGet(previousState -> {
IndexerState indexerState = state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
Expand All @@ -105,6 +164,13 @@ public synchronized IndexerState stop() {
return previousState;
}
});

// a throttled search might be waiting to be executed, stop it
if (scheduledNextSearch != null) {
scheduledNextSearch.reschedule(TimeValue.ZERO);
}

return indexerState;
}

/**
Expand Down Expand Up @@ -152,11 +218,11 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {

if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
threadPool.executor(executorName).execute(() -> {
onStart(now, ActionListener.wrap(r -> {
assert r != null;
if (r) {
nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
nextSearch();
} else {
onFinish(ActionListener.wrap(
onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}),
Expand All @@ -178,6 +244,34 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
}
}

/**
* Re-schedules the search request if necessary, this method can be called to apply a change
* in maximumRequestsPerSecond immediately
*/
protected void rethrottle() {
// simple check if the setting has changed, ignores the call if it hasn't
if (getMaxDocsPerSecond() == currentMaxDocsPerSecond) {
return;
}

reQueueThrottledSearch();
}

// protected, so it can be overwritten by tests
protected long getTimeNanos() {
return System.nanoTime();
}

/**
* Called to get max docs per second. To be overwritten if
* throttling is implemented, the default -1 turns off throttling.
*
* @return a float with max docs per second, -1 if throttling is off
*/
protected float getMaxDocsPerSecond() {
return -1;
}

/**
* Called to get the Id of the job, used for logging.
*
Expand All @@ -196,9 +290,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
/**
* Called to build the next search request.
*
* In case the indexer is throttled waitTimeInNanos can be used as hint for building a less resource hungry
* search request.
*
* @param waitTimeInNanos duration in nanoseconds the indexer has waited due to throttling.
* @return SearchRequest to be passed to the search phase.
*/
protected abstract SearchRequest buildSearchRequest();
protected abstract SearchRequest buildSearchRequest(long waitTimeInNanos);

/**
* Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
Expand Down Expand Up @@ -349,10 +447,15 @@ private void onSearchResponse(SearchResponse searchResponse) {
assert (searchResponse.getShardFailures().length == 0);
stats.markStartProcessing();
stats.incrementNumPages(1);

long numDocumentsBefore = stats.getNumDocuments();
IterationResult<JobPosition> iterationResult = doProcess(searchResponse);

// record the number of documents returned to base throttling on the output
lastDocCount = stats.getNumDocuments() - numDocumentsBefore;

if (iterationResult.isDone()) {
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
logger.debug("Finished indexing for job [{}], saving state and shutting down.", getJobId());

position.set(iterationResult.getPosition());
stats.markEndProcessing();
Expand All @@ -375,7 +478,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
if (bulkResponse.hasFailures()) {
logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage());
}
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);

Expand All @@ -396,8 +499,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);

ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
nextSearch(listener);
nextSearch();
} catch (Exception e) {
finishWithFailure(e);
}
Expand All @@ -409,26 +511,74 @@ private void onSearchResponse(SearchResponse searchResponse) {

private void onBulkResponse(BulkResponse response, JobPosition position) {
stats.markEndIndexing();

// check if we should stop
if (checkState(getState()) == false) {
return;
}

try {
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
// TODO probably something more intelligent than every-50 is needed
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
doSaveState(IndexerState.INDEXING, position, () -> {
nextSearch(listener);
nextSearch();
});
} else {
nextSearch(listener);
nextSearch();
}
} catch (Exception e) {
finishWithIndexingFailure(e);
}
}

private void nextSearch(ActionListener<SearchResponse> listener) {
protected void nextSearch() {
currentMaxDocsPerSecond = getMaxDocsPerSecond();
if (currentMaxDocsPerSecond > 0 && lastDocCount > 0) {
TimeValue executionDelay = calculateThrottlingDelay(
currentMaxDocsPerSecond,
lastDocCount,
lastSearchStartTimeNanos,
getTimeNanos()
);

if (executionDelay.duration() > 0) {
logger.debug(
"throttling job [{}], wait for {} ({} {})",
getJobId(),
executionDelay,
currentMaxDocsPerSecond,
lastDocCount
);
scheduledNextSearch = new ScheduledRunnable(
threadPool,
executorName,
executionDelay,
() -> triggerNextSearch(executionDelay.getNanos())
);

// corner case: if for whatever reason stop() has been called meanwhile fast forward
if (getState().equals(IndexerState.STOPPING)) {
scheduledNextSearch.reschedule(TimeValue.ZERO);
}
return;
}
}

triggerNextSearch(0L);
}

private void triggerNextSearch(long waitTimeInNanos) {
if (checkState(getState()) == false) {
return;
}

stats.markStartSearch();
lastSearchStartTimeNanos = getTimeNanos();

// ensure that partial results are not accepted and cause a search failure
SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false);
doNextSearch(searchRequest, listener);
SearchRequest searchRequest = buildSearchRequest(waitTimeInNanos).allowPartialSearchResults(false);

doNextSearch(searchRequest, searchResponseListener);
}

/**
Expand Down Expand Up @@ -461,4 +611,41 @@ private boolean checkState(IndexerState currentState) {
}
}

private synchronized void reQueueThrottledSearch() {
currentMaxDocsPerSecond = getMaxDocsPerSecond();

if (scheduledNextSearch != null) {
TimeValue executionDelay = calculateThrottlingDelay(
currentMaxDocsPerSecond,
lastDocCount,
lastSearchStartTimeNanos,
getTimeNanos()
);

logger.trace(
"[{}] rethrottling job, wait {} until next search",
getJobId(),
executionDelay
);
scheduledNextSearch.reschedule(executionDelay);
}
}

static TimeValue calculateThrottlingDelay(float docsPerSecond, long docCount, long startTimeNanos, long now) {
if (docsPerSecond <= 0) {
return TimeValue.ZERO;
}
float timeToWaitNanos = (docCount / docsPerSecond) * TimeUnit.SECONDS.toNanos(1);

// from timeToWaitNanos - (now - startTimeNanos)
TimeValue executionDelay = TimeValue.timeValueNanos(
Math.min(MAX_THROTTLE_WAIT_TIME.getNanos(), Math.max(0, (long) timeToWaitNanos + startTimeNanos - now))
);

if (executionDelay.compareTo(MIN_THROTTLE_WAIT_TIME) < 0) {
return TimeValue.ZERO;
}
return executionDelay;
}

}
Loading

0 comments on commit d3bcef2

Please sign in to comment.