From d3bcef296218fc210ee70043299ec96a75da3505 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 30 Apr 2020 11:20:35 +0200 Subject: [PATCH] [7.x][Transform] implement throttling in indexer (#55011) (#56002) implement throttling in async-indexer used by rollup and transform. The added docs_per_second parameter is used to calculate a delay before the next search request is send. With re-throttle its possible to change the parameter at runtime. When stopping a running job, its ensured that despite throttling the indexer stops in reasonable time. This change contains the groundwork, but does not expose the new functionality. relates #54862 backport: #55011 --- .../core/indexing/AsyncTwoPhaseIndexer.java | 225 ++++++++++++++-- .../indexing/AsyncTwoPhaseIndexerTests.java | 244 ++++++++++++++++-- .../xpack/rollup/job/RollupIndexer.java | 22 +- .../xpack/rollup/job/RollupJobTask.java | 2 +- .../job/RollupIndexerIndexingTests.java | 16 +- .../rollup/job/RollupIndexerStateTests.java | 117 ++++----- .../transforms/ClientTransformIndexer.java | 8 +- .../ClientTransformIndexerBuilder.java | 7 +- .../transforms/TransformIndexer.java | 18 +- .../transform/transforms/TransformTask.java | 2 +- .../ClientTransformIndexerTests.java | 6 +- .../transforms/TransformIndexerTests.java | 45 ++-- 12 files changed, 540 insertions(+), 172 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index fef4592d75067..63149267b1657 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -14,9 +14,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -38,15 +42,70 @@ public abstract class AsyncTwoPhaseIndexer { private static final Logger logger = LogManager.getLogger(AsyncTwoPhaseIndexer.class.getName()); + // max time to wait for during throttling + private static final TimeValue MAX_THROTTLE_WAIT_TIME = TimeValue.timeValueHours(1); + // min time to trigger delayed execution, this avoids scheduling tasks with super short amount of time + private static final TimeValue MIN_THROTTLE_WAIT_TIME = TimeValue.timeValueMillis(10); + + private final ActionListener searchResponseListener = ActionListener.wrap( + this::onSearchResponse, + this::finishWithSearchFailure + ); + private final JobStats stats; private final AtomicReference state; private final AtomicReference position; - private final Executor executor; + private final ThreadPool threadPool; + private final String executorName; + + // throttling implementation + private volatile float currentMaxDocsPerSecond; + private volatile long lastSearchStartTimeNanos = 0; + private volatile long lastDocCount = 0; + private volatile ScheduledRunnable scheduledNextSearch; + + /** + * Task wrapper for throttled execution, we need this wrapper in order to cancel and re-issue scheduled searches + */ + class ScheduledRunnable { + private final ThreadPool threadPool; + private final String executorName; + private final Runnable command; + private Scheduler.ScheduledCancellable scheduled; + + ScheduledRunnable(ThreadPool threadPool, String executorName, TimeValue delay, Runnable command) { + this.threadPool = threadPool; + this.executorName = executorName; + + // with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the + // future is already running and cancel returns true + this.command = new RunOnce(command); + this.scheduled = threadPool.schedule(() -> { command.run(); }, delay, executorName); + } - protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference initialState, - JobPosition initialPosition, JobStats jobStats) { - this.executor = executor; + public void reschedule(TimeValue delay) { + // note: cancel return true if the runnable is currently executing + if (scheduled.cancel()) { + if (delay.duration() > 0) { + scheduled = threadPool.schedule(() -> command.run(), delay, executorName); + } else { + threadPool.executor(executorName).execute(() -> command.run()); + } + } + } + + } + + protected AsyncTwoPhaseIndexer( + ThreadPool threadPool, + String executorName, + AtomicReference initialState, + JobPosition initialPosition, + JobStats jobStats + ) { + this.threadPool = threadPool; + this.executorName = executorName; this.state = initialState; this.position = new AtomicReference<>(initialPosition); this.stats = jobStats; @@ -96,7 +155,7 @@ public synchronized IndexerState start() { * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). */ public synchronized IndexerState stop() { - return state.updateAndGet(previousState -> { + IndexerState indexerState = state.updateAndGet(previousState -> { if (previousState == IndexerState.INDEXING) { return IndexerState.STOPPING; } else if (previousState == IndexerState.STARTED) { @@ -105,6 +164,13 @@ public synchronized IndexerState stop() { return previousState; } }); + + // a throttled search might be waiting to be executed, stop it + if (scheduledNextSearch != null) { + scheduledNextSearch.reschedule(TimeValue.ZERO); + } + + return indexerState; } /** @@ -152,11 +218,11 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) { // fire off the search. Note this is async, the method will return from here - executor.execute(() -> { + threadPool.executor(executorName).execute(() -> { onStart(now, ActionListener.wrap(r -> { assert r != null; if (r) { - nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure)); + nextSearch(); } else { onFinish(ActionListener.wrap( onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}), @@ -178,6 +244,34 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { } } + /** + * Re-schedules the search request if necessary, this method can be called to apply a change + * in maximumRequestsPerSecond immediately + */ + protected void rethrottle() { + // simple check if the setting has changed, ignores the call if it hasn't + if (getMaxDocsPerSecond() == currentMaxDocsPerSecond) { + return; + } + + reQueueThrottledSearch(); + } + + // protected, so it can be overwritten by tests + protected long getTimeNanos() { + return System.nanoTime(); + } + + /** + * Called to get max docs per second. To be overwritten if + * throttling is implemented, the default -1 turns off throttling. + * + * @return a float with max docs per second, -1 if throttling is off + */ + protected float getMaxDocsPerSecond() { + return -1; + } + /** * Called to get the Id of the job, used for logging. * @@ -196,9 +290,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { /** * Called to build the next search request. * + * In case the indexer is throttled waitTimeInNanos can be used as hint for building a less resource hungry + * search request. + * + * @param waitTimeInNanos duration in nanoseconds the indexer has waited due to throttling. * @return SearchRequest to be passed to the search phase. */ - protected abstract SearchRequest buildSearchRequest(); + protected abstract SearchRequest buildSearchRequest(long waitTimeInNanos); /** * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the @@ -349,10 +447,15 @@ private void onSearchResponse(SearchResponse searchResponse) { assert (searchResponse.getShardFailures().length == 0); stats.markStartProcessing(); stats.incrementNumPages(1); + + long numDocumentsBefore = stats.getNumDocuments(); IterationResult iterationResult = doProcess(searchResponse); + // record the number of documents returned to base throttling on the output + lastDocCount = stats.getNumDocuments() - numDocumentsBefore; + if (iterationResult.isDone()) { - logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); + logger.debug("Finished indexing for job [{}], saving state and shutting down.", getJobId()); position.set(iterationResult.getPosition()); stats.markEndProcessing(); @@ -375,7 +478,7 @@ private void onSearchResponse(SearchResponse searchResponse) { // TODO we should check items in the response and move after accordingly to // resume the failing buckets ? if (bulkResponse.hasFailures()) { - logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage()); + logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage()); } stats.incrementNumOutputDocuments(bulkResponse.getItems().length); @@ -396,8 +499,7 @@ private void onSearchResponse(SearchResponse searchResponse) { JobPosition newPosition = iterationResult.getPosition(); position.set(newPosition); - ActionListener listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); - nextSearch(listener); + nextSearch(); } catch (Exception e) { finishWithFailure(e); } @@ -409,26 +511,74 @@ private void onSearchResponse(SearchResponse searchResponse) { private void onBulkResponse(BulkResponse response, JobPosition position) { stats.markEndIndexing(); + + // check if we should stop + if (checkState(getState()) == false) { + return; + } + try { - ActionListener listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); // TODO probably something more intelligent than every-50 is needed if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) { doSaveState(IndexerState.INDEXING, position, () -> { - nextSearch(listener); + nextSearch(); }); } else { - nextSearch(listener); + nextSearch(); } } catch (Exception e) { finishWithIndexingFailure(e); } } - private void nextSearch(ActionListener listener) { + protected void nextSearch() { + currentMaxDocsPerSecond = getMaxDocsPerSecond(); + if (currentMaxDocsPerSecond > 0 && lastDocCount > 0) { + TimeValue executionDelay = calculateThrottlingDelay( + currentMaxDocsPerSecond, + lastDocCount, + lastSearchStartTimeNanos, + getTimeNanos() + ); + + if (executionDelay.duration() > 0) { + logger.debug( + "throttling job [{}], wait for {} ({} {})", + getJobId(), + executionDelay, + currentMaxDocsPerSecond, + lastDocCount + ); + scheduledNextSearch = new ScheduledRunnable( + threadPool, + executorName, + executionDelay, + () -> triggerNextSearch(executionDelay.getNanos()) + ); + + // corner case: if for whatever reason stop() has been called meanwhile fast forward + if (getState().equals(IndexerState.STOPPING)) { + scheduledNextSearch.reschedule(TimeValue.ZERO); + } + return; + } + } + + triggerNextSearch(0L); + } + + private void triggerNextSearch(long waitTimeInNanos) { + if (checkState(getState()) == false) { + return; + } + stats.markStartSearch(); + lastSearchStartTimeNanos = getTimeNanos(); + // ensure that partial results are not accepted and cause a search failure - SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false); - doNextSearch(searchRequest, listener); + SearchRequest searchRequest = buildSearchRequest(waitTimeInNanos).allowPartialSearchResults(false); + + doNextSearch(searchRequest, searchResponseListener); } /** @@ -461,4 +611,41 @@ private boolean checkState(IndexerState currentState) { } } + private synchronized void reQueueThrottledSearch() { + currentMaxDocsPerSecond = getMaxDocsPerSecond(); + + if (scheduledNextSearch != null) { + TimeValue executionDelay = calculateThrottlingDelay( + currentMaxDocsPerSecond, + lastDocCount, + lastSearchStartTimeNanos, + getTimeNanos() + ); + + logger.trace( + "[{}] rethrottling job, wait {} until next search", + getJobId(), + executionDelay + ); + scheduledNextSearch.reschedule(executionDelay); + } + } + + static TimeValue calculateThrottlingDelay(float docsPerSecond, long docCount, long startTimeNanos, long now) { + if (docsPerSecond <= 0) { + return TimeValue.ZERO; + } + float timeToWaitNanos = (docCount / docsPerSecond) * TimeUnit.SECONDS.toNanos(1); + + // from timeToWaitNanos - (now - startTimeNanos) + TimeValue executionDelay = TimeValue.timeValueNanos( + Math.min(MAX_THROTTLE_WAIT_TIME.getNanos(), Math.max(0, (long) timeToWaitNanos + startTimeNanos - now)) + ); + + if (executionDelay.compareTo(MIN_THROTTLE_WAIT_TIME) < 0) { + return TimeValue.ZERO; + } + return executionDelay; + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 487bf5a7a434e..2c18a7d44faad 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -16,18 +16,23 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -52,9 +57,9 @@ private class MockIndexer extends AsyncTwoPhaseIndexer { private volatile int step; private final boolean stoppedBeforeFinished; - protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition, - CountDownLatch latch, boolean stoppedBeforeFinished) { - super(executor, initialState, initialPosition, new MockJobStats()); + protected MockIndexer(ThreadPool threadPool, String executorName, AtomicReference initialState, + Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) { + super(threadPool, executorName, initialState, initialPosition, new MockJobStats()); this.latch = latch; this.stoppedBeforeFinished = stoppedBeforeFinished; } @@ -81,7 +86,7 @@ private void awaitForLatch() { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { assertThat(step, equalTo(1)); ++step; return new SearchRequest(); @@ -114,8 +119,10 @@ protected void doNextBulk(BulkRequest request, ActionListener next @Override protected void doSaveState(IndexerState state, Integer position, Runnable next) { - int expectedStep = stoppedBeforeFinished ? 3 : 5; - assertThat(step, equalTo(expectedStep)); + // for stop before finished we do not know if its stopped before are after the search + if (stoppedBeforeFinished == false) { + assertThat(step, equalTo(5)); + } ++step; next.run(); } @@ -150,15 +157,29 @@ public int getStep() { private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer { + private final long startTime; + private final CountDownLatch latch; + private volatile float maxDocsPerSecond; + // counters private volatile boolean started = false; + private volatile boolean waitingForLatch = false; private volatile int searchRequests = 0; private volatile int searchOps = 0; private volatile int processOps = 0; private volatile int bulkOps = 0; - protected MockIndexerFiveRuns(Executor executor, AtomicReference initialState, Integer initialPosition) { - super(executor, initialState, initialPosition, new MockJobStats()); + protected MockIndexerFiveRuns(ThreadPool threadPool, String executorName, AtomicReference initialState, + Integer initialPosition, float maxDocsPerSecond, CountDownLatch latch) { + super(threadPool, executorName, initialState, initialPosition, new MockJobStats()); + startTime = System.nanoTime(); + this.latch = latch; + this.maxDocsPerSecond = maxDocsPerSecond; + } + + public void rethrottle(float maxDocsPerSecond) { + this.maxDocsPerSecond = maxDocsPerSecond; + rethrottle(); } @Override @@ -166,8 +187,16 @@ protected String getJobId() { return "mock_5_runs"; } + @Override + protected float getMaxDocsPerSecond() { + return maxDocsPerSecond; + } + @Override protected IterationResult doProcess(SearchResponse searchResponse) { + // increment doc count for throttling + getStats().incrementNumDocuments(1000); + ++processOps; if (processOps == 5) { return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true); @@ -180,7 +209,7 @@ else if (processOps % 2 == 0) { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { ++searchRequests; return new SearchRequest(); } @@ -191,6 +220,23 @@ protected void onStart(long now, ActionListener listener) { listener.onResponse(true); } + private void awaitForLatch() { + if (latch == null) { + return; + } + try { + waitingForLatch = true; + latch.await(10, TimeUnit.SECONDS); + waitingForLatch = false; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public boolean waitingForLatchCountDown() { + return waitingForLatch; + } + @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { ++searchOps; @@ -198,6 +244,10 @@ protected void doNextSearch(SearchRequest request, ActionListener initialState, Integer initialPosition) { - super(executor, initialState, initialPosition, new MockJobStats()); + protected MockIndexerThrowsFirstSearch(ThreadPool threadPool, String executorName, AtomicReference initialState, + Integer initialPosition) { + super(threadPool, executorName, initialState, initialPosition, new MockJobStats()); } @Override @@ -263,7 +319,7 @@ protected IterationResult doProcess(SearchResponse searchResponse) { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { assertThat(step, equalTo(1)); ++step; return new SearchRequest(); @@ -321,12 +377,32 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } + private class MockThreadPool extends TestThreadPool { + + private List delays = new ArrayList<>(); + + MockThreadPool(String name, ExecutorBuilder... customBuilders) { + super(name, Settings.EMPTY, customBuilders); + } + + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { + delays.add(delay); + + return super.schedule(command, TimeValue.ZERO, executor); + } + + public void assertCountersAndDelay(Collection expectedDelays) { + assertThat(delays, equalTo(expectedDelays)); + } + } + public void testStateMachine() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false); + MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, false); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -344,33 +420,32 @@ public void testStateMachine() throws Exception { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testStateMachineBrokenSearch() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2); + MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(threadPool, ThreadPool.Names.GENERIC, state, 2); indexer.start(); - assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS); assertThat(indexer.getStep(), equalTo(3)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testStop_WhileIndexing() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { CountDownLatch countDownLatch = new CountDownLatch(1); - MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true); + MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, true); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); @@ -382,22 +457,135 @@ public void testStop_WhileIndexing() throws Exception { assertBusy(() -> assertTrue(isStopped.get())); assertFalse(isFinished.get()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testFiveRuns() throws Exception { + doTestFiveRuns(-1, Collections.emptyList()); + } + + public void testFiveRunsThrottled100() throws Exception { + // expect throttling to kick in + doTestFiveRuns(100, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L, 9950L)); + } + + public void testFiveRunsThrottled1000() throws Exception { + // expect throttling to kick in + doTestFiveRuns(1_000, timeValueCollectionFromMilliseconds(950L, 950L, 950L, 950L)); + } + + public void testFiveRunsThrottled18000() throws Exception { + // expect throttling to not kick in due to min wait time + doTestFiveRuns(18_000, Collections.emptyList()); + } + + public void testFiveRunsThrottled1000000() throws Exception { + // docs per seconds is set high, so throttling does not kick in + doTestFiveRuns(1_000_000, Collections.emptyList()); + } + + public void doTestFiveRuns(float docsPerSecond, Collection expectedDelays) throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final MockThreadPool threadPool = new MockThreadPool(getTestName()); try { - MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (executor, state, 2); + MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond, + null); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertBusy(() -> assertTrue(isFinished.get())); indexer.assertCounters(); + threadPool.assertCountersAndDelay(expectedDelays); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } + + public void testFiveRunsRethrottle0_100() throws Exception { + doTestFiveRunsRethrottle(-1, 100, timeValueCollectionFromMilliseconds(9950L)); + } + + public void testFiveRunsRethrottle100_0() throws Exception { + doTestFiveRunsRethrottle(100, 0, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L)); + } + + public void testFiveRunsRethrottle100_1000() throws Exception { + doTestFiveRunsRethrottle(100, 1000, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L, 950L)); + } + + public void testFiveRunsRethrottle1000_100() throws Exception { + doTestFiveRunsRethrottle(1000, 100, timeValueCollectionFromMilliseconds(950L, 950L, 950L, 9950L)); + } + + public void doTestFiveRunsRethrottle( + float docsPerSecond, + float docsPerSecondRethrottle, + Collection expectedDelays + ) throws Exception { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + + final MockThreadPool threadPool = new MockThreadPool(getTestName()); + try { + CountDownLatch latch = new CountDownLatch(1); + MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond, + latch); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + // wait until the indexer starts waiting on the latch + assertBusy(() -> assertTrue(indexer.waitingForLatchCountDown())); + // rethrottle + indexer.rethrottle(docsPerSecondRethrottle); + latch.countDown(); + // let it finish + assertBusy(() -> assertTrue(isFinished.get())); + indexer.assertCounters(); + threadPool.assertCountersAndDelay(expectedDelays); + } finally { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + } + + public void testCalculateThrottlingDelay() { + // negative docs per second, throttling turned off + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(-100, 100, 1_000, 1_000), equalTo(TimeValue.ZERO)); + + // negative docs per second, throttling turned off + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(0, 100, 1_000, 1_000), equalTo(TimeValue.ZERO)); + + // 100 docs/s with 100 docs -> 1s delay + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000, 1_000_000), equalTo(TimeValue.timeValueSeconds(1))); + + // 100 docs/s with 100 docs, 200ms passed -> 800ms delay + assertThat( + AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 1_200_000_000L), + equalTo(TimeValue.timeValueMillis(800)) + ); + + // 100 docs/s with 100 docs done, time passed -> no delay + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 5_000_000_000L), equalTo(TimeValue.ZERO)); + + // 1_000_000 docs/s with 1 doc done, time passed -> no delay + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(1_000_000, 1, 1_000_000_000L, 1_000_000_000L), equalTo(TimeValue.ZERO)); + + // max: 1 docs/s with 1_000_000 docs done, time passed -> no delay + assertThat( + AsyncTwoPhaseIndexer.calculateThrottlingDelay(1, 1_000_000, 1_000_000_000L, 1_000_000_000L), + equalTo(TimeValue.timeValueHours(1)) + ); + + // min: 100 docs/s with 100 docs, 995ms passed -> no delay, because minimum not reached + assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 1_995_000_000L), equalTo(TimeValue.ZERO)); + + } + + private static Collection timeValueCollectionFromMilliseconds(Long... milliseconds) { + List timeValues = new ArrayList<>(); + for (Long m: milliseconds) { + timeValues.add(TimeValue.timeValueMillis(m)); + } + + return timeValues; + } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index f9c063f32664f..57281c75cc225 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -48,7 +49,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -67,29 +67,31 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer initialState, Map initialPosition, - AtomicBoolean upgradedDocumentID) { - this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats()); + RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, + Map initialPosition, AtomicBoolean upgradedDocumentID) { + this(threadPool, executorName, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats()); } /** * Ctr - * @param executor Executor to use to fire the first request of a background job. + * @param threadPool ThreadPool to use to fire the first request of a background job. + * @param executorName Name of the executor to use to fire the first request of a background job. * @param job The rollup job * @param initialState Initial state for the indexer * @param initialPosition The last indexed bucket of the task * @param upgradedDocumentID whether job has updated IDs (for BWC) * @param jobStats jobstats instance for collecting stats */ - RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, - AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) { - super(executor, initialState, initialPosition, jobStats); + RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, + Map initialPosition, AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) { + super(threadPool, executorName, initialState, initialPosition, jobStats); this.job = job; this.compositeBuilder = createCompositeBuilder(job.getConfig()); this.upgradedDocumentID = upgradedDocumentID; @@ -123,7 +125,7 @@ protected void onStart(long now, ActionListener listener) { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { // Indexer is single-threaded, and only place that the ID scheme can get upgraded is doSaveState(), so // we can pass down the boolean value rather than the atomic here final Map position = getPosition(); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 21fdb2f4998be..599e8ea9ee063 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -103,7 +103,7 @@ protected class ClientRollupPageManager extends RollupIndexer { ClientRollupPageManager(RollupJob job, IndexerState initialState, Map initialPosition, Client client, AtomicBoolean upgradedDocumentID) { - super(threadPool.executor(ThreadPool.Names.GENERIC), job, new AtomicReference<>(initialState), + super(threadPool, ThreadPool.Names.GENERIC, job, new AtomicReference<>(initialState), initialPosition, upgradedDocumentID); this.client = client; this.job = job; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 16e5a3eeffad1..8ab5b68b7ae7c 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -47,6 +47,8 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; @@ -71,9 +73,6 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -524,14 +523,15 @@ private void executeTestCase(List> docs, RollupJobConfig con IndexReader reader = DirectoryReader.open(dir); IndexSearcher searcher = new IndexSearcher(reader); String dateHistoField = config.getGroupConfig().getDateHistogram().getField(); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); + try { RollupJob job = new RollupJob(config, Collections.emptyMap()); - final SyncRollupIndexer action = new SyncRollupIndexer(executor, job, searcher, + final SyncRollupIndexer action = new SyncRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, searcher, fieldTypeLookup.values().toArray(new MappedFieldType[0]), fieldTypeLookup.get(dateHistoField)); rollupConsumer.accept(action.triggerAndWaitForCompletion(now)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); reader.close(); dir.close(); } @@ -627,9 +627,9 @@ class SyncRollupIndexer extends RollupIndexer { private final CountDownLatch latch = new CountDownLatch(1); private Exception exc; - SyncRollupIndexer(Executor executor, RollupJob job, IndexSearcher searcher, + SyncRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, IndexSearcher searcher, MappedFieldType[] fieldTypes, MappedFieldType timestampField) { - super(executor, job, new AtomicReference<>(IndexerState.STARTED), null, new AtomicBoolean(newIDScheme)); + super(threadPool, executorName, job, new AtomicReference<>(IndexerState.STARTED), null, new AtomicBoolean(newIDScheme)); this.searcher = searcher; this.fieldTypes = fieldTypes; this.timestampField = timestampField; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 30ee1db25d56d..51b9bb4e315be 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; @@ -35,9 +37,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -52,19 +52,19 @@ public class RollupIndexerStateTests extends ESTestCase { private static class EmptyRollupIndexer extends RollupIndexer { - EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded, RollupIndexerJobStats stats) { - super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats); + super(threadPool, executorName, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats); } - EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded) { - super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded)); + super(threadPool, executorName, job, initialState, initialPosition, new AtomicBoolean(upgraded)); } - EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition) { - this(executor, job, initialState, initialPosition, randomBoolean()); + this(threadPool, executorName, job, initialState, initialPosition, randomBoolean()); } @@ -140,19 +140,19 @@ protected void onFinish(ActionListener listener) { private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer { protected CountDownLatch latch; - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, boolean upgraded) { - super(executor, job, initialState, initialPosition, upgraded); + super(threadPool, executorName, job, initialState, initialPosition, upgraded); } - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition) { - super(executor, job, initialState, initialPosition, randomBoolean()); + super(threadPool, executorName, job, initialState, initialPosition, randomBoolean()); } - DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, RollupIndexerJobStats stats) { - super(executor, job, initialState, initialPosition, randomBoolean(), stats); + super(threadPool, executorName, job, initialState, initialPosition, randomBoolean(), stats); } private CountDownLatch newLatch() { @@ -178,17 +178,17 @@ private static class NonEmptyRollupIndexer extends RollupIndexer { final BiConsumer> saveStateCheck; private CountDownLatch latch; - NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + NonEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer) { - this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {}); + this(threadPool, executorName, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {}); } - NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + NonEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer, BiConsumer> saveStateCheck) { - super(executor, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); + super(threadPool, executorName, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.failureConsumer = failureConsumer; @@ -253,9 +253,9 @@ protected void onFinish(ActionListener listener) { public void testStarted() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - RollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null, true); + RollupIndexer indexer = new EmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, true); assertTrue(indexer.isUpgradedDocumentID()); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); @@ -269,17 +269,17 @@ public void testStarted() throws Exception { assertThat(indexer.getStats().getIndexTotal(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { AtomicBoolean isFinished = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onFinish(ActionListener listener) { super.onFinish(ActionListener.wrap(r -> { @@ -312,7 +312,7 @@ protected void doSaveState(IndexerState state, Map position, Run assertThat(indexer.getStats().getSearchTotal(), equalTo(1L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -334,10 +334,11 @@ public void testStateChangeMidTrigger() { doAnswer(forwardAndChangeState).when(spyStats).incrementNumInvocations(1L); RollupJob job = new RollupJob(config, Collections.emptyMap()); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { AtomicBoolean isFinished = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) { + DelayedEmptyRollupIndexer indexer = + new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, spyStats) { @Override protected void onFinish(ActionListener listener) { super.onFinish(ActionListener.wrap(r -> { @@ -357,7 +358,7 @@ protected void onFinish(ActionListener listener) { assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -365,10 +366,10 @@ public void testAbortDuringSearch() throws Exception { final AtomicBoolean aborted = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); final CountDownLatch latch = new CountDownLatch(1); try { - EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) { + EmptyRollupIndexer indexer = new EmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onFinish(ActionListener listener) { fail("Should not have called onFinish"); @@ -402,7 +403,7 @@ protected void onAbort() { assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertThat(indexer.getStats().getSearchFailures(), equalTo(0L)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -410,13 +411,13 @@ public void testAbortAfterCompletion() throws Exception { final AtomicBoolean aborted = new AtomicBoolean(false); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); // Don't use the indexer's latch because we completely change doNextSearch() final CountDownLatch doNextSearchLatch = new CountDownLatch(1); try { - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { aborted.set(true); @@ -489,16 +490,16 @@ protected void doSaveState(IndexerState state, Map position, Run assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); assertThat(indexer.getStats().getNumPages(), equalTo(1L)); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testStopIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null); + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null); final CountDownLatch latch = indexer.newLatch(); assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)); @@ -511,17 +512,17 @@ public void testStopIndexing() throws Exception { assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED))); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testAbortIndexing() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { final AtomicBoolean isAborted = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { isAborted.set(true); @@ -538,17 +539,17 @@ protected void onAbort() { assertBusy(() -> assertTrue(isAborted.get())); assertFalse(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testAbortStarted() { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { final AtomicBoolean isAborted = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { isAborted.set(true); @@ -564,17 +565,17 @@ protected void onAbort() { assertThat(indexer.getStats().getNumPages(), equalTo(0L)); assertFalse(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } public void testMultipleJobTriggering() throws Exception { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { final AtomicBoolean isAborted = new AtomicBoolean(false); - DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) { + DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) { @Override protected void onAbort() { isAborted.set(true); @@ -601,7 +602,7 @@ protected void onAbort() { assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED))); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -688,10 +689,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, stateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -713,7 +714,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -800,10 +801,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws isFinished.set(true); }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, doSaveStateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -824,7 +825,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -848,10 +849,10 @@ public void testSearchShardFailure() throws Exception { } }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, stateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -873,7 +874,7 @@ public void testSearchShardFailure() throws Exception { assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } @@ -961,10 +962,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } }; - final ExecutorService executor = Executors.newFixedThreadPool(1); + final ThreadPool threadPool = new TestThreadPool(getTestName()); try { - NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, + NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, searchFunction, bulkFunction, failureConsumer, stateCheck) { @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { @@ -991,7 +992,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { - executor.shutdownNow(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 6e247689ad5f9..35651e49ae271 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,7 +43,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -56,7 +56,8 @@ class ClientTransformIndexer extends TransformIndexer { private final AtomicReference seqNoPrimaryTermAndIndex; ClientTransformIndexer( - Executor executor, + ThreadPool threadPool, + String executorName, TransformConfigManager transformsConfigManager, CheckpointProvider checkpointProvider, TransformProgressGatherer progressGatherer, @@ -75,7 +76,8 @@ class ClientTransformIndexer extends TransformIndexer { boolean shouldStopAtCheckpoint ) { super( - ExceptionsHelper.requireNonNull(executor, "executor"), + ExceptionsHelper.requireNonNull(threadPool, "threadPool"), + executorName, transformsConfigManager, checkpointProvider, progressGatherer, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index d7c699b587038..c3739dfd7619a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.transforms; import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -20,7 +21,6 @@ import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; class ClientTransformIndexerBuilder { @@ -43,11 +43,12 @@ class ClientTransformIndexerBuilder { this.initialStats = new TransformIndexerStats(); } - ClientTransformIndexer build(Executor executor, TransformContext context) { + ClientTransformIndexer build(ThreadPool threadPool, String executorName, TransformContext context) { CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig); return new ClientTransformIndexer( - executor, + threadPool, + executorName, transformsConfigManager, checkpointProvider, new TransformProgressGatherer(parentTaskClient), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 80b9c9ca1ddba..e46626aa8b564 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -50,7 +51,6 @@ import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -114,7 +114,8 @@ private enum RunState { private volatile long lastCheckpointCleanup = 0L; public TransformIndexer( - Executor executor, + ThreadPool threadPool, + String executorName, TransformConfigManager transformsConfigManager, CheckpointProvider checkpointProvider, TransformProgressGatherer progressGatherer, @@ -129,7 +130,7 @@ public TransformIndexer( TransformCheckpoint nextCheckpoint, TransformContext context ) { - super(executor, initialState, initialPosition, jobStats); + super(threadPool, executorName, initialState, initialPosition, jobStats); this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); this.progressGatherer = ExceptionsHelper.requireNonNull(progressGatherer, "progressGatherer"); @@ -524,7 +525,7 @@ synchronized void handleFailure(Exception e) { * @param listener listener to call after done */ private void cleanupOldCheckpoints(ActionListener listener) { - long now = getTime(); + long now = getTimeNanos() * 1000; long checkpointLowerBound = context.getCheckpoint() - NUMBER_OF_CHECKPOINTS_TO_KEEP; long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS; @@ -704,7 +705,7 @@ protected QueryBuilder buildFilterQuery() { } @Override - protected SearchRequest buildSearchRequest() { + protected SearchRequest buildSearchRequest(long waitTimeInNanos) { assert nextCheckpoint != null; SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex()).allowPartialSearchResults(false) @@ -846,13 +847,6 @@ protected void failIndexer(String failureMessage) { context.markAsFailed(failureMessage); } - /* - * Get the current time, abstracted for the purpose of testing - */ - long getTime() { - return System.currentTimeMillis(); - } - /** * Indicates if an audit message should be written when onFinish is called for the given checkpoint * We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99 diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 2381d7e05a440..371aea41fe841 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -538,7 +538,7 @@ private SchedulerEngine.Schedule next() { } synchronized void initializeIndexer(ClientTransformIndexerBuilder indexerBuilder) { - indexer.set(indexerBuilder.build(getThreadPool().executor(ThreadPool.Names.GENERIC), context)); + indexer.set(indexerBuilder.build(getThreadPool(), ThreadPool.Names.GENERIC, context)); } ThreadPool getThreadPool() { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index f9ac6319721bd..22ef2fc68e208 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -16,13 +16,12 @@ import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; -import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; +import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import java.time.Instant; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -38,7 +37,8 @@ public void testAudiOnFinishFrequency() { when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); ClientTransformIndexer indexer = new ClientTransformIndexer( - mock(Executor.class), + mock(ThreadPool.class), + ThreadPool.Names.GENERIC, mock(IndexBasedTransformConfigManager.class), mock(CheckpointProvider.class), new TransformProgressGatherer(mock(Client.class)), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index e6ce52fa5b9ca..00534d9e34a1b 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -26,6 +26,8 @@ import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -49,9 +51,6 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -77,6 +76,7 @@ public class TransformIndexerTests extends ESTestCase { private Client client; + private ThreadPool threadPool; class MockedTransformIndexer extends TransformIndexer { @@ -88,7 +88,8 @@ class MockedTransformIndexer extends TransformIndexer { private CountDownLatch latch; MockedTransformIndexer( - Executor executor, + ThreadPool threadPool, + String executorName, IndexBasedTransformConfigManager transformsConfigManager, CheckpointProvider checkpointProvider, TransformProgressGatherer progressGatherer, @@ -104,7 +105,8 @@ class MockedTransformIndexer extends TransformIndexer { Consumer failureConsumer ) { super( - executor, + threadPool, + executorName, transformsConfigManager, checkpointProvider, progressGatherer, @@ -216,11 +218,13 @@ protected void failIndexer(String message) { @Before public void setUpMocks() { client = new NoOpClient(getTestName()); + threadPool = new TestThreadPool(getTestName()); } @After public void tearDownClient() { client.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } public void testPageSizeAdapt() throws Exception { @@ -248,8 +252,6 @@ public void testPageSizeAdapt() throws Exception { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { TransformAuditor auditor = new TransformAuditor(client, "node_1"); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); @@ -259,7 +261,8 @@ public void testPageSizeAdapt() throws Exception { searchFunction, bulkFunction, null, - executor, + threadPool, + ThreadPool.Names.GENERIC, auditor, context ); @@ -289,10 +292,6 @@ public void testPageSizeAdapt() throws Exception { // assert that page size has been reduced again assertThat(pageSizeAfterFirstReduction, greaterThan((long) indexer.getPageSize())); assertThat(pageSizeAfterFirstReduction, greaterThan((long) TransformIndexer.MINIMUM_PAGE_SIZE)); - - } finally { - executor.shutdownNow(); - } } public void testDoProcessAggNullCheck() { @@ -330,8 +329,6 @@ public void testDoProcessAggNullCheck() { Function searchFunction = searchRequest -> searchResponse; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { TransformAuditor auditor = mock(TransformAuditor.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); @@ -341,7 +338,8 @@ public void testDoProcessAggNullCheck() { searchFunction, bulkFunction, null, - executor, + threadPool, + ThreadPool.Names.GENERIC, auditor, context ); @@ -351,9 +349,6 @@ public void testDoProcessAggNullCheck() { assertThat(newPosition.getPosition(), is(nullValue())); assertThat(newPosition.isDone(), is(true)); verify(auditor, times(1)).info(anyString(), anyString()); - } finally { - executor.shutdownNow(); - } } public void testScriptError() throws Exception { @@ -397,8 +392,6 @@ public void testScriptError() throws Exception { failureMessage.compareAndSet(null, message); }; - final ExecutorService executor = Executors.newFixedThreadPool(1); - try { MockTransformAuditor auditor = new MockTransformAuditor(); TransformContext.Listener contextListener = mock(TransformContext.Listener.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); @@ -409,7 +402,8 @@ public void testScriptError() throws Exception { searchFunction, bulkFunction, failureConsumer, - executor, + threadPool, + ThreadPool.Names.GENERIC, auditor, context ); @@ -433,9 +427,6 @@ public void testScriptError() throws Exception { failureMessage.get(), matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]") ); - } finally { - executor.shutdownNow(); - } } private MockedTransformIndexer createMockIndexer( @@ -444,12 +435,14 @@ private MockedTransformIndexer createMockIndexer( Function searchFunction, Function bulkFunction, Consumer failureConsumer, - final ExecutorService executor, + ThreadPool threadPool, + String executorName, TransformAuditor auditor, TransformContext context ) { return new MockedTransformIndexer( - executor, + threadPool, + executorName, mock(IndexBasedTransformConfigManager.class), mock(CheckpointProvider.class), new TransformProgressGatherer(client),