diff --git a/CHANGELOG.md b/CHANGELOG.md index a37976462b38e..80de8c4c9e342 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,7 +109,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807)) - Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801)) - [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005)) - +- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047)) ### Deprecated ### Removed diff --git a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java index f1416fddebfa2..5fa7c4a13c96a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java @@ -33,6 +33,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.common.settings.Setting; @@ -42,6 +43,7 @@ import org.opensearch.search.aggregations.bucket.BucketsAggregator; import java.io.IOException; +import java.util.concurrent.atomic.LongAdder; import java.util.function.IntConsumer; /** @@ -127,13 +129,36 @@ public static class MultiBucketConsumer implements IntConsumer { private final int limit; private final CircuitBreaker breaker; - // aggregations execute in a single thread so no atomic here + // aggregations execute in a single thread for both sequential + // and concurrent search, so no atomic here private int count; - private int callCount = 0; + + // will be updated by multiple threads in concurrent search + // hence making it as LongAdder + private final LongAdder callCount; + private volatile boolean circuitBreakerTripped; + private final int availProcessors; public MultiBucketConsumer(int limit, CircuitBreaker breaker) { this.limit = limit; this.breaker = breaker; + callCount = new LongAdder(); + availProcessors = Runtime.getRuntime().availableProcessors(); + } + + // only visible for testing + protected MultiBucketConsumer( + int limit, + CircuitBreaker breaker, + LongAdder callCount, + boolean circuitBreakerTripped, + int availProcessors + ) { + this.limit = limit; + this.breaker = breaker; + this.callCount = callCount; + this.circuitBreakerTripped = circuitBreakerTripped; + this.availProcessors = availProcessors; } @Override @@ -153,10 +178,26 @@ public void accept(int value) { ); } } - // check parent circuit breaker every 1024 calls - callCount++; - if ((callCount & 0x3FF) == 0) { - breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + callCount.increment(); + // tripping the circuit breaker for other threads in case of concurrent search + // if the circuit breaker has tripped for one of the threads already, more info + // can be found on: https://github.com/opensearch-project/OpenSearch/issues/7785 + if (circuitBreakerTripped) { + throw new CircuitBreakingException( + "Circuit breaker for this consumer has already been tripped by previous invocations. " + + "This can happen in case of concurrent segment search when multiple threads are " + + "executing the request and one of the thread has already tripped the circuit breaker", + breaker.getDurability() + ); + } + // check parent circuit breaker every 1024 to (1024 + available processors) calls + if ((callCount.sum() & 0x3FF) <= availProcessors) { + try { + breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } catch (CircuitBreakingException e) { + circuitBreakerTripped = true; + throw e; + } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/MultiBucketConsumerTests.java b/server/src/test/java/org/opensearch/search/aggregations/MultiBucketConsumerTests.java new file mode 100644 index 0000000000000..eda705f95bf9b --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/MultiBucketConsumerTests.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations; + +import org.mockito.Mockito; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.concurrent.atomic.LongAdder; + +import static org.opensearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; + +public class MultiBucketConsumerTests extends OpenSearchTestCase { + + public void testMultiConsumerAcceptWhenCBTripped() { + CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class); + MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + breaker, + new LongAdder(), + true, + 1 + ); + // exception is thrown upfront since the circuit breaker has already tripped + expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(0)); + Mockito.verify(breaker, Mockito.times(0)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } + + public void testMultiConsumerAcceptToTripCB() { + CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class); + LongAdder callCount = new LongAdder(); + for (int i = 0; i < 1024; i++) { + callCount.increment(); + } + MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + breaker, + callCount, + false, + 2 + ); + // circuit breaker check is performed as the value of call count would be 1025 which is still in range + Mockito.when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class); + expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(0)); + Mockito.verify(breaker, Mockito.times(1)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } + + public void testMultiConsumerAccept() { + CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class); + LongAdder callCount = new LongAdder(); + for (int i = 0; i < 1100; i++) { + callCount.increment(); + } + MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + breaker, + callCount, + false, + 1 + ); + // no exception is thrown as the call count value is not in the expected range and CB is not checked + multiBucketConsumer.accept(0); + Mockito.verify(breaker, Mockito.times(0)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } +}