From a6b5d26b22dffe2225fcac632a55161dd659a62c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Fri, 13 Sep 2024 17:13:23 +0300 Subject: [PATCH] Switch from `async-lock` to `tokio` primitives --- Cargo.lock | 2 -- crates/subspace-farmer-components/Cargo.toml | 1 - crates/subspace-farmer-components/src/plotting.rs | 5 ++--- crates/subspace-farmer/Cargo.toml | 3 +-- .../subspace-farmer/commands/cluster/controller.rs | 2 +- .../commands/cluster/controller/farms.rs | 10 +++++----- .../bin/subspace-farmer/commands/cluster/farmer.rs | 3 +-- .../bin/subspace-farmer/commands/cluster/plotter.rs | 3 +-- .../src/bin/subspace-farmer/commands/farm.rs | 7 ++++--- .../bin/subspace-farmer/commands/shared/network.rs | 5 +++-- crates/subspace-farmer/src/cluster/controller.rs | 2 +- crates/subspace-farmer/src/farmer_cache.rs | 9 +++++---- crates/subspace-farmer/src/farmer_piece_getter.rs | 13 +++++++------ .../src/node_client/caching_proxy_node_client.rs | 3 +-- .../src/node_client/rpc_node_client.rs | 2 +- crates/subspace-farmer/src/plotter/cpu.rs | 5 ++--- crates/subspace-farmer/src/plotter/gpu.rs | 5 ++--- crates/subspace-farmer/src/plotter/gpu/cuda.rs | 4 ++-- crates/subspace-farmer/src/single_disk_farm.rs | 5 ++--- .../subspace-farmer/src/single_disk_farm/farming.rs | 4 ++-- .../src/single_disk_farm/piece_reader.rs | 4 ++-- .../src/single_disk_farm/plot_cache.rs | 7 ++++--- .../src/single_disk_farm/plot_cache/tests.rs | 7 ++++--- .../src/single_disk_farm/plotted_sectors.rs | 2 +- .../src/single_disk_farm/plotting.rs | 5 ++--- 25 files changed, 56 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78f0dd0704..1b2d1d7a90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12631,7 +12631,6 @@ name = "subspace-farmer" version = "0.1.0" dependencies = [ "anyhow", - "async-lock 3.4.0", "async-nats", "async-trait", "backoff", @@ -12691,7 +12690,6 @@ dependencies = [ name = "subspace-farmer-components" version = "0.1.0" dependencies = [ - "async-lock 3.4.0", "async-trait", "backoff", "bitvec", diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index 88aa5e4c69..3dafc859a5 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -16,7 +16,6 @@ include = [ bench = false [dependencies] -async-lock = "3.3.0" async-trait = "0.1.81" backoff = { version = "0.4.0", features = ["futures", "tokio"] } bitvec = "1.0.1" diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 508ba3aa27..07a4651ca1 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -12,7 +12,6 @@ use crate::sector::{ }; use crate::segment_reconstruction::recover_missing_piece; use crate::{FarmerProtocolInfo, PieceGetter}; -use async_lock::Mutex as AsyncMutex; use backoff::future::retry; use backoff::{Error as BackoffError, ExponentialBackoff}; use futures::stream::FuturesUnordered; @@ -34,7 +33,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; use thiserror::Error; -use tokio::sync::{AcquireError, Semaphore}; +use tokio::sync::{AcquireError, Mutex as AsyncMutex, Semaphore}; use tracing::{debug, trace, warn}; const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1; @@ -397,7 +396,7 @@ where loop { // Take mutex briefly to make sure encoding is allowed right now - global_mutex.lock_blocking(); + let _ = global_mutex.blocking_lock(); // This instead of `while` above because otherwise mutex will be held // for the duration of the loop and will limit concurrency to 1 record diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 98dc1198cf..0d70fda086 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -17,7 +17,6 @@ required-features = ["binary"] [dependencies] anyhow = "1.0.86" -async-lock = "3.3.0" async-nats = { version = "0.35.1", optional = true } async-trait = "0.1.81" backoff = { version = "0.4.0", features = ["futures", "tokio"] } @@ -65,7 +64,7 @@ supports-color = { version = "3.0.0", optional = true } tempfile = "3.12.0" thiserror = "1.0.63" thread-priority = "1.1.0" -tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] } +tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tokio-stream = { version = "0.1.15", features = ["sync"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index 6f40a5be03..2f41466447 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -6,7 +6,6 @@ use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex}; use crate::commands::shared::derive_libp2p_keypair; use crate::commands::shared::network::{configure_network, NetworkArgs}; use anyhow::anyhow; -use async_lock::RwLock as AsyncRwLock; use backoff::ExponentialBackoff; use clap::{Parser, ValueHint}; use futures::stream::FuturesUnordered; @@ -31,6 +30,7 @@ use subspace_farmer::node_client::NodeClient; use subspace_farmer::single_disk_farm::identity::Identity; use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop}; use subspace_networking::utils::piece_provider::PieceProvider; +use tokio::sync::RwLock as AsyncRwLock; use tracing::info; /// Get piece retry attempts number. diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index e8efec6895..e0b03e54a7 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -6,7 +6,6 @@ use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL; use anyhow::anyhow; -use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; use futures::future::FusedFuture; use futures::stream::FuturesUnordered; @@ -25,6 +24,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; +use tokio::sync::RwLock as AsyncRwLock; use tokio::task; use tokio::time::MissedTickBehavior; use tracing::{error, info, trace, warn}; @@ -178,7 +178,7 @@ pub(super) async fn maintain_farms( let plotted_pieces = Arc::clone(plotted_pieces); let delete_farm_fut = task::spawn_blocking(move || { - plotted_pieces.write_blocking().delete_farm(farm_index); + plotted_pieces.blocking_write().delete_farm(farm_index); }); if let Err(error) = delete_farm_fut.await { error!( @@ -235,7 +235,7 @@ pub(super) async fn maintain_farms( let plotted_pieces = Arc::clone(plotted_pieces); let delete_farm_fut = task::spawn_blocking(move || { - plotted_pieces.write_blocking().delete_farm(farm_index); + plotted_pieces.blocking_write().delete_farm(farm_index); }); if let Err(error) = delete_farm_fut.await { error!( @@ -322,7 +322,7 @@ fn process_farm_identify_message<'a>( let plotted_pieces = Arc::clone(plotted_pieces); let delete_farm_fut = task::spawn_blocking(move || { - plotted_pieces.write_blocking().delete_farm(farm_index); + plotted_pieces.blocking_write().delete_farm(farm_index); }); if let Err(error) = delete_farm_fut.await { error!( @@ -434,7 +434,7 @@ async fn initialize_farm( drop(sector_update_handler); let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock()); let add_buffered_sectors_fut = task::spawn_blocking(move || { - let mut plotted_pieces = plotted_pieces.write_blocking(); + let mut plotted_pieces = plotted_pieces.blocking_write(); for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer { if let Some(old_plotted_sector) = old_plotted_sector { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs index 16ff9de26d..ecdf1acbd4 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -2,7 +2,6 @@ use crate::commands::shared::DiskFarm; use anyhow::anyhow; -use async_lock::Mutex as AsyncMutex; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::Parser; @@ -35,7 +34,7 @@ use subspace_farmer::utils::{ }; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_proof_of_space::Table; -use tokio::sync::{Barrier, Semaphore}; +use tokio::sync::{Barrier, Mutex as AsyncMutex, Semaphore}; use tracing::{error, info, info_span, warn, Instrument}; const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs index c898a5ffaa..4da8c6b8db 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -1,6 +1,5 @@ use crate::commands::shared::PlottingThreadPriority; use anyhow::anyhow; -use async_lock::Mutex as AsyncMutex; use clap::Parser; use futures::{select, FutureExt}; use prometheus_client::registry::Registry; @@ -27,7 +26,7 @@ use subspace_farmer::utils::{ }; use subspace_farmer_components::PieceGetter; use subspace_proof_of_space::Table; -use tokio::sync::Semaphore; +use tokio::sync::{Mutex as AsyncMutex, Semaphore}; use tracing::info; const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 8b86bbf769..b1b51abead 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -2,7 +2,6 @@ use crate::commands::shared::network::{configure_network, NetworkArgs}; use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority}; use crate::utils::shutdown_signal; use anyhow::anyhow; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::{Parser, ValueHint}; @@ -51,7 +50,8 @@ use subspace_farmer_components::PieceGetter; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; -use tokio::sync::{Barrier, Semaphore}; +use tokio::sync::{Barrier, Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore}; +use tokio::task; use tracing::{error, info, info_span, warn, Instrument}; /// Get piece retry attempts number. @@ -713,7 +713,8 @@ where { let _span_guard = span.enter(); - let mut plotted_pieces = plotted_pieces.write_blocking(); + let mut plotted_pieces = + task::block_in_place(|| plotted_pieces.blocking_write()); if let Some(old_plotted_sector) = &old_plotted_sector { plotted_pieces.delete_sector(farm_index, old_plotted_sector); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs index 8f1ed29ee0..e5ad780b60 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs @@ -1,4 +1,3 @@ -use async_lock::RwLock as AsyncRwLock; use clap::Parser; use prometheus_client::registry::Registry; use std::collections::HashSet; @@ -22,6 +21,7 @@ use subspace_networking::{ SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, }; use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST; +use tokio::sync::RwLock as AsyncRwLock; use tracing::{debug, error, info, Instrument}; /// How many segment headers can be requested at a time. @@ -139,7 +139,8 @@ where let read_piece_fut = match weak_plotted_pieces.upgrade() { Some(plotted_pieces) => plotted_pieces - .try_read()? + .try_read() + .ok()? .read_piece(piece_index)? .in_current_span(), None => { diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs index ddc4730917..2c4856168c 100644 --- a/crates/subspace-farmer/src/cluster/controller.rs +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -14,7 +14,6 @@ use crate::farm::{PieceCacheId, PieceCacheOffset}; use crate::farmer_cache::FarmerCache; use crate::node_client::{Error as NodeClientError, NodeClient}; use anyhow::anyhow; -use async_lock::Semaphore; use async_nats::HeaderValue; use async_trait::async_trait; use futures::{select, FutureExt, Stream, StreamExt}; @@ -29,6 +28,7 @@ use subspace_farmer_components::PieceGetter; use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; +use tokio::sync::Semaphore; use tracing::{debug, trace, warn}; /// Broadcast sent by controllers requesting farmers to identify themselves diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 1d4a8e81d6..6fb19111da 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -13,7 +13,6 @@ use crate::farmer_cache::metrics::FarmerCacheMetrics; use crate::farmer_cache::piece_cache_state::PieceCachesState; use crate::node_client::NodeClient; use crate::utils::run_future_in_dedicated_thread; -use async_lock::RwLock as AsyncRwLock; use event_listener_primitives::{Bag, HandlerId}; use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{select, FutureExt, StreamExt}; @@ -32,7 +31,7 @@ use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::{KeyWithDistance, LocalRecordProvider}; use tokio::runtime::Handle; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock as AsyncRwLock}; use tokio::task::{block_in_place, yield_now}; use tracing::{debug, error, info, trace, warn}; @@ -1208,7 +1207,8 @@ where let distance_key = KeyWithDistance::new(self.peer_id, key.clone()); if self .piece_caches - .try_read()? + .try_read() + .ok()? .contains_stored_piece(&distance_key) { // Note: We store our own provider records locally without local addresses @@ -1225,7 +1225,8 @@ where let found_fut = self .plot_caches .caches - .try_read()? + .try_read() + .ok()? .iter() .map(|plot_cache| { let plot_cache = Arc::clone(plot_cache); diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index 5ced33ac58..20bba024d2 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -3,9 +3,6 @@ use crate::farm::plotted_pieces::PlottedPieces; use crate::farmer_cache::FarmerCache; use crate::node_client::NodeClient; -use async_lock::{ - Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore, -}; use async_trait::async_trait; use backoff::backoff::Backoff; use backoff::future::retry; @@ -22,6 +19,9 @@ use subspace_core_primitives::{Piece, PieceIndex}; use subspace_farmer_components::PieceGetter; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator}; +use tokio::sync::{ + Mutex as AsyncMutex, OwnedMutexGuard as AsyncOwnedMutexGuard, RwLock as AsyncRwLock, Semaphore, +}; use tracing::{debug, error, trace}; pub mod piece_validator; @@ -30,7 +30,7 @@ const MAX_RANDOM_WALK_ROUNDS: usize = 15; struct InProgressPieceGetting<'a> { piece_index: PieceIndex, - in_progress_piece: AsyncMutexGuardArc>, + in_progress_piece: AsyncOwnedMutexGuard>, in_progress_pieces: &'a Mutex>>>>, } @@ -63,8 +63,8 @@ impl<'a> InProgressPiece<'a> { // Take lock before anything else, set to `None` when another piece getting is already in // progress let mut local_in_progress_piece_guard = Some( - in_progress_piece_mutex - .try_lock_arc() + Arc::clone(&in_progress_piece_mutex) + .try_lock_owned() .expect("Just created; qed"), ); let in_progress_piece_mutex = in_progress_pieces @@ -282,6 +282,7 @@ where let maybe_read_piece_fut = inner .plotted_pieces .try_read() + .ok() .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index)); if let Some(read_piece_fut) = maybe_read_piece_fut { diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index 33bf6856fb..b825a9d9bf 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -3,7 +3,6 @@ use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; use crate::utils::AsyncJoinOnDrop; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; use futures::{select, FutureExt, Stream, StreamExt}; use std::pin::Pin; @@ -14,7 +13,7 @@ use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST, }; -use tokio::sync::watch; +use tokio::sync::{watch, Mutex as AsyncMutex, RwLock as AsyncRwLock}; use tokio_stream::wrappers::WatchStream; use tracing::{info, trace, warn}; diff --git a/crates/subspace-farmer/src/node_client/rpc_node_client.rs b/crates/subspace-farmer/src/node_client/rpc_node_client.rs index 7f3946fa71..156e94d8ee 100644 --- a/crates/subspace-farmer/src/node_client/rpc_node_client.rs +++ b/crates/subspace-farmer/src/node_client/rpc_node_client.rs @@ -1,7 +1,6 @@ //! Node client implementation that connects to node via RPC (WebSockets) use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; -use async_lock::Semaphore; use async_trait::async_trait; use futures::{Stream, StreamExt}; use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT}; @@ -13,6 +12,7 @@ use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; +use tokio::sync::Semaphore; /// TODO: Node is having a hard time responding for many piece requests, specifically this results /// in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409 diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index 0d77736cc4..c6245cda93 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -6,7 +6,6 @@ use crate::plotter::cpu::metrics::CpuPlotterMetrics; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; -use async_lock::Mutex as AsyncMutex; use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; @@ -32,7 +31,7 @@ use subspace_farmer_components::plotting::{ }; use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; use subspace_proof_of_space::Table; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{Mutex as AsyncMutex, OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::{warn, Instrument}; @@ -302,7 +301,7 @@ where } // Take mutex briefly to make sure plotting is allowed right now - global_mutex.lock().await; + let _ = global_mutex.lock().await; let downloading_start = Instant::now(); diff --git a/crates/subspace-farmer/src/plotter/gpu.rs b/crates/subspace-farmer/src/plotter/gpu.rs index b090f15e90..1b566bcdae 100644 --- a/crates/subspace-farmer/src/plotter/gpu.rs +++ b/crates/subspace-farmer/src/plotter/gpu.rs @@ -9,7 +9,6 @@ use crate::plotter::gpu::gpu_encoders_manager::GpuRecordsEncoderManager; use crate::plotter::gpu::metrics::GpuPlotterMetrics; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::utils::AsyncJoinOnDrop; -use async_lock::Mutex as AsyncMutex; use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; @@ -32,7 +31,7 @@ use subspace_farmer_components::plotting::{ PlottingError, RecordsEncoder, }; use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{Mutex as AsyncMutex, OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::{warn, Instrument}; @@ -305,7 +304,7 @@ where } // Take mutex briefly to make sure plotting is allowed right now - global_mutex.lock().await; + let _ = global_mutex.lock().await; let downloading_start = Instant::now(); diff --git a/crates/subspace-farmer/src/plotter/gpu/cuda.rs b/crates/subspace-farmer/src/plotter/gpu/cuda.rs index ffdc9aab7d..48bbdbe7f6 100644 --- a/crates/subspace-farmer/src/plotter/gpu/cuda.rs +++ b/crates/subspace-farmer/src/plotter/gpu/cuda.rs @@ -1,7 +1,6 @@ //! CUDA GPU records encoder use crate::plotter::gpu::GpuRecordsEncoder; -use async_lock::Mutex as AsyncMutex; use parking_lot::Mutex; use rayon::{ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -10,6 +9,7 @@ use subspace_core_primitives::{HistorySize, PieceOffset, Record, SectorId}; use subspace_farmer_components::plotting::RecordsEncoder; use subspace_farmer_components::sector::SectorContentsMap; use subspace_proof_of_space_gpu::cuda::CudaDevice; +use tokio::sync::Mutex as AsyncMutex; /// CUDA implementation of [`GpuRecordsEncoder`] #[derive(Debug)] @@ -48,7 +48,7 @@ impl RecordsEncoder for CudaRecordsEncoder { rayon::scope(|scope| { scope.spawn_broadcast(|_scope, _ctx| loop { // Take mutex briefly to make sure encoding is allowed right now - self.global_mutex.lock_blocking(); + let _ = self.global_mutex.blocking_lock(); // This instead of `while` above because otherwise mutex will be held for the // duration of the loop and will limit concurrency to 1 record diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 9d491e6518..64148e5352 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -42,7 +42,6 @@ use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::{farm, KNOWN_PEERS_CACHE_SIZE}; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::{mpsc, oneshot}; @@ -85,7 +84,7 @@ use subspace_proof_of_space::Table; use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse}; use thiserror::Error; use tokio::runtime::Handle; -use tokio::sync::{broadcast, Barrier, Semaphore}; +use tokio::sync::{broadcast, Barrier, Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore}; use tokio::task; use tracing::{debug, error, info, trace, warn, Instrument, Span}; @@ -958,7 +957,7 @@ impl SingleDiskFarm { *registry.lock(), single_disk_farm_info.id(), target_sector_count, - sectors_metadata.read_blocking().len() as SectorIndex, + task::block_in_place(|| sectors_metadata.blocking_read()).len() as SectorIndex, )) }); diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 1da957fe45..f0a2c55b28 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -11,7 +11,6 @@ use crate::farm::{ use crate::node_client::NodeClient; use crate::single_disk_farm::metrics::SingleDiskFarmMetrics; use crate::single_disk_farm::Handlers; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use futures::channel::mpsc; use futures::StreamExt; use parking_lot::Mutex; @@ -31,6 +30,7 @@ use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksumm use subspace_farmer_components::ReadAtSync; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; +use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use tracing::{debug, error, info, trace, warn, Span}; /// How many non-fatal errors should happen in a row before farm is considered non-operational @@ -257,7 +257,7 @@ where let slot = slot_info.slot_number; // Take mutex briefly to make sure farming is allowed right now - global_mutex.lock().await; + let _ = global_mutex.lock().await; let mut problematic_sectors = Vec::new(); let result: Result<(), FarmingError> = try { diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index 33bd304e77..96847f14da 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -3,7 +3,6 @@ use crate::farm::{FarmError, PieceReader}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, StreamExt}; @@ -18,6 +17,7 @@ use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed}; use subspace_farmer_components::{reading, ReadAt, ReadAtAsync, ReadAtSync}; use subspace_proof_of_space::Table; +use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use tracing::{error, warn}; #[derive(Debug)] @@ -194,7 +194,7 @@ async fn read_pieces( let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64); // Take mutex briefly to make sure piece reading is allowed right now - global_mutex.lock().await; + let _ = global_mutex.lock().await; let maybe_piece = read_piece::( &public_key, diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs index 5d3d9e33c6..9462e1f31c 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs @@ -7,7 +7,6 @@ use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::utils::AsyncJoinOnDrop; -use async_lock::RwLock as AsyncRwLock; use async_trait::async_trait; use bytes::BytesMut; use parking_lot::RwLock; @@ -23,6 +22,7 @@ use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::utils::multihash::ToMultihash; use thiserror::Error; +use tokio::sync::RwLock as AsyncRwLock; use tokio::task; use tracing::{debug, info, warn}; @@ -92,7 +92,7 @@ impl DiskPlotCache { ) -> Self { info!("Checking plot cache contents, this can take a while"); let cached_pieces = { - let sectors_metadata = sectors_metadata.read_blocking(); + let sectors_metadata = task::block_in_place(|| sectors_metadata.blocking_read()); let mut element = vec![0; Self::element_size() as usize]; // Clippy complains about `RecordKey`, but it is not changing here, so it is fine #[allow(clippy::mutable_key_type)] @@ -170,7 +170,8 @@ impl DiskPlotCache { let element_offset = u64::from(offset) * u64::from(Self::element_size()); // Blocking read is fine because writes in farmer are very rare and very brief - let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64; + let plotted_bytes = self.sector_size + * task::block_in_place(|| sectors_metadata.blocking_read()).len() as u64; // Make sure offset is after anything that is already plotted if element_offset < plotted_bytes { diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs index 00b81c7868..f793d17ed5 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -92,7 +92,7 @@ async fn basic() { ); // Can't store pieces when all sectors are plotted - sectors_metadata.write_blocking().resize( + sectors_metadata.write().await.resize( usize::from(TARGET_SECTOR_COUNT), dummy_sector_metadata.clone(), ); @@ -106,7 +106,7 @@ async fn basic() { ); // Clear plotted sectors and reopen - sectors_metadata.write_blocking().clear(); + sectors_metadata.write().await.clear(); let disk_plot_cache = DiskPlotCache::new( &file, §ors_metadata, @@ -177,7 +177,8 @@ async fn basic() { MaybePieceStoredResult::Yes ); sectors_metadata - .write_blocking() + .write() + .await .resize(usize::from(TARGET_SECTOR_COUNT - 1), dummy_sector_metadata); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_1), diff --git a/crates/subspace-farmer/src/single_disk_farm/plotted_sectors.rs b/crates/subspace-farmer/src/single_disk_farm/plotted_sectors.rs index 618fb7f89c..6fdd97dc90 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotted_sectors.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotted_sectors.rs @@ -1,5 +1,4 @@ use crate::farm::{FarmError, PlottedSectors}; -use async_lock::RwLock as AsyncRwLock; use async_trait::async_trait; use futures::{stream, Stream}; use std::sync::Arc; @@ -7,6 +6,7 @@ use subspace_core_primitives::{PieceOffset, PublicKey, SectorId}; use subspace_farmer_components::plotting::PlottedSector; use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_farmer_components::FarmerProtocolInfo; +use tokio::sync::RwLock as AsyncRwLock; /// Getter for single disk plotted sectors #[derive(Debug)] diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 1e0f7d4615..23a93080d6 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -7,7 +7,6 @@ use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA, }; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use futures::channel::{mpsc, oneshot}; use futures::stream::FuturesOrdered; use futures::{select, FutureExt, SinkExt, StreamExt}; @@ -29,7 +28,7 @@ use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::plotting::PlottedSector; use subspace_farmer_components::sector::SectorMetadataChecksummed; use thiserror::Error; -use tokio::sync::watch; +use tokio::sync::{watch, Mutex as AsyncMutex, RwLock as AsyncRwLock}; use tokio::task; use tracing::{debug, info, info_span, trace, warn, Instrument}; @@ -588,7 +587,7 @@ async fn plot_single_sector_internal( { // Take mutex briefly to make sure writing is allowed right now - global_mutex.lock().await; + let _ = global_mutex.lock().await; if let Some(metrics) = metrics { metrics.sector_writing.inc();