Skip to content

Commit

Permalink
Add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vnetserg committed Jan 19, 2024
1 parent 33a547d commit a017b7b
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,11 +902,14 @@ impl<T> Shared<T> {
// Safety: `tail` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(waiter.queued.swap(false, Release));

if let Some(waker) = waiter.waker.take() {
wakers.push(waker);
}

// `Release` is needed to synchronize with `Recv::drop`.
// It is critical to set this variable **after** waker
// is extracted, otherwise we may data race with `Recv::drop`.
assert!(waiter.queued.swap(false, Release));
}
None => {
break 'outer;
Expand Down Expand Up @@ -1104,6 +1107,11 @@ impl<T> Receiver<T> {
}
}

// If the waiter is not already queued, enqueue it.
// Relaxed memory order suffices because we don't need
// to synchronize with `Recv::drop` here (calling
// `Receiver::recv_ref` with a waiter implies ownership
// of the corresponding `Recv`).
if !(*ptr).queued.swap(true, Relaxed) {
tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
}
Expand Down Expand Up @@ -1401,16 +1409,23 @@ where

impl<'a, T> Drop for Recv<'a, T> {
fn drop(&mut self) {
// Safety: `waiter.queued` is atomic.
// Acquire ordering is required to synchronize with
// `Shared::notify_rx` before we drop the object.
let queued = self
.waiter
.with(|ptr| unsafe { (*ptr).queued.load(Acquire) });

// If the waiter is queued, we need to unlink it from the waiters list.
// If not, no further synchronization is required, since the waiter
// is not in the list and, as such, is not shared with any other threads.
if queued {
// Acquire the tail lock. This is required for safety before accessing
// the waiter node.
let mut tail = self.receiver.shared.tail.lock();

// safety: tail lock is held
// Safety: tail lock is held.
// Relaxed order suffices because we hold the tail lock.
let queued = self
.waiter
.with(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
Expand Down

0 comments on commit a017b7b

Please sign in to comment.