Skip to content

Commit

Permalink
chore(watch): use std Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-olszewski committed Sep 16, 2024
1 parent 91a5169 commit eb28b9e
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{collections::HashSet, ops::DerefMut, sync::Arc};
use std::{
collections::HashSet,
ops::DerefMut as _,
sync::{Arc, Mutex},
};

use futures::StreamExt;
use miette::{Diagnostic, SourceSpan};
use thiserror::Error;
use tokio::{
select,
sync::{Mutex, Notify},
task::JoinHandle,
};
use tokio::{select, sync::Notify, task::JoinHandle};
use tracing::{instrument, trace};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
Expand Down Expand Up @@ -179,7 +179,7 @@ impl WatchClient {
let event_fut = async {
while let Some(event) = events.next().await {
let event = event?;
Self::handle_change_event(&changed_packages, event.event.unwrap()).await?;
Self::handle_change_event(&changed_packages, event.event.unwrap())?;
notify_event.notify_one();
}

Expand All @@ -189,9 +189,13 @@ impl WatchClient {
let run_fut = async {
loop {
notify_run.notified().await;
let mut changed_packages_guard = changed_packages.lock().await;
if !changed_packages_guard.is_empty() {
let changed_packages = std::mem::take(changed_packages_guard.deref_mut());
let some_changed_packages = {
let mut changed_packages_guard =
changed_packages.lock().expect("poisoned lock");
(!changed_packages_guard.is_empty())
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
};
if let Some(changed_packages) = some_changed_packages {
self.execute_run(changed_packages).await?;
}
}
Expand All @@ -214,7 +218,7 @@ impl WatchClient {
}

#[instrument(skip(changed_packages))]
async fn handle_change_event(
fn handle_change_event(
changed_packages: &Mutex<ChangedPackages>,
event: proto::package_change_event::Event,
) -> Result<(), Error> {
Expand All @@ -225,7 +229,7 @@ impl WatchClient {
}) => {
let package_name = PackageName::from(package_name);

match changed_packages.lock().await.deref_mut() {
match changed_packages.lock().expect("poisoned lock").deref_mut() {
ChangedPackages::All => {
// If we've already changed all packages, ignore
}
Expand All @@ -235,7 +239,7 @@ impl WatchClient {
}
}
proto::package_change_event::Event::RediscoverPackages(_) => {
*changed_packages.lock().await = ChangedPackages::All;
*changed_packages.lock().expect("poisoned lock") = ChangedPackages::All;
}
proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
return Err(DaemonError::Unavailable(message).into());
Expand Down

0 comments on commit eb28b9e

Please sign in to comment.