
[Concurrent Segment Search] Make MultiBucketConsumerService thread safe to use across slices during search #7785

Closed · sohami opened this issue May 26, 2023 · 13 comments · Fixed by #9047
Labels: distributed framework, enhancement (Enhancement or improvement to existing feature or request)

sohami (Collaborator) commented May 26, 2023

Is your feature request related to a problem? Please describe.
With concurrent segment search we will need to make the MultiBucketConsumer thread safe, as it will be shared across the different slices of the shard's segments. Ref here.

Describe the solution you'd like
Make MultiBucketConsumer thread safe. We also need to see if its usage will change with the concurrent flow. For example: currently, if both global and non-global aggregations are present in a request, it keeps track of the buckets across both of them and then resets after collection is completed for both aggregations. Given that with concurrent segment search the reduce happens separately for the two aggregation types, we will need to ensure that the reset only happens in postProcess in the concurrent model, i.e. when both aggregations have completed the reduce phase.

sohami added the enhancement and untriaged labels on May 26, 2023
reta (Collaborator) commented Jun 2, 2023

@sohami we should be very careful not to introduce any contention here, defeating the purpose of searching concurrently. I have not looked into the code deeply, but have you considered allowing overfetching? We do that for normal search as a limitation (for exactly the same reasons).

sohami (Collaborator, Author) commented Jun 2, 2023

@reta From an initial look, this is what I have found.

MultiBucketConsumer is used in two stages:

  1. The final reduce stage on the coordinator side, where the actual number of buckets collected is compared against the limit. I guess this is what you are referring to as overfetching, since shards can still collect more buckets than the limit and it is only checked on the coordinator side? Ref here
  2. On the shards, where it doesn't check the bucket collection limit but does a periodic check to see whether the request- or parent-level CircuitBreaker trips based on current usage. Ref here

With concurrent search, the reduce of aggregations will also happen on the shards, but on a single thread during the reduce phase. This is also a partial reduce, where the bucket consumer is not consulted, so no change is needed here and the behavior will be kept as is.

Currently MultiBucketConsumer has two variables, count and callCount, neither of which is atomic.

  • count is used for case 1, the final reduce phase, which happens on a single thread, so I don't see any changes needed for that.
  • callCount will be incremented by different slices in the concurrent case and will need to be made thread safe. It is used to trigger the CB check (with a value of 0) to see if the limit is breached.

For callCount I am thinking the following:

Make callCount an AtomicInteger and also keep an AtomicBoolean variable which will be set when the CB trips for the first time. For each call to accept(value), increment and get the current value and compare it against 1024, or check whether the new boolean variable is set. In either case, call the CB to check for memory pressure. This way, if the first slice thread trips, all the other slice threads will also trip on the CB check, failing all the slice threads for that request. Without the boolean variable, one slice can trip while another continues running, only tripping once its callCount has advanced by another 1024, and so on.
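As a rough sketch (not a final implementation), this could look as follows, reusing the existing breaker field and the 1024-call interval from the current code; the field names here are illustrative:

    private final AtomicInteger callCount = new AtomicInteger();
    private final AtomicBoolean circuitBreakerTripped = new AtomicBoolean();

    public void accept(int value) {
        // ... existing bucket-limit accounting on `count` elided ...
        final int current = callCount.incrementAndGet();
        // Consult the parent breaker every 1024 calls, or immediately if another
        // slice thread has already tripped it for this request.
        if ((current & 0x3FF) == 0 || circuitBreakerTripped.get()) {
            try {
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            } catch (CircuitBreakingException e) {
                circuitBreakerTripped.set(true);
                throw e;
            }
        }
    }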

reta (Collaborator) commented Jun 5, 2023

@sohami thanks a lot for the research.

My main concern is how much the contention would impact concurrent search; from this perspective, adding more synchronization would definitely have a larger (negative) impact. I would suggest using LongAccumulator instead of AtomicInteger (the Javadocs explain why it is preferable) and, if possible, dropping the AtomicBoolean. Regarding tripping the other slices (if we drop the AtomicBoolean), we could use the LongAccumulator for that as well by adding some very large number to record the fact that one of the slices went over the limit. That's just an idea; I am sure there are more clever options to look at as well.
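A minimal sketch of this idea (illustrative only; the check interval and exception message are placeholders, and the negative-flip trick is revisited later in this thread):

    private final LongAccumulator callCount = new LongAccumulator(Long::sum, 0L);

    public void accept(int value) {
        callCount.accumulate(1L);
        final long sum = callCount.get();
        if (sum < 0) {
            // another slice already tripped the breaker and pushed the sum negative
            throw new CircuitBreakingException("allocated_buckets", CircuitBreaker.Durability.PERMANENT);
        }
        if ((sum & 0x3FF) == 0) { // check the parent breaker every 1024 calls
            try {
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            } catch (CircuitBreakingException e) {
                callCount.accumulate(Long.MAX_VALUE); // overflow the sum to negative to record the trip
                throw e;
            }
        }
    }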

yigithub assigned sohami and unassigned sohami on Jun 23, 2023
neetikasinghal (Contributor) commented

I will be working on this issue

neetikasinghal (Contributor) commented Jul 27, 2023

The callCount variable needs to be made thread safe because there is a periodic check on the CircuitBreaker in the MultiBucketConsumerService that tests whether the CB has tripped. With concurrent search, each shard has multiple slices running on different threads but sharing one instance of MultiBucketConsumer. Multiple threads within the same instance will access the callCount variable, and under the current logic, which checks for a CB trip only after every 1024 calls to the accept function, if callCount is not made thread safe the CB trip check might never happen.
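For context, the existing logic in accept is roughly the following simplified sketch. Because the increment and the boundary check read callCount separately, another thread can advance the counter past the exact multiple of 1024 in between, so the breaker check can be missed entirely:

    // simplified from the current MultiBucketConsumer.accept
    callCount++;                       // plain, non-atomic increment
    if ((callCount & 0x3FF) == 0) {    // only fires on exact multiples of 1024
        breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
    }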

In order to make MultiBucketConsumer thread safe, here are the options (neetikasinghal@5153c45):

Option 1
Use a synchronized block in the accept method, which makes the callCount variable thread safe. (A simplified sketch follows the cons below.)

Reference to the code pointer: link

Average time per test for calls to the accept function (total test runs = 1000, threads per test = 6): 1.098 ms
Test code: link

Pros

  • This would effectively trip the circuit breaker for the other threads once one thread has tripped it.
  • Lowest average time in the test runs across the different options.
  • Only one exception is thrown per shard by the MultiBucketConsumer.

Cons

  • Under high contention, the expected throughput is lower.
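The linked commit has the actual change; as a simplified sketch, Option 1 serializes the increment and the breaker check in one critical section:

    public void accept(int value) {
        // ... bucket-limit accounting elided ...
        synchronized (this) {
            callCount++;
            // the lock guarantees each multiple of 1024 is observed exactly once
            if ((callCount & 0x3FF) == 0) {
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            }
        }
    }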

Option 2
Initialize callCount as a LongAdder, making it thread safe. (A simplified sketch follows the cons below.)

Reference to the code pointer: link

Average time per test for calls to the accept function (total test runs = 1000, threads per test = 6): 1.559 ms

Pros

  • Under high contention, the expected throughput is significantly higher.
  • Only one exception is thrown per shard by the MultiBucketConsumer.

Cons

  • This doesn't trip the CB on the other threads when one thread has already tripped the circuit breaker.
  • Highest average time in the test runs across the different options.
  • Can have higher memory consumption due to the use of LongAdder.
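A simplified sketch of Option 2 (the linked commit has the actual change). Note that LongAdder.sum() is not an atomic snapshot, so an exact-boundary check can be skipped under concurrent increments; this is the eventual-consistency point raised later in the thread:

    private final LongAdder callCount = new LongAdder();

    public void accept(int value) {
        // ... bucket-limit accounting elided ...
        callCount.increment();
        // sum() may skip the exact multiple of 1024 under concurrent increments
        if ((callCount.sum() & 0x3FF) == 0) {
            breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
        }
    }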

Option 3
Initialize callCount as a LongAdder, making it thread safe, and also keep an additional atomic variable that tries to trip the CB for the other threads once one thread's CB has tripped. (A simplified sketch follows the cons below.)

Reference to the code pointer: link

Average time per test for calls to the accept function (total test runs = 1000, threads per test = 6): 1.211 ms

Pros

  • Only one exception is thrown per shard by the MultiBucketConsumer.

Cons

  • Some threads may trip the CB after the first thread has tripped it, but there is no guarantee that all of them will: a thread can perform its check in the window between the circuitBreakerTripped3 variable being set and being read.
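A simplified sketch of Option 3, combining the LongAdder counter with the tripped flag (circuitBreakerTripped3, as named in the con above):

    private final LongAdder callCount = new LongAdder();
    private final AtomicBoolean circuitBreakerTripped3 = new AtomicBoolean();

    public void accept(int value) {
        // ... bucket-limit accounting elided ...
        callCount.increment();
        // a thread that reads the flag just before another thread sets it will
        // not trip here, hence the non-guarantee described in the con above
        if (circuitBreakerTripped3.get() || (callCount.sum() & 0x3FF) == 0) {
            try {
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            } catch (CircuitBreakingException e) {
                circuitBreakerTripped3.set(true);
                throw e;
            }
        }
    }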

Option 4
Make MultiBucketConsumer per-slice. Instances of MultiBucketConsumer would then be specific to each thread, so every thread would maintain its own callCount variable and callCount would not need to be thread safe. (A hypothetical sketch follows the cons below.)

Pros

  • This would not require any of the synchronization mentioned in the options above.

Cons

  • Multiple exceptions would be thrown per shard by the MultiBucketConsumer, which would be a change from the current behavior with non-concurrent search.
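A hypothetical sketch of Option 4; the construction site and constructor arguments below are illustrative, not the actual OpenSearch wiring:

    // one consumer per slice, created where the per-slice collectors are built,
    // so callCount is only ever touched by its owning slice thread
    MultiBucketConsumer consumerForSlice() {
        return new MultiBucketConsumer(maxBuckets, breaker); // hypothetical per-slice instance
    }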

neetikasinghal (Contributor) commented

@sohami @reta @jed326 I would love to hear your inclination towards any of the options mentioned above.

jed326 (Collaborator) commented Jul 27, 2023

@neetikasinghal Thanks for writing this up!

A few questions:

  • For option 3, does the boolean need to be atomic? We aren't "flipping" the value of the boolean back and forth, so to me this acts more like eventual consistency. More importantly, it seems to me that using an AtomicBoolean largely negates the benefit of using LongAdder, and we may as well just use an AtomicInteger.
  • I think it's good that you gathered some rough benchmarks for the three solutions, but I wonder if the difference in time is better explained by thread scheduling than by the amount of synchronization. Moreover, I think we are more concerned about the performance gap at scale, since that's really where the synchronization would take its toll.

Also a few thoughts that aren't questions:

  • In general I think we should avoid using atomic variables or synchronized methods, since they largely defeat our concurrency.
  • Fundamentally, what callCount does is provide a mechanism to trigger a periodic check of the parent-level circuit breaker. Have you thought about making this a time-based check instead? That way we wouldn't have to synchronize any data between the threads. (A rough sketch follows below.)
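A rough sketch of the time-based idea (the interval and field names here are assumptions, not a worked-out proposal):

    // assumed check interval; would need tuning/benchmarking
    private static final long CHECK_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
    private volatile long lastCheckNanos = System.nanoTime();

    public void accept(int value) {
        // ... bucket-limit accounting elided ...
        final long now = System.nanoTime();
        if (now - lastCheckNanos >= CHECK_INTERVAL_NANOS) {
            lastCheckNanos = now; // benign race: an occasional extra check is harmless
            breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
        }
    }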

reta (Collaborator) commented Jul 31, 2023

@neetikasinghal I think you could combine approaches 2 and 3 and rely purely on the LongAdder, flipping it to negative in case of a CircuitBreakingException (it is somewhat of a trick to get rid of the additional boolean):


    callCount2.increment();

    final long sum = callCount2.sum();
    if (sum < 0) {
        throw new CircuitBreakingException("test", CircuitBreaker.Durability.PERMANENT);
    }

    // check parent circuit breaker every 1024 calls
    if ((sum & 0x3FF) == 0) {
        try {
            breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
        } catch (CircuitBreakingException e) {
            callCount2.add(Long.MAX_VALUE); // flip to negative
            throw e;
        }
    }

sohami (Collaborator, Author) commented Jul 31, 2023

@neetikasinghal @reta: I thought about this a little more. For the LongAdder we will need to check against a range instead of a fixed value, since the sum method of LongAdder is only eventually consistent. The range can be something small, like 0-16, so that there is not much overhead from frequent CB checks when callCount is not being incremented by multiple threads.

neetikasinghal (Contributor) commented

  • Fundamentally, what callCount does is provide a mechanism to trigger a periodic check of the parent-level circuit breaker. Have you thought about making this a time-based check instead? That way we wouldn't have to synchronize any data between the threads.

@jed326 a time-based check involves some benchmarking, which on its own is a bigger design question; what @sohami suggested above, a range check for the CB, seems to be a better approach.

neetikasinghal (Contributor) commented

@sohami what do you think of having Runtime.getRuntime().availableProcessors() instead of the hard-coded value of 16?

Also, taking @reta's suggestion above, the accept function would look as follows:

private LongAdder callCount2 = new LongAdder();

public void accept(int value) {
    if (value != 0) {
        count += value;
        if (count > limit) {
            throw new TooManyBucketsException(
                "Trying to create too many buckets. Must be less than or equal to: ["
                    + limit
                    + "] but was ["
                    + count
                    + "]. This limit can be set by changing the ["
                    + MAX_BUCKET_SETTING.getKey()
                    + "] cluster level setting.",
                limit
            );
        }
    }
    callCount2.increment();
    long sum = callCount2.sum();
    if (sum < 0) {
        throw new CircuitBreakingException("test", CircuitBreaker.Durability.PERMANENT);
    }
    if ((sum > 0) && (sum & 0x3FF) <= Runtime.getRuntime().availableProcessors()) {
        try {
            breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
        } catch (CircuitBreakingException e) {
            callCount2.add(Long.MAX_VALUE); // flip to negative
            throw e;
        }
    }
}

@reta @sohami please let me know your thoughts on this.

reta (Collaborator) commented Aug 1, 2023

@reta @sohami please let me know your thoughts on this.

+1 to trying availableProcessors (consulted the LongAdder Javadocs as well), but please discard the idea of flipping to negative - it does not work well under a race (callCount2.add(Long.MAX_VALUE) may be called from different threads); using a volatile boolean flag would be sufficient, I think.
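Putting the pieces together, a sketch of what this could look like (illustrative only, ahead of the actual PR): the LongAdder counter, the range check from the discussion above, and a volatile boolean instead of the negative flip:

    private final LongAdder callCount = new LongAdder();
    private volatile boolean circuitBreakerTripped = false; // illustrative name

    public void accept(int value) {
        // ... bucket-limit accounting elided ...
        callCount.increment();
        if (circuitBreakerTripped) {
            // fail fast on every slice once any slice has tripped the breaker
            throw new CircuitBreakingException("allocated_buckets", CircuitBreaker.Durability.PERMANENT);
        }
        final long sum = callCount.sum();
        // range check instead of exact equality, since sum() is eventually consistent
        if ((sum & 0x3FF) <= Runtime.getRuntime().availableProcessors()) {
            try {
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            } catch (CircuitBreakingException e) {
                circuitBreakerTripped = true;
                throw e;
            }
        }
    }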

sohami (Collaborator, Author) commented Aug 1, 2023

what do you think of having Runtime.getRuntime().availableProcessors() instead of the hard-coded value of 16?

Yes availableProcessors is fine, 16 was just used for discussion :)

but please discard the idea of flipping to negative - it does not work well under a race (callCount2.add(Long.MAX_VALUE) may be called from different threads); using a volatile boolean flag would be sufficient, I think.

+1

@neetikasinghal Let's create a PR with the change and we can have more implementation related discussion there.
