Watch channel with many subscribers has sub-optimal multithreaded performance #5403
Comments
I also had to implement a specialized notifier, due to how horrible the default one is versus what's optimal for some cases.
@gustav3d We are always happy to hear proposals (and eventually PRs) to improve performance. Please share your strategy.
@tijsvd Try updating your benchmark to the following and run again:

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // Run the whole benchmark inside a spawned task, so that the sender
    // runs on a worker thread rather than on the main thread.
    tokio::spawn(async {
        let (snd, _) = watch::channel(0i32);
        for _ in 0..1000 {
            let mut rcv = snd.subscribe();
            tokio::spawn(async move {
                loop {
                    if rcv.changed().await.is_err() {
                        break;
                    }
                    // read lock
                    let _ = *rcv.borrow();
                }
            });
        }
        for i in 0..1000 {
            sleep(Duration::from_millis(1)).await;
            let _ = snd.send(i);
        }
    })
    .await
    .unwrap();
}
```
I did; it improves the benchmark performance a lot. There is still considerable overhead compared to single-thread or mutex-free notification, but I'll dig into this a bit more tomorrow. I could not find a good description of how the scheduler / work stealing is actually supposed to work. I noticed that the original main thread does not actually participate in the scheduler; it only runs the join handle? Do the worker threads all run epoll, or would all incoming I/O then come from the main thread, and would all futures awoken by I/O also run on the main thread? (In which case the improvement in the benchmark is moot, as events would be caused by incoming network messages.) Edit: never mind about […]
To answer my own question: if the main thread only runs the "main" task, and all spawned tasks run only on worker threads, then if the main task causes all events, the worker threads are all awoken at the same time, so mutex contention is heaviest. If one of the worker threads causes the events, it should be able to process a significant chunk of the waiters before the other threads are even ready to steal the work.

But that is then mostly an artifact of the (admittedly far from perfect) benchmark; the receiver tasks don't do any real work. Where I observed this in pre-production testing, the event source was a spawned task, but the receivers did more work per event, so the division of work across threads would have been more even.
I put together a more detailed benchmark here: https://github.com/tijsvd/tokio-watch-benchmark

It performs some actual work in each receiver, and instead of looking at CPU, it measures the latency from send until the last receiver has finished its work. I also put together a quick-and-dirty watch channel as proposed, and it's measured as well. Output on my laptop (8 threads): […]
More info here: #5446
Right, that issue is about having to cross a thread boundary. In this case, we're actually looking to cross a thread boundary, in order to spread the work of the multiple receivers across threads. I think the more real-world benchmark above shows that it doesn't really matter much.

If there is broader consensus that send/receive performance for watch channels is more important than subscribe/drop, then I'd be happy to volunteer a PR. In that case I'd like some feedback on the best setup: […]
Before you start working on a PR, I would like to hear a description of the idea behind your solution. I note that the current implementation does not allocate memory in […]
The idea is to trade the mutex that protects the list of waiters against a pre-registration of waker slots. That means a new receiver would allocate memory. No need to allocate in […].

The allocation for the receiver could be avoided by using some kind of custom allocator and a block of atomic wakers in the shared state, but that would likely have a negative performance effect on […]. Note that currently there may also be some mutex contention between […].

Prototype (not otherwise optimised) here: https://github.com/tijsvd/tokio-watch-benchmark/blob/master/src/mywatch.rs

If all this sounds scary because of the larger overhead of […]

I notice that the broadcast channel uses a similar mutex-protected waitlist, so it might run into the same issue. The issue as I see it is mostly about stalling worker threads on an OS-level lock, leading to insufficient speedup of the total work relative to the number of cores available.

Other "solutions" are available as well, but may be less generally good: […]
Edit: one more downside of what I propose: the sender will always iterate through the entire set of receivers; the assumption then is that most receivers would normally be waiting.
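A minimal sketch of the pre-registration idea, assuming each receiver holds an `Arc<AtomicWaker>` slot that was put into the shared map at subscribe time (names are illustrative; see the linked prototype for the real code):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::Poll;

use futures::task::AtomicWaker;

struct Shared {
    version: AtomicU64,
}

struct Receiver {
    shared: Arc<Shared>,
    // Slot pre-registered in the shared map at subscribe time.
    waker: Arc<AtomicWaker>,
    seen_version: u64,
}

impl Receiver {
    /// Wait until the version moves past the last one seen.
    /// No mutex is taken on this path; only the pre-registered
    /// slot is updated.
    async fn changed(&mut self) {
        futures::future::poll_fn(|cx| {
            // Store the waker first, then re-check the version, so a
            // concurrent send cannot slip through unobserved.
            self.waker.register(cx.waker());
            let current = self.shared.version.load(Ordering::Acquire);
            if current != self.seen_version {
                self.seen_version = current;
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await;
    }
}
```

The sender side then only bumps `version` and calls `wake()` on each slot, taking the map mutex once per send rather than once per registration.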
Okay, so it uses an […]

Regardless of how much I like your suggested solution, the problem definitely sounds like a real problem, and it would be good to find a solution.
The only solution that I can think of that moves the mutex contention from […]

Or the shared state could have both a one-time waiter list and a "persistent" waiter list that is fully iterated, and there's an additional […]

Regarding the allocations: are we afraid of alloc/free performance, or of memory use / fragmentation in general?
There are also other potential solutions; for example, we could put 16 copies of `Notify` in the shared state and have each receiver wait on a randomly chosen one (sketched below).
Or, for threads in a Tokio runtime, use the worker thread ID to force all receivers in the same runtime to use a different `Notify`.
It might also be possible to create an atomic variant of […]
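A sketch of the first alternative above, 16 `Notify` copies with a random bucket choice; the thread later calls this `BigNotify` and uses an `inner` array, the rest is assumption:

```rust
use rand::Rng; // assumes the `rand` crate for bucket selection
use tokio::sync::Notify;

/// Spread waiters over 16 `Notify` instances so that waker
/// registration contends on 16 small mutexes instead of one.
struct BigNotify {
    inner: [Notify; 16],
}

impl BigNotify {
    fn new() -> Self {
        Self {
            inner: std::array::from_fn(|_| Notify::new()),
        }
    }

    /// Each waiter parks in a randomly chosen bucket.
    async fn notified(&self) {
        let i = rand::thread_rng().gen_range(0..16);
        self.inner[i].notified().await;
    }

    /// The sender wakes all buckets.
    fn notify_waiters(&self) {
        for n in &self.inner {
            n.notify_waiters();
        }
    }
}
```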
So essentially have a two-layer waiter list, first keyed by the thread ID at the time of registration? That's clever, but also not trivial. Potentially I can create a new runtime with more threads and then subscribe from within that runtime, which means that the shared structure needs to grow at that point.
This is, I think, very close to just using a spinlock.
I have changed my prototype to use a linked list, with a pinned receiver. More complex, but no allocs and even better timing: […]
Well, one can fall back to the random bucket solution if it is used from multiple runtimes or from a runtime with more than 16 worker threads.
I would not consider any solution involving a spin lock to be an example of what I mentioned.
I haven't understood what this solution actually entails.
Instead of using an […]
I mean in terms of the number of atomic operations performed in practice. Note that any time you write something like:

```rust
let mut value = some_atomic.load(Ordering::Relaxed);
loop {
    let new_value = some_op(value);
    match some_atomic.compare_exchange(value, new_value, Ordering::SeqCst, Ordering::SeqCst) {
        Ok(_) => break,
        Err(curval) => value = curval,
    }
}
```

then that is essentially a spinlock, and will behave as such under heavy contention.
No, spinlocks are much worse, because in your example the CAS can only fail if another thread has made progress. You do not have such a guarantee with a spinlock. (Of course, contention can still be a problem for the CAS loop.) Anyway, how does your benchmark perform using these two implementations?
notify-random: […] So that's a significant improvement over the latest release.

fetch-add: […] Slightly worse (likely due to contention on the counter cache line).
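For reference, a fetch-add bucket choice would presumably look like the following sketch; the shared counter is the cache line being fought over:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;

struct BigNotify {
    inner: [Notify; 16],
    next: AtomicUsize,
}

impl BigNotify {
    async fn notified(&self) {
        // Every waiter increments the same counter, so under load all
        // threads bounce this one cache line between cores.
        let i = self.next.fetch_add(1, Ordering::Relaxed) % 16;
        self.inner[i].notified().await;
    }
}
```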
What happens if the array is reduced to 8 or 4?
I tried this in your BigNotify code, but it's not better than random:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Hand out a small per-thread index instead of picking a random bucket.
static THREAD_CNT: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static THREAD_ID: usize = THREAD_CNT.fetch_add(1, Relaxed) % 16;
}

// Inside BigNotify::notified:
let i = THREAD_ID.with(|tid| *tid);
self.inner[i].notified().await;
```
With 8 it's about the same, with 4 it's worse. With 32 it's better: […] But that's just my benchmark on my machine.
Thank you. Those numbers suggest the solution is good enough. I went ahead and submitted the PR myself, since I've already implemented the solution.
Thanks. This seems like a simple enough general improvement. I'll keep my linked-list-of-pinned-subscribers solution for special cases.
It may be worthwhile porting this to `broadcast`.
I agree, but it looks non-trivial. I opened #5465 to track that.
Version
tokio v1.24.2
Platform
Linux x1 5.15.0-58-generic #64~20.04.1-Ubuntu SMP Fri Jan 6 16:42:31 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
Description
Hi,

I encountered a performance issue in `tokio::sync::watch`. The usage pattern is something like: one `watch` channel with many subscriber tasks, each waiting for changes and reading the latest value. We use `watch` here instead of `broadcast` because we care only about the latest value, and this is an elegant way to deal with slow clients. I would think this is a fairly typical use case for a watch channel.

While doing some benchmarking, I saw a massive decay in throughput (actually received messages) moving from `flavor="current_thread"` to multi-thread. Also, I expected high CPU usage due to parallel per-subscriber processing; instead, CPU stayed around 130-180% on an 8-vcore machine. I have sort of reduced it to this example program:
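A minimal program along these lines, matching the benchmark discussed earlier in the thread (1000 subscribers looping on `changed()`/`borrow()`, with the sender sending once per millisecond):

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (snd, _) = watch::channel(0i32);
    for _ in 0..1000 {
        let mut rcv = snd.subscribe();
        tokio::spawn(async move {
            // Each receiver only waits for a change and reads the value.
            while rcv.changed().await.is_ok() {
                let _ = *rcv.borrow();
            }
        });
    }
    // Send a new value every millisecond.
    for i in 0..1000 {
        sleep(Duration::from_millis(1)).await;
        let _ = snd.send(i);
    }
}
```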
Not much happens here, yet running this reports almost 6 seconds of CPU time used, of which 1.6s is system time. Changing to `#[tokio::main(flavor="current_thread")]` changes that to hardly anything (0.35s user time, 0.05s system). Also, running with `strace` shows a lot of `futex` calls.

So apparently there's a lot of lock contention. Digging into it, I think (well, reasonably sure) that the following happens:

- `send` on the `watch::Sender` translates to `Notify::notify_waiters`; this by itself is fine, it sets all waiting tasks as runnable without locks held
- each awoken receiver task then re-registers itself with the `Notify`
- the waiter list inside `Notify` is behind a mutex, and this is where the contention is

If we accept that this is indeed a fairly typical use case, then it could, I think, be solved by making a different trade-off between send/receive performance and clone/subscribe/drop overhead:
- an `AtomicU64` in the shared state;
- an `Arc<AtomicWaker>` for each receiver;
- a `Mutex<HashMap<u64, Arc<AtomicWaker>>>`, which is accessed in `Sender::send`, `Sender::subscribe`, `Receiver::clone` and `Receiver::drop` only, so not in `Receiver::changed`.
Sender code is then something like:
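A sketch consistent with that proposal, assuming the shared state holds the value behind an `RwLock` (the rwlock mentioned just below), the version counter, and the waker map; names are illustrative:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use futures::task::AtomicWaker;

struct Shared<T> {
    value: RwLock<T>,
    version: AtomicU64,
    // Keyed by receiver id; only send/subscribe/clone/drop touch this.
    waiters: Mutex<HashMap<u64, Arc<AtomicWaker>>>,
}

struct Sender<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Sender<T> {
    fn send(&self, value: T) {
        // Publish the new value, then bump the version so a receiver
        // polling `changed` can detect the change without any lock.
        *self.shared.value.write().unwrap() = value;
        self.shared.version.fetch_add(1, Ordering::Release);

        // Wake every pre-registered receiver. The mutex is taken here,
        // on the sender side only, never in `Receiver::changed`.
        for waker in self.shared.waiters.lock().unwrap().values() {
            waker.wake();
        }
    }
}
```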
Changing my example case above to something like this reduces the CPU overhead to ~1s, all of it user time. There is still non-negligible overhead (contention on the rwlock cache line), but no thread stalls.

I did something like this in my production code, which solved the problem. I actually created a special type of notifier, which is another consideration, but the use cases for `Notify` are probably more diverse than `watch`.