Skip to content

Commit

Permalink
Switch async RwLock from async-lock to tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Sep 13, 2024
1 parent 492362a commit 93f131d
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 35 deletions.
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ supports-color = { version = "3.0.0", optional = true }
tempfile = "3.12.0"
thiserror = "1.0.63"
thread-priority = "1.1.0"
tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex};
use crate::commands::shared::derive_libp2p_keypair;
use crate::commands::shared::network::{configure_network, NetworkArgs};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use backoff::ExponentialBackoff;
use clap::{Parser, ValueHint};
use futures::stream::FuturesUnordered;
Expand All @@ -31,6 +30,7 @@ use subspace_farmer::node_client::NodeClient;
use subspace_farmer::single_disk_farm::identity::Identity;
use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use subspace_networking::utils::piece_provider::PieceProvider;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::info;

/// Get piece retry attempts number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
Expand All @@ -25,6 +24,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::task;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -178,7 +178,9 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
// TODO: switch to async-lock's `write_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
futures::executor::block_on(plotted_pieces.write()).delete_farm(farm_index);
// plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -235,7 +237,9 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
// TODO: switch to async-lock's `write_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
futures::executor::block_on(plotted_pieces.write()).delete_farm(farm_index);
// plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -322,7 +326,9 @@ fn process_farm_identify_message<'a>(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
// TODO: switch to async-lock's `write_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
futures::executor::block_on(plotted_pieces.write()).delete_farm(farm_index);
// plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -434,7 +440,9 @@ async fn initialize_farm(
drop(sector_update_handler);
let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
let add_buffered_sectors_fut = task::spawn_blocking(move || {
let mut plotted_pieces = plotted_pieces.write_blocking();
// TODO: switch to async-lock's `write_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
let mut plotted_pieces = futures::executor::block_on(plotted_pieces.write());
// let mut plotted_pieces = plotted_pieces.write_blocking();

for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
if let Some(old_plotted_sector) = old_plotted_sector {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::commands::shared::network::{configure_network, NetworkArgs};
use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_lock::Mutex as AsyncMutex;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
Expand Down Expand Up @@ -51,7 +51,7 @@ use subspace_farmer_components::PieceGetter;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use tokio::sync::{Barrier, Semaphore};
use tokio::sync::{Barrier, RwLock as AsyncRwLock, Semaphore};
use tracing::{error, info, info_span, warn, Instrument};

/// Get piece retry attempts number.
Expand Down Expand Up @@ -713,7 +713,9 @@ where
{
let _span_guard = span.enter();

let mut plotted_pieces = plotted_pieces.write_blocking();
// TODO: switch to async-lock's `write_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
let mut plotted_pieces = futures::executor::block_on(plotted_pieces.write());
// let mut plotted_pieces = plotted_pieces.write_blocking();

if let Some(old_plotted_sector) = &old_plotted_sector {
plotted_pieces.delete_sector(farm_index, old_plotted_sector);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_lock::RwLock as AsyncRwLock;
use clap::Parser;
use prometheus_client::registry::Registry;
use std::collections::HashSet;
Expand All @@ -22,6 +21,7 @@ use subspace_networking::{
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
};
use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, Instrument};

/// How many segment headers can be requested at a time.
Expand Down Expand Up @@ -139,7 +139,8 @@ where

let read_piece_fut = match weak_plotted_pieces.upgrade() {
Some(plotted_pieces) => plotted_pieces
.try_read()?
.try_read()
.ok()?
.read_piece(piece_index)?
.in_current_span(),
None => {
Expand Down
9 changes: 5 additions & 4 deletions crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
Expand All @@ -32,7 +31,7 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -1208,7 +1207,8 @@ where
let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
if self
.piece_caches
.try_read()?
.try_read()
.ok()?
.contains_stored_piece(&distance_key)
{
// Note: We store our own provider records locally without local addresses
Expand All @@ -1225,7 +1225,8 @@ where
let found_fut = self
.plot_caches
.caches
.try_read()?
.try_read()
.ok()?
.iter()
.map(|plot_cache| {
let plot_cache = Arc::clone(plot_cache);
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use async_lock::{
Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
};
use async_lock::{Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, Semaphore};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
Expand All @@ -22,6 +20,7 @@ use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::PieceGetter;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, trace};

pub mod piece_validator;
Expand Down Expand Up @@ -282,6 +281,7 @@ where
let maybe_read_piece_fut = inner
.plotted_pieces
.try_read()
.ok()
.and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));

if let Some(read_piece_fut) = maybe_read_piece_fut {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use crate::utils::AsyncJoinOnDrop;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
use std::pin::Pin;
Expand All @@ -14,7 +14,7 @@ use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tokio::sync::watch;
use tokio::sync::{watch, RwLock as AsyncRwLock};
use tokio_stream::wrappers::WatchStream;
use tracing::{info, trace, warn};

Expand Down
8 changes: 5 additions & 3 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
use crate::{farm, KNOWN_PEERS_CACHE_SIZE};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::{mpsc, oneshot};
Expand Down Expand Up @@ -85,7 +85,7 @@ use subspace_proof_of_space::Table;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, Barrier, Semaphore};
use tokio::sync::{broadcast, Barrier, RwLock as AsyncRwLock, Semaphore};
use tokio::task;
use tracing::{debug, error, info, trace, warn, Instrument, Span};

Expand Down Expand Up @@ -958,7 +958,9 @@ impl SingleDiskFarm {
*registry.lock(),
single_disk_farm_info.id(),
target_sector_count,
sectors_metadata.read_blocking().len() as SectorIndex,
// TODO: switch to async-lock's `read_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
futures::executor::block_on(sectors_metadata.read()).len() as SectorIndex,
// sectors_metadata.read_blocking().len() as SectorIndex,
))
});

Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::farm::{
use crate::node_client::NodeClient;
use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
use crate::single_disk_farm::Handlers;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_lock::Mutex as AsyncMutex;
use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::Mutex;
Expand All @@ -31,6 +31,7 @@ use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksumm
use subspace_farmer_components::ReadAtSync;
use subspace_proof_of_space::{Table, TableGenerator};
use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, trace, warn, Span};

/// How many non-fatal errors should happen in a row before farm is considered non-operational
Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm/piece_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::farm::{FarmError, PieceReader};
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
Expand All @@ -18,6 +18,7 @@ use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{sector_size, SectorMetadataChecksummed};
use subspace_farmer_components::{reading, ReadAt, ReadAtAsync, ReadAtSync};
use subspace_proof_of_space::Table;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{error, warn};

#[derive(Debug)]
Expand Down
11 changes: 8 additions & 3 deletions crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::utils::AsyncJoinOnDrop;
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use parking_lot::RwLock;
Expand All @@ -23,6 +22,7 @@ use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use thiserror::Error;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::task;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -92,7 +92,9 @@ impl DiskPlotCache {
) -> Self {
info!("Checking plot cache contents, this can take a while");
let cached_pieces = {
let sectors_metadata = sectors_metadata.read_blocking();
// TODO: switch to async-lock's `read_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
let sectors_metadata = futures::executor::block_on(sectors_metadata.read());
// let sectors_metadata = sectors_metadata.read_blocking();
let mut element = vec![0; Self::element_size() as usize];
// Clippy complains about `RecordKey`, but it is not changing here, so it is fine
#[allow(clippy::mutable_key_type)]
Expand Down Expand Up @@ -170,7 +172,10 @@ impl DiskPlotCache {

let element_offset = u64::from(offset) * u64::from(Self::element_size());
// Blocking read is fine because writes in farmer are very rare and very brief
let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;
// TODO: switch to async-lock's `read_blocking` once https://github.com/smol-rs/async-lock/issues/91 is fixed
let plotted_bytes =
self.sector_size * futures::executor::block_on(sectors_metadata.read()).len() as u64;
// let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;

// Make sure offset is after anything that is already plotted
if element_offset < plotted_bytes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn basic() {
);

// Can't store pieces when all sectors are plotted
sectors_metadata.write_blocking().resize(
sectors_metadata.write().await.resize(
usize::from(TARGET_SECTOR_COUNT),
dummy_sector_metadata.clone(),
);
Expand All @@ -106,7 +106,7 @@ async fn basic() {
);

// Clear plotted sectors and reopen
sectors_metadata.write_blocking().clear();
sectors_metadata.write().await.clear();
let disk_plot_cache = DiskPlotCache::new(
&file,
&sectors_metadata,
Expand Down Expand Up @@ -177,7 +177,8 @@ async fn basic() {
MaybePieceStoredResult::Yes
);
sectors_metadata
.write_blocking()
.write()
.await
.resize(usize::from(TARGET_SECTOR_COUNT - 1), dummy_sector_metadata);
assert_matches!(
disk_plot_cache.is_piece_maybe_stored(&record_key_1),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::farm::{FarmError, PlottedSectors};
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use futures::{stream, Stream};
use std::sync::Arc;
use subspace_core_primitives::{PieceOffset, PublicKey, SectorId};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_farmer_components::FarmerProtocolInfo;
use tokio::sync::RwLock as AsyncRwLock;

/// Getter for single disk plotted sectors
#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows
use crate::single_disk_farm::{
BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_lock::Mutex as AsyncMutex;
use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesOrdered;
use futures::{select, FutureExt, SinkExt, StreamExt};
Expand All @@ -29,7 +29,7 @@ use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use thiserror::Error;
use tokio::sync::watch;
use tokio::sync::{watch, RwLock as AsyncRwLock};
use tokio::task;
use tracing::{debug, info, info_span, trace, warn, Instrument};

Expand Down

0 comments on commit 93f131d

Please sign in to comment.