Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MultiBucketConsumerService thread safe to use across slices during search #9047

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))

- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Setting;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.search.aggregations.bucket.BucketsAggregator;

import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;

/**
Expand Down Expand Up @@ -127,13 +129,36 @@ public static class MultiBucketConsumer implements IntConsumer {
private final int limit;
private final CircuitBreaker breaker;

// aggregations execute in a single thread so no atomic here
// aggregations execute in a single thread for both sequential
// and concurrent search, so no atomic here
private int count;
private int callCount = 0;

// will be updated by multiple threads in concurrent search
// hence making it as LongAdder
private final LongAdder callCount;
private volatile boolean circuitBreakerTripped;
private final int availProcessors;

public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
neetikasinghal marked this conversation as resolved.
Show resolved Hide resolved
this.limit = limit;
this.breaker = breaker;
callCount = new LongAdder();
availProcessors = Runtime.getRuntime().availableProcessors();
}

// only visible for testing
protected MultiBucketConsumer(
int limit,
CircuitBreaker breaker,
LongAdder callCount,
boolean circuitBreakerTripped,
int availProcessors
) {
this.limit = limit;
this.breaker = breaker;
this.callCount = callCount;
this.circuitBreakerTripped = circuitBreakerTripped;
this.availProcessors = availProcessors;
}

@Override
Expand All @@ -153,10 +178,27 @@ public void accept(int value) {
);
}
}
// check parent circuit breaker every 1024 calls
callCount++;
if ((callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
callCount.increment();
// tripping the circuit breaker for other threads in case of concurrent search
// if the circuit breaker has tripped for one of the threads already, more info
// can be found on: https://github.com/opensearch-project/OpenSearch/issues/7785
if (circuitBreakerTripped) {
reta marked this conversation as resolved.
Show resolved Hide resolved
throw new CircuitBreakingException(
"Circuit breaker for this consumer has already been tripped by previous invocations. "
+ "This can happen in case of concurrent segment search when multiple threads are "
+ "executing the request and one of the thread has already tripped the circuit breaker",
breaker.getDurability()
);
}
// check parent circuit breaker every 1024 to (1024 + available processors) calls
long sum = callCount.sum();
if ((sum >= 1024) && (sum & 0x3FF) <= availProcessors) {
try {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
} catch (CircuitBreakingException e) {
circuitBreakerTripped = true;
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.mockito.Mockito;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.atomic.LongAdder;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;

public class MultiBucketConsumerTests extends OpenSearchTestCase {

public void testMultiConsumerAcceptWhenCBTripped() {
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker,
new LongAdder(),
true,
1
);
// exception is thrown upfront since the circuit breaker has already tripped
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(0));
Mockito.verify(breaker, Mockito.times(0)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}

public void testMultiConsumerAcceptToTripCB() {
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
LongAdder callCount = new LongAdder();
for (int i = 0; i < 1024; i++) {
callCount.increment();
}
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker,
callCount,
false,
2
);
// circuit breaker check is performed as the value of call count would be 1025 which is still in range
Mockito.when(breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets")).thenThrow(CircuitBreakingException.class);
expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(0));
Mockito.verify(breaker, Mockito.times(1)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}

public void testMultiConsumerAccept() {
CircuitBreaker breaker = Mockito.mock(CircuitBreaker.class);
LongAdder callCount = new LongAdder();
for (int i = 0; i < 1100; i++) {
callCount.increment();
}
MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
breaker,
callCount,
false,
1
);
// no exception is thrown as the call count value is not in the expected range and CB is not checked
multiBucketConsumer.accept(0);
Mockito.verify(breaker, Mockito.times(0)).addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
}
Loading