Skip to content

Commit

Permalink
Debounce the provider events before the message processing
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Apr 30, 2024
1 parent ef05bdb commit efb3b30
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 30 deletions.
1 change: 1 addition & 0 deletions crates/maple_core/src/stdio_server/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ impl Context {
self.env.provider_id.as_str()
}

/// Debounce delay for Provider::Event::OnTyped in milliseconds.
pub fn provider_debounce(&self) -> u64 {
maple_config::config().provider_debounce(self.env.provider_id.as_str())
}
Expand Down
134 changes: 104 additions & 30 deletions crates/maple_core/src/stdio_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,43 @@ use tokio::time::Instant;

pub type ProviderSessionId = u64;

// Type alias here for readability.
type DebouncedProviderEvent = ProviderEvent;

#[derive(Debug)]
pub struct ProviderSession {
ctx: Context,
id: ProviderId,
provider_session_id: ProviderSessionId,
/// Each provider session can have its own message processing logic.
provider: Box<dyn ClapProvider>,
provider_events: UnboundedReceiver<ProviderEvent>,
provider_events: UnboundedReceiver<DebouncedProviderEvent>,
}

struct DebounceTimer {
last_emitted: Option<std::time::Instant>,
debounce_period: Duration,
}

impl DebounceTimer {
fn new(debounce_period: Duration) -> Self {
Self {
last_emitted: None,
debounce_period,
}
}

fn should_emit(&mut self) -> bool {
let now = std::time::Instant::now();
if self.last_emitted.is_none()
|| now.duration_since(self.last_emitted.expect("Must be Some as checked"))
> self.debounce_period
{
self.last_emitted.replace(now);
return true;
}
false
}
}

impl ProviderSession {
Expand All @@ -33,21 +62,64 @@ impl ProviderSession {
provider_session_id: ProviderSessionId,
provider: Box<dyn ClapProvider>,
) -> (Self, UnboundedSender<ProviderEvent>) {
let (provider_event_sender, provider_event_receiver) = unbounded_channel();
let (origin_provider_event_sender, mut origin_provider_event_receiver) =
unbounded_channel();

let (debounced_provider_event_sender, debounced_provider_event_receiver) =
unbounded_channel();

ctx.set_provider_event_sender(provider_event_sender.clone());
ctx.set_provider_event_sender(origin_provider_event_sender.clone());

let id = ctx.env.provider_id.clone();
let debounce_delay = ctx.provider_debounce();

tokio::spawn(async move {
let mut on_move_timer = DebounceTimer::new(Duration::from_millis(200));
let mut on_typed_timer = DebounceTimer::new(Duration::from_millis(debounce_delay));

// Text input from users could be overloaded in a short period of time, e.g., OnMove
// and OnTyped can be too frequent if user types too fast, in which case we observe
// the receiver side of unbounded channel may not be able to receive the events in time,
// leading to the annoying frozen UI on the vim side. The cause is probably because of
// the event processing logic on the receiver side are not running in separate tasks,
// the processing of incoming messages cannot keep up with the rate of message generation,
// rendering the receiver may hang for a while. One proper solution is to process each
// provider event in a separate task, that requires more effoets however, now we choose to
// debounce the stream to avoid overwhelming the system.
while let Some(event) = origin_provider_event_receiver.recv().await {
tracing::debug!("Recv origin event: {event:?}");

let (should_emit, debounced_event) = match event {
ProviderEvent::OnMove(params) => {
(on_move_timer.should_emit(), ProviderEvent::OnMove(params))
}
ProviderEvent::OnTyped(params) => {
(on_typed_timer.should_emit(), ProviderEvent::OnTyped(params))
}
undebounced_event => (true, undebounced_event),
};

// Send event after debounce period
if should_emit {
if debounced_provider_event_sender
.send(debounced_event)
.is_err()
{
return;
}
}

Check failure on line 110 in crates/maple_core/src/stdio_server/service.rs

View workflow job for this annotation

GitHub Actions / clippy

this `if` statement can be collapsed

error: this `if` statement can be collapsed --> crates/maple_core/src/stdio_server/service.rs:103:17 | 103 | / if should_emit { 104 | | if debounced_provider_event_sender 105 | | .send(debounced_event) 106 | | .is_err() ... | 109 | | } 110 | | } | |_________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_if = note: `-D clippy::collapsible-if` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::collapsible_if)]` help: collapse nested if block | 103 ~ if should_emit && debounced_provider_event_sender 104 + .send(debounced_event) 105 + .is_err() { 106 + return; 107 + } |
}
});

let provider_session = ProviderSession {
ctx,
id,
provider_session_id,
provider,
provider_events: provider_event_receiver,
provider_events: debounced_provider_event_receiver,
};

(provider_session, provider_event_sender)
(provider_session, origin_provider_event_sender)
}

pub fn start_event_loop(self) {
Expand Down Expand Up @@ -79,12 +151,12 @@ impl ProviderSession {
// which is actually just 1 year in the future.
const NEVER: Duration = Duration::from_secs(365 * 24 * 60 * 60);

let mut on_move_dirty = false;
let mut on_move = None;
let on_move_delay = Duration::from_millis(50);
let on_move_timer = tokio::time::sleep(NEVER);
tokio::pin!(on_move_timer);

let mut on_typed_dirty = false;
let mut on_typed = None;
// Delay can be adjusted once we know the provider source scale.
//
// Here is the benchmark result of filtering on AMD 5900X:
Expand All @@ -102,7 +174,7 @@ impl ProviderSession {
maybe_event = self.provider_events.recv() => {
match maybe_event {
Some(event) => {
tracing::trace!(debounce = true, "[{}] Received event: {event:?}", self.id);
tracing::trace!(debounce = true, "[{}] Recv debounced event: {event:?}", self.id);

match event {
ProviderEvent::Internal(internal_event) => {
Expand All @@ -115,12 +187,12 @@ impl ProviderSession {
}
}
}
ProviderEvent::OnMove(_params) => {
on_move_dirty = true;
ProviderEvent::OnMove(params) => {
on_move.replace(params);
on_move_timer.as_mut().reset(Instant::now() + on_move_delay);
}
ProviderEvent::OnTyped(_params) => {
on_typed_dirty = true;
ProviderEvent::OnTyped(params) => {
on_typed.replace(params);
on_typed_timer.as_mut().reset(Instant::now() + on_typed_delay);
}
ProviderEvent::Key(key_event) => {
Expand All @@ -141,25 +213,27 @@ impl ProviderSession {
None => break, // channel has closed.
}
}
_ = on_move_timer.as_mut(), if on_move_dirty => {
on_move_dirty = false;
on_move_timer.as_mut().reset(Instant::now() + NEVER);
_ = on_move_timer.as_mut(), if on_move.is_some() => {
if let Some(_params) = on_move.take() {
on_move_timer.as_mut().reset(Instant::now() + NEVER);

if let Err(err) = self.provider.on_move(&mut self.ctx).await {
tracing::error!(?err, "Failed to process ProviderEvent::OnMove");
if let Err(err) = self.provider.on_move(&mut self.ctx).await {
tracing::error!(?err, "Failed to process ProviderEvent::OnMove");
}
}
}
_ = on_typed_timer.as_mut(), if on_typed_dirty => {
on_typed_dirty = false;
on_typed_timer.as_mut().reset(Instant::now() + NEVER);
_ = on_typed_timer.as_mut(), if on_typed.is_some() => {
if let Some(_params) = on_typed.take() {
on_typed_timer.as_mut().reset(Instant::now() + NEVER);

let _ = self.ctx.record_input().await;
let _ = self.ctx.record_input().await;

if let Err(err) = self.provider.on_typed(&mut self.ctx).await {
tracing::error!(?err, "Failed to process ProviderEvent::OnTyped");
}
if let Err(err) = self.provider.on_typed(&mut self.ctx).await {
tracing::error!(?err, "Failed to process ProviderEvent::OnTyped");
}

let _ = self.provider.on_move(&mut self.ctx).await;
let _ = self.provider.on_move(&mut self.ctx).await;
}
}
}
}
Expand Down Expand Up @@ -339,7 +413,6 @@ impl PluginSession {
const NEVER: Duration = Duration::from_secs(365 * 24 * 60 * 60);

let mut pending_plugin_event = None;
let mut notification_dirty = false;
let notification_timer = tokio::time::sleep(NEVER);
tokio::pin!(notification_timer);

Expand All @@ -348,11 +421,10 @@ impl PluginSession {
maybe_plugin_event = self.plugin_events.recv() => {
match maybe_plugin_event {
Some(plugin_event) => {
tracing::trace!(?plugin_event, "[{id}] Received event");
// tracing::trace!(?plugin_event, "[{id}] Received event");

if plugin_event.should_debounce() {
pending_plugin_event.replace(plugin_event);
notification_dirty = true;
notification_timer.as_mut().reset(Instant::now() + event_delay);
} else {
let res = match plugin_event.clone() {
Expand All @@ -367,8 +439,7 @@ impl PluginSession {
None => break, // channel has closed.
}
}
_ = notification_timer.as_mut(), if notification_dirty => {
notification_dirty = false;
_ = notification_timer.as_mut(), if pending_plugin_event.is_some() => {
notification_timer.as_mut().reset(Instant::now() + NEVER);

if let Some(autocmd) = pending_plugin_event.take() {
Expand Down Expand Up @@ -405,6 +476,9 @@ impl ServiceManager {
provider: Box<dyn ClapProvider>,
ctx: Context,
) {
// Only one provider instance is allowed.
//
// Kill the existing providers if any before creating a new one.
for (provider_session_id, sender) in self.providers.drain() {
tracing::debug!(?provider_session_id, "Sending internal Terminate signal");
sender.send(ProviderEvent::Internal(InternalProviderEvent::Terminate));
Expand Down

0 comments on commit efb3b30

Please sign in to comment.