diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index b08a127b4a806..07bb92815484a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -585,15 +585,24 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc " not be allowed in the next major version by default. You can change the [" + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting to use a greater default value or lower the number of" + " scrolls that you need to run in parallel."); - } else if (openScrollContexts.get() >= maxOpenScrollContext) { + } + if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) { + openScrollContexts.decrementAndGet(); throw new ElasticsearchException( "Trying to create too many scroll contexts. Must be less than or equal to: [" + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); } } - - SearchContext context = createContext(request); + SearchContext context = null; + try { + context = createContext(request); + context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT); + } finally { + if (context == null) { + openScrollContexts.decrementAndGet(); + } + } onNewContext(context); boolean success = false; try { @@ -611,7 +620,6 @@ private void onNewContext(SearchContext context) { boolean success = false; try { if (context.scrollContext() != null) { - openScrollContexts.incrementAndGet(); context.indexShard().getSearchOperationListener().onNewScrollContext(context); } context.indexShard().getSearchOperationListener().onNewContext(context); @@ -724,7 +732,6 @@ private void onFreeContext(SearchContext context) { assert activeContexts.containsKey(context.id()) == false; context.indexShard().getSearchOperationListener().onFreeContext(context); if (context.scrollContext() != null) { - openScrollContexts.decrementAndGet(); context.indexShard().getSearchOperationListener().onFreeScrollContext(context); } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index b8052fc621868..a904c67a5c4be 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -489,6 +489,52 @@ public void testMaxOpenScrollContexts() throws RuntimeException, IOException { } } + public void testOpenScrollContextsConcurrently() throws Exception { + final int maxScrollContexts = randomIntBetween(50, 200); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SearchService.MAX_OPEN_SCROLL_CONTEXT.getKey(), maxScrollContexts))); + try { + createIndex("index"); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexShard indexShard = indicesService.indexServiceSafe(resolveIndex("index")).getShard(0); + + final SearchService searchService = getInstanceFromNode(SearchService.class); + Thread[] threads = new Thread[randomIntBetween(2, 8)]; + CountDownLatch latch = new CountDownLatch(threads.length); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + latch.countDown(); + try { + latch.await(); + for (; ; ) { + try { + searchService.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId())); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), equalTo( + "Trying to create too many scroll contexts. Must be less than or equal to: " + + "[" + maxScrollContexts + "]. " + + "This limit can be set by changing the [search.max_open_scroll_context] setting.")); + return; + } + } + } catch (Exception e) { + throw new AssertionError(e); + } + }); + threads[i].setName("elasticsearch[node_s_0][search]"); + threads[i].start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(searchService.getActiveContexts(), equalTo(maxScrollContexts)); + searchService.freeAllScrollContexts(); + } finally { + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(SearchService.MAX_OPEN_SCROLL_CONTEXT.getKey()))); + } + } + public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin { @Override public List> getQueries() {