Skip to content

Commit

Permalink
Use global thread for event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
quininer committed Oct 27, 2020
1 parent aa8436e commit 8ebd288
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions src/event/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, SyncSender},
Arc,
},
thread,
Expand All @@ -13,6 +14,8 @@ use futures_core::{
task::{Context, Poll},
};

use lazy_static::lazy_static;

use crate::Result;

use super::{
Expand All @@ -34,16 +37,16 @@ use super::{
#[derive(Debug)]
pub struct EventStream {
poll_internal_waker: Waker,
stream_wake_thread_spawned: Arc<AtomicBool>,
stream_wake_thread_should_shutdown: Arc<AtomicBool>,
stream_wake_task_executed: Arc<AtomicBool>,
stream_wake_task_should_shutdown: Arc<AtomicBool>,
}

impl Default for EventStream {
fn default() -> Self {
EventStream {
poll_internal_waker: INTERNAL_EVENT_READER.write().waker(),
stream_wake_thread_spawned: Arc::new(AtomicBool::new(false)),
stream_wake_thread_should_shutdown: Arc::new(AtomicBool::new(false)),
stream_wake_task_executed: Arc::new(AtomicBool::new(false)),
stream_wake_task_should_shutdown: Arc::new(AtomicBool::new(false)),
}
}
}
Expand All @@ -55,6 +58,36 @@ impl EventStream {
}
}

lazy_static!{
static ref TERM_EVENT_THREAD: SyncSender<Task> = {
let (sender, receiver) = mpsc::sync_channel::<Task>(3);

thread::spawn(move || {
while let Ok(task) = receiver.recv() {
loop {
if let Ok(true) = poll_internal(None, &EventFilter) {
break;
}

if task.stream_wake_task_should_shutdown.load(Ordering::SeqCst) {
break;
}
}
task.stream_wake_task_executed.store(false, Ordering::SeqCst);
task.stream_waker.wake();
}
});

sender
};
}

struct Task {
stream_waker: std::task::Waker,
stream_wake_task_executed: Arc<AtomicBool>,
stream_wake_task_should_shutdown: Arc<AtomicBool>,
}

// Note to future me
//
// We need two wakers in order to implement EventStream correctly.
Expand Down Expand Up @@ -86,28 +119,20 @@ impl Stream for EventStream {
},
Ok(false) => {
if !self
.stream_wake_thread_spawned
.stream_wake_task_executed
.compare_and_swap(false, true, Ordering::SeqCst)
{
let stream_waker = cx.waker().clone();
let stream_wake_thread_spawned = self.stream_wake_thread_spawned.clone();
let stream_wake_thread_should_shutdown =
self.stream_wake_thread_should_shutdown.clone();

stream_wake_thread_should_shutdown.store(false, Ordering::SeqCst);

thread::spawn(move || {
loop {
if let Ok(true) = poll_internal(None, &EventFilter) {
break;
}

if stream_wake_thread_should_shutdown.load(Ordering::SeqCst) {
break;
}
}
stream_wake_thread_spawned.store(false, Ordering::SeqCst);
stream_waker.wake();
let stream_wake_task_executed = self.stream_wake_task_executed.clone();
let stream_wake_task_should_shutdown =
self.stream_wake_task_should_shutdown.clone();

stream_wake_task_should_shutdown.store(false, Ordering::SeqCst);

let _ = TERM_EVENT_THREAD.send(Task {
stream_waker,
stream_wake_task_executed,
stream_wake_task_should_shutdown
});
}
Poll::Pending
Expand All @@ -120,7 +145,7 @@ impl Stream for EventStream {

impl Drop for EventStream {
fn drop(&mut self) {
self.stream_wake_thread_should_shutdown
self.stream_wake_task_should_shutdown
.store(true, Ordering::SeqCst);
let _ = self.poll_internal_waker.wake();
}
Expand Down

0 comments on commit 8ebd288

Please sign in to comment.