
Watch channel with many subscribers has sub-optimal multithreaded performance #5403

Closed
tijsvd opened this issue Jan 27, 2023 · 29 comments · Fixed by #5464
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-sync Module: tokio/sync T-performance Topic: performance and benchmarks

Comments

@tijsvd
Contributor

tijsvd commented Jan 27, 2023

Version

tokio v1.24.2

Platform

Linux x1 5.15.0-58-generic #64~20.04.1-Ubuntu SMP Fri Jan 6 16:42:31 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Description

Hi,

I encountered a performance issue in tokio::sync::watch. The usage pattern is something like:

  • there is a large number of networked subscribers
  • something happens centrally (reception of message from upstream)
  • that something is broadcast around using a watch channel
  • every subscriber has a little task running, which waits for changes on the channel, takes the latest value, does some subscriber-specific processing, sends out a message, and goes back to waiting

We use watch here instead of broadcast because we care only about the latest value, and this is an elegant way to deal with slow clients. I would think this is a fairly typical use case for a watch channel.

While doing some benchmarking, I saw a massive drop in throughput (messages actually received) when moving from flavor="current_thread" to the multi-threaded runtime. I also expected high CPU usage from the parallel per-subscriber processing; instead, CPU stayed around 130-180% on an 8-vcore machine.

I have reduced it to roughly this example program:

use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (snd, _) = watch::channel(0i32);
    for _ in 0..1000 {
        let mut rcv = snd.subscribe();
        tokio::spawn(async move {
            loop {
                if rcv.changed().await.is_err() {
                    break;
                }
                // read lock
                let _ = *rcv.borrow();
            }
        });
    }
    for i in 0..1000 {
        sleep(Duration::from_millis(1)).await;
        let _ = snd.send(i);
    }
}

Not much happens here, yet running this reports almost 6 seconds of CPU time used, of which 1.6s is system time. Changing to #[tokio::main(flavor="current_thread")] reduces that to hardly anything (0.35s user time, 0.05s system). Running with strace also shows a lot of futex calls.

So apparently there is a lot of lock contention. Digging into it, I am reasonably sure that the following happens:

  • calling send on the watch::Sender translates to Notify::notify_waiters; this by itself is fine, it sets all waiting tasks as runnable without locks held
  • tasks start running, and worker threads start stealing some
  • multiple threads are now processing the subscription tasks, and since they're fairly short, many end around the same time
  • at the end of the task loop, the tasks will go back to waiting on the same channel, which requires re-registering with the Notify
  • the waiter list inside the Notify is behind a mutex, and this is where the contention is (a minimal repro against Notify directly is sketched just below this list).
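
For illustration, the same pile-up can be reproduced against Notify directly; a minimal sketch, with arbitrary task and iteration counts:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    for _ in 0..1000 {
        let notify = notify.clone();
        tokio::spawn(async move {
            loop {
                // Each iteration re-registers this task in the Notify's
                // mutex-protected waiter list; after every notify_waiters()
                // all 1000 tasks race to take that mutex again.
                notify.notified().await;
            }
        });
    }
    for _ in 0..1000 {
        sleep(Duration::from_millis(1)).await;
        // Wake every currently registered waiter.
        notify.notify_waiters();
    }
}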

If we accept that this is indeed a fairly typical use case, then it could, I think, be solved by making a different trade-off between send/receive performance and clone/subscribe/drop overhead:

  • give each receiver a unique 64-bit ID, allocated from an AtomicU64 in the shared state;
  • each receiver also has an Arc<AtomicWaker>;
  • inside shared state, instead of a notifier, maintain a Mutex<HashMap<u64, Arc<AtomicWaker>>>, which is accessed in Sender::send, Sender::subscribe, Receiver::clone and Receiver::drop only, so not in Receiver::changed.

Sender code is then something like:

... // update value
... // update version
// Take each registered waker while holding the subscriptions lock,
// then wake them after the lock guard is dropped.
let wakers = inner
  .subscriptions
  .lock()
  .values()
  .filter_map(|w| w.take()) // AtomicWaker::take returns the waker, if one is registered
  .collect::<Vec<_>>(); // FIXME reuse vector capacity
for waker in wakers {
  waker.wake();
}
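
For the receiver side, changed would then be roughly the following. This is a sketch only; the Shared/Receiver types and the use of futures::task::AtomicWaker are illustrative, not tokio internals, and closed-channel handling is omitted:

use std::future::poll_fn;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::task::Poll;

use futures::task::AtomicWaker;

// Illustrative shared state, not tokio's actual layout.
struct Shared<T> {
    value: RwLock<T>,
    version: AtomicU64,
}

struct Receiver<T> {
    shared: Arc<Shared<T>>,
    // Also stored in the shared subscriptions map, keyed by this receiver's ID,
    // so the sender can find and wake it.
    waker: Arc<AtomicWaker>,
    seen_version: u64,
}

impl<T> Receiver<T> {
    // Waits for a version newer than the last one seen. No shared mutex is
    // touched on this path; the receiver only registers its own waker slot.
    async fn changed(&mut self) {
        poll_fn(|cx| {
            // Register before checking the version, so a send() racing with
            // this poll still finds the waker and wakes us.
            self.waker.register(cx.waker());
            let current = self.shared.version.load(Ordering::Acquire);
            if current != self.seen_version {
                self.seen_version = current;
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await;
    }
}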

Changing my example case above to something like this reduces the CPU overhead to about 1s, all of it user time. There is still non-negligible overhead (contention on the rwlock cache line), but no thread stalls.

I did something like this in my production code, which solved the problem. I actually created a special type of notifier, which is another consideration, but the use cases for Notify are probably more diverse than those for watch.

@tijsvd tijsvd added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Jan 27, 2023
@Darksonn Darksonn added M-sync Module: tokio/sync T-performance Topic: performance and benchmarks labels Jan 27, 2023
@gustav3d

I also had to implement a specialized notifier, because the default one is far from optimal for some use cases.

@carllerche
Member

@gustav3d We are always happy to hear proposals (and eventually PRs) to improve performance. Please share your strategy.

@carllerche
Member

@tijsvd Try updating your benchmark to the following and run again:

use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let (snd, _) = watch::channel(0i32);
        for _ in 0..1000 {
            let mut rcv = snd.subscribe();
            tokio::spawn(async move {
                loop {
                    if rcv.changed().await.is_err() {
                        break;
                    }
                    // read lock
                    let _ = *rcv.borrow();
                }
            });
        }
        for i in 0..1000 {
            sleep(Duration::from_millis(1)).await;
            let _ = snd.send(i);
        }
    }).await.unwrap();
}

@tijsvd
Contributor Author

tijsvd commented Feb 15, 2023

Try updating your benchmark to the following and run again

I did; it improves the benchmark performance a lot. There is still considerable overhead compared to single-threaded execution or mutex-free notification.

However, strace now shows nothing at all during the processing, which would indicate that no threads are being parked/unparked? I find this behaviour weird. CPU does seem to be spread over the worker threads. Could it be that all workers wait for the sleep (no syscall?), one wins, the others go back to sleep, and that one worker just executes all the futures with no work stealing? I would expect that if any worker wakes up 1000 tasks, it should post some notification? Or does it, but that does not show up as a traceable syscall?

I'll dig into this a bit more tomorrow. I could not find a good description of how the scheduler / work stealing is actually supposed to work. I noticed that the original main thread does not actually participate in the scheduler; it only runs the join handle? Do the worker threads all run epoll, or would all incoming I/O then come from the main thread, and would all futures woken by I/O also run on the main thread? (In which case the improvement of the benchmark is moot, as events would be caused by incoming network messages.)

Edit: never mind about strace, I was running it incorrectly (without -f). @carllerche, can you explain why this change should make a large difference?

@tijsvd
Contributor Author

tijsvd commented Feb 15, 2023

can you explain why this change should make a large difference?

To answer my own question: if the main thread only runs the "main" task, and all spawned tasks are run only on worker threads, then if the main task causes all events, the worker threads are awoken at the same time, so mutex contention is heaviest. If one of the worker threads causes the events, it should be able to process a significant chunk of the waiters before the other threads are even ready to steal the work.

But that is then mostly an artifact of the (admittedly far from perfect) benchmark -- the receiver tasks don't do any real work. Where I observed this in pre-production testing, the event source was a spawned task, but the receivers did more work per event, so the division of work across threads would have been more even.

@tijsvd
Contributor Author

tijsvd commented Feb 16, 2023

I put together a more detailed benchmark here: https://github.com/tijsvd/tokio-watch-benchmark

It performs some actual work in each receiver, and instead of looking at CPU, it measures the latency from send until the last receiver has finished its work. I also put together a quick-and-dirty watch channel as proposed, and it's measured as well.

Output on my laptop (8 threads):

TOKIO WATCH
running single thread
avg=2.649624ms  <2.476153ms [ 2.649857ms 2.666352ms 2.68224ms ] 2.836277ms>
running multi thread with main thread sender
avg=1.286592ms  <1.079605ms [ 1.203281ms 1.276133ms 1.373721ms ] 1.505286ms>
running multi thread with spawned sender
avg=1.29789ms  <1.184666ms [ 1.255816ms 1.291815ms 1.337994ms ] 1.425623ms>
CUSTOM WATCH
running single thread
avg=2.695562ms  <2.645752ms [ 2.694761ms 2.707446ms 2.714412ms ] 2.748192ms>
running multi thread with main thread sender
avg=1.036344ms  <983.935µs [ 1.007991ms 1.026612ms 1.058813ms ] 1.123364ms>
running multi thread with spawned sender
avg=960.654µs  <901.185µs [ 952.338µs 963.354µs 974.377µs ] 993.665µs>

@carllerche
Member

More info here: #5446

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

More info here

Right, that issue is about having to cross a thread boundary. In this case, we actually want to cross a thread boundary, in order to spread the work across the multiple receivers. I think the more real-world benchmark above shows that it doesn't really matter much.

If there is broader consensus that send/receive performance for watch channels is more important than subscribe/drop, then I'd be happy to volunteer a PR. In that case I'd like some feedback on the best setup:

  • a change local to the watch module, or
  • use of an internal new notify construct that may be used for other cases, if we can identify them, or
  • a new public construct like Notify -- but it'd be hard to explain the difference, and a watch::channel(()) is probably more ergonomic for most use cases.

@Darksonn
Contributor

Before you start working on a PR, I would like to hear a description of the idea behind your solution. I note that the current implementation does not allocate memory in changed or when creating a new receiver.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

The idea is to trade the mutex that protects the list of waiters for pre-registered waker slots.

That means a new receiver would allocate memory. There is no need to allocate in changed; that would remain mostly the same (except that it is currently spread between Receiver::changed and Notify::notified).

The allocation for the receiver could be avoided by using some kind of custom allocator and a block of atomic wakers in the shared state, but that would likely have a negative performance effect on changed due to false sharing of cache lines.
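
(For reference, the usual mitigation for that false sharing is to pad each slot out to its own cache line; a minimal sketch, with futures::task::AtomicWaker and a 128-byte alignment as purely illustrative choices:)

use futures::task::AtomicWaker;

// Give each waker slot its own cache line(s) so that register() calls from
// different receivers do not invalidate each other's lines.
#[repr(align(128))]
struct PaddedWaker(AtomicWaker);

// Illustrative pre-allocated block of slots in the shared state.
struct WakerSlots {
    slots: Box<[PaddedWaker]>,
}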

Note that currently there may also be some mutex contention between send and changed if there are more than 32 subscribers, due to the outer loop in Notify::notify_waiters. I don't know if that is relevant; likely not in an otherwise quiet situation, but perhaps more important if worker threads are awake and able to react immediately to the task wakeup. Trading the mutex on waiters for a mutex on subscriptions would also restrict this kind of contention to the situation where receiving tasks drop the receiver in reaction to the changed value.

Prototype (not otherwise optimised) here: https://github.com/tijsvd/tokio-watch-benchmark/blob/master/src/mywatch.rs

If all this sounds scary because of the larger overhead of Receiver::clone() / Receiver::drop(), then feel free to close -- again, I'm just signalling that this was a problem in my use case, but that doesn't mean it's the same for everyone.

I notice that broadcast channel uses a similar mutex-protected waitlist, so it might run into the same issue.

The issue as I see it is mostly about stalling worker threads on an OS-level lock, leading to insufficient speedup of the total work across the available cores. Other "solutions" are available as well, but may be less generally good:

  • start more worker threads than cores, and let the OS care about utilisation
  • use a spinlock for these known-very-short-duration locks, in Notify and broadcast channel.

Edit: one more downside of what I propose: the sender will always iterate through the entire set of receivers; the assumption then is that most receivers would normally be waiting.

@Darksonn
Contributor

Okay, so it uses a Mutex<HashMap<u64, Arc<AtomicWaker>>> so that it can give each receiver an Arc<AtomicWaker>. Creating the Arc<AtomicWaker> objects does involve many allocations, which it would be preferable to avoid if possible.

If all this sounds scary because of the larger overhead of Receiver::clone() / Receiver::drop(), then feel free to close.

Regardless of how much I like your suggested solution, the problem definitely sounds like a real problem, and it would be good to find a solution.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

The only solution I can think of that moves the mutex contention from changed to clone without a per-receiver allocation is to make a linked list of receivers instead of a linked list of Notified futures. But that would require pinning the receiver, which would be a breaking API change.

Or the shared state could have both a one-time waiter list and a "persistent" waiter list that is fully iterated, with an additional Sender::subscribe_persistent that returns a new receiver type.

Regarding the allocations, are we worried about alloc/free performance, or about memory use / fragmentation in general?

@Darksonn
Contributor

There are also other potential solutions; for example, we could put 16 copies of notify_rx in Shared and have changed pick one randomly. Then there's a much smaller chance of contention.
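
A rough sketch of that idea (the BigNotify name matches the later comments in this thread; the rand-based bucket choice is only illustrative, and the real implementation may differ):

use tokio::sync::Notify;

// Spread waiters over several Notify instances so each waiter-list mutex
// sees only a fraction of the contention.
struct BigNotify {
    inner: [Notify; 16],
}

impl BigNotify {
    fn new() -> Self {
        Self {
            inner: std::array::from_fn(|_| Notify::new()),
        }
    }

    // Receivers register on a randomly chosen bucket.
    async fn notified(&self) {
        let i = rand::random::<usize>() % 16;
        self.inner[i].notified().await;
    }

    // The sender wakes all buckets.
    fn notify_waiters(&self) {
        for n in &self.inner {
            n.notify_waiters();
        }
    }
}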

@Darksonn
Contributor

Or for threads in a Tokio runtime, use the worker thread ID to force all receivers in the same runtime to use a different Notify, eliminating all contention in the single-runtime use-case.

@Darksonn
Contributor

It might also be possible to create an atomic variant of Notify that is able to push to the linked list without the use of a mutex.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

Or for threads in a Tokio runtime, use the worker thread ID to force all receivers in the same runtime to use a different Notify, eliminating all contention in the single-runtime use-case.

So essentially have a 2-layer waiter list, first keyed by the thread ID at the time of registration? That's clever, but also not trivial. Potentially I can create a new runtime with more threads and then subscribe from within that runtime, which means that the shared structure would need to grow at that point.

It might also be possible to create an atomic variant of Notify that is able to push to the linked list without the use of a mutex

This is, I think, very close to just using a spinlock.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

I have changed my prototype to use a linked list with a pinned receiver. More complex, but no allocations and even better timings:

TOKIO WATCH
running single thread
avg=2.572526ms  <2.249407ms [ 2.544797ms 2.568492ms 2.741106ms ] 2.864615ms>
running multi thread with main thread sender
avg=1.266812ms  <1.042975ms [ 1.197027ms 1.277095ms 1.350633ms ] 1.475874ms>
running multi thread with spawned sender
avg=1.325034ms  <1.19766ms [ 1.276091ms 1.321332ms 1.384347ms ] 1.49261ms>
CUSTOM WATCH
running single thread
avg=2.57104ms  <2.461951ms [ 2.498101ms 2.513617ms 2.697105ms ] 2.740753ms>
running multi thread with main thread sender
avg=932.36µs  <414.713µs [ 925.2µs 962.261µs 1.002785ms ] 1.071667ms>
running multi thread with spawned sender
avg=934.073µs  <872.976µs [ 924.746µs 935.956µs 949.149µs ] 1.013803ms>

@Darksonn
Contributor

So essentially have a 2-layer waiter list, first keyed by the thread ID at time of registration? That's clever but also not trivial. Potentially I can create a new runtime with more threads, then subscribe from within that runtime. Which means that the shared structure needs to grow at that point.

Well, one can fall back to the random bucket solution if it is used from multiple runtimes or from a runtime with more than 16 worker threads.

It might also be possible to create an atomic variant of Notify that is able to push to the linked list without the use of a mutex

This is, I think, very close to just using a spinlock.

I would not consider any solution involving a spin lock to be an example of what I mentioned.

I have changed my prototype to use a linked list, with a pinned receiver.

I haven't understood what this solution actually entails.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

I haven't understood what this solution actually entails.

Instead of using an Arc<AtomicWaker>, have a linked list of wakers into which the receiver inserts itself the first time changed is polled. It's very ugly though.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

I would not consider any solution involving a spin lock to be an example of what I mentioned.

I mean in terms of the number of atomic operations performed in practice. Note that any time you write something like:

let mut value = some_atomic.load(Ordering::Relaxed);
loop {
    let new_value = some_op(value);
    match some_atomic.compare_exchange(value, new_value, Ordering::SeqCst, Ordering::SeqCst) {
        Ok(_) => break,
        Err(curval) => value = curval,
    }
}

then that is essentially a spinlock, and will behave as such under heavy contention.

@Darksonn
Contributor

then that is essentially a spinlock, and will behave as such under heavy contention.

No, spinlocks are much worse because in your example, the CAS can only fail if another thread has made progress. You do not have such a guarantee with a spinlock. (Of course, contention can still be a problem for the CAS loop.)

Anyways, how does your benchmark perform using these two implementations?

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

notify-random:

TOKIO WATCH
running single thread
avg=2.563468ms  <2.384683ms [ 2.552499ms 2.568129ms 2.580344ms ] 2.779218ms>
running multi thread with main thread sender
avg=1.113635ms  <1.001235ms [ 1.051909ms 1.075642ms 1.148269ms ] 1.320435ms>
running multi thread with spawned sender
avg=1.020961ms  <951.89µs [ 1.0051ms 1.016919ms 1.032745ms ] 1.100417ms>

so that's a significant improvement over the latest release.

fetch-add:

TOKIO WATCH
running single thread
avg=2.60633ms  <2.494356ms [ 2.570568ms 2.584443ms 2.597674ms ] 2.799817ms>
running multi thread with main thread sender
avg=1.144432ms  <1.049795ms [ 1.094227ms 1.118736ms 1.186467ms ] 1.309496ms>
running multi thread with spawned sender
avg=1.049389ms  <980.366µs [ 1.037399ms 1.050089ms 1.06229ms ] 1.127742ms>

slightly worse (likely due to contention on the counter cache line).
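
(To be clear about what I mean by "fetch-add": the bucket is picked with a shared counter instead of randomly, roughly like this, reusing the illustrative BigNotify sketched above:)

use std::sync::atomic::{AtomicUsize, Ordering};

static NEXT_BUCKET: AtomicUsize = AtomicUsize::new(0);

impl BigNotify {
    // Every call increments the same global counter, so all receivers
    // contend on that counter's cache line.
    async fn notified_fetch_add(&self) {
        let i = NEXT_BUCKET.fetch_add(1, Ordering::Relaxed) % 16;
        self.inner[i].notified().await;
    }
}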

@Darksonn
Contributor

What happens if the array is reduced to 8 or 4?

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

I tried this in your BigNotify code, but it's not better than random:

        // Inside BigNotify::notified(): pick the bucket from a per-thread ID
        // (assigned once per thread) instead of a random number.
        use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

        static THREAD_CNT: AtomicUsize = AtomicUsize::new(0);
        thread_local! {
            static THREAD_ID: usize = THREAD_CNT.fetch_add(1, Relaxed) % 16;
        }
        let i = THREAD_ID.with(|tid| *tid);
        self.inner[i].notified().await;

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

What happens if the array is reduced to 8 or 4?

With 8 it's about the same, with 4 it's worse. With 32 it's better:

running single thread
avg=2.603273ms  <2.516058ms [ 2.544477ms 2.56057ms 2.707762ms ] 2.775609ms>
running multi thread with main thread sender
avg=1.075749ms  <1.005522ms [ 1.032268ms 1.057611ms 1.10977ms ] 1.193817ms>
running multi thread with spawned sender
avg=1.002765ms  <897.487µs [ 992.836µs 1.00747ms 1.028271ms ] 1.100007ms>

but that's just my benchmark on my machine.

@Darksonn
Contributor

Thank you. Those numbers suggest the solution is good enough. I went ahead and submitted the PR myself, since I had already implemented the solution.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

Thanks. This seems like a simple enough general improvement. I'll keep my linked-list-of-pinned-subscribers solution for special cases.

@tijsvd
Contributor Author

tijsvd commented Feb 17, 2023

It may be worthwhile porting this to sync::broadcast as well.

@Darksonn
Contributor

I agree, but it looks non-trivial. I opened #5465 to track that.
