From 10923f130c3b348146b5ea1c6b5975188489baa2 Mon Sep 17 00:00:00 2001 From: Sebastian Messmer Date: Fri, 2 Dec 2022 19:55:05 -0800 Subject: [PATCH] Also add lock_all_entries to trait [wip] this part currently makes the compiler ICE --- src/lockable_hash_map.rs | 112 +++----------------------- src/lockable_lru_cache.rs | 112 +++----------------------- src/lockable_trait.rs | 164 ++++++++++++++++++++++++++++++++++++++ src/lockpool.rs | 17 ++++ 4 files changed, 205 insertions(+), 200 deletions(-) diff --git a/src/lockable_hash_map.rs b/src/lockable_hash_map.rs index e6e2a70..dffe691 100644 --- a/src/lockable_hash_map.rs +++ b/src/lockable_hash_map.rs @@ -331,6 +331,18 @@ where fn keys_with_entries_or_locked(&self) -> Vec { self.map_impl.keys_with_entries_or_locked() } + + #[inline] + async fn lock_all_entries(&self) -> impl Stream>::Guard<'_>> { + LockableMapImpl::lock_all_entries(&self.map_impl).await + } + + #[inline] + async fn lock_all_entries_owned( + self: &Arc, + ) -> impl Stream>::OwnedGuard> { + LockableMapImpl::lock_all_entries(Arc::clone(self)).await + } } impl LockableHashMap @@ -355,106 +367,6 @@ where map_impl: LockableMapImpl::new(), } } - - /// Lock all entries of the cache once. The result of this is a [Stream] that will - /// produce the corresponding lock guards. If items are locked, the [Stream] will - /// produce them as they become unlocked and can be locked by the stream. - /// - /// The returned stream is `async` and therefore may return items much later than - /// when this function was called, but it only returns an entry if it existed - /// or was locked at the time this function was called, and still exists when - /// the stream is returning the entry. - /// For any entry currently locked by another thread or task while this function - /// is called, the following rules apply: - /// - If that thread/task creates the entry => the stream will return it - /// - If that thread/task removes the entry => the stream will not return it - /// - If the entry was not pre-existing and that thread/task does not create it => the stream will not return it. - /// - /// Examples - /// ----- - /// ``` - /// use futures::stream::StreamExt; - /// use lockable::{AsyncLimit, Lockable, LockableHashMap}; - /// - /// # tokio::runtime::Runtime::new().unwrap().block_on(async { - /// let lockable_map = LockableHashMap::::new(); - /// - /// // Insert two entries - /// lockable_map - /// .async_lock(4, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 4")); - /// lockable_map - /// .async_lock(5, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 5")); - /// - /// // Lock all entries and add them to an `entries` vector - /// let mut entries: Vec<(i64, String)> = Vec::new(); - /// let mut stream = lockable_map.lock_all_entries().await; - /// while let Some(guard) = stream.next().await { - /// entries.push((*guard.key(), guard.value().unwrap().clone())); - /// } - /// - /// // `entries` now contains both entries, but in an arbitrary order - /// assert_eq!(2, entries.len()); - /// assert!(entries.contains(&(4, String::from("Value 4")))); - /// assert!(entries.contains(&(5, String::from("Value 5")))); - /// # Ok::<(), lockable::Never>(())}).unwrap(); - /// ``` - pub async fn lock_all_entries( - &self, - ) -> impl Stream>::Guard<'_>> { - LockableMapImpl::lock_all_entries(&self.map_impl).await - } - - /// Lock all entries of the cache once. The result of this is a [Stream] that will - /// produce the corresponding lock guards. If items are locked, the [Stream] will - /// produce them as they become unlocked and can be locked by the stream. - /// - /// This is identical to [LockableHashMap::lock_all_entries], but but it works on - /// an `Arc` instead of a [LockableHashMap] and returns a - /// [Lockable::OwnedGuard] that binds its lifetime to the [LockableHashMap] in that - /// [Arc]. Such a [Lockable::OwnedGuard] can be more easily moved around or cloned. - /// - /// Examples - /// ----- - /// ``` - /// use futures::stream::StreamExt; - /// use lockable::{AsyncLimit, Lockable, LockableHashMap}; - /// use std::sync::Arc; - /// - /// # tokio::runtime::Runtime::new().unwrap().block_on(async { - /// let lockable_map = Arc::new(LockableHashMap::::new()); - /// - /// // Insert two entries - /// lockable_map - /// .async_lock(4, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 4")); - /// lockable_map - /// .async_lock(5, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 5")); - /// - /// // Lock all entries and add them to an `entries` vector - /// let mut entries: Vec<(i64, String)> = Vec::new(); - /// let mut stream = lockable_map.lock_all_entries_owned().await; - /// while let Some(guard) = stream.next().await { - /// entries.push((*guard.key(), guard.value().unwrap().clone())); - /// } - /// - /// // `entries` now contains both entries, but in an arbitrary order - /// assert_eq!(2, entries.len()); - /// assert!(entries.contains(&(4, String::from("Value 4")))); - /// assert!(entries.contains(&(5, String::from("Value 5")))); - /// # Ok::<(), lockable::Never>(())}).unwrap(); - /// ``` - pub async fn lock_all_entries_owned( - self: &Arc, - ) -> impl Stream>::OwnedGuard> { - LockableMapImpl::lock_all_entries(Arc::clone(self)).await - } } impl Default for LockableHashMap diff --git a/src/lockable_lru_cache.rs b/src/lockable_lru_cache.rs index d934d36..6c43d33 100644 --- a/src/lockable_lru_cache.rs +++ b/src/lockable_lru_cache.rs @@ -402,6 +402,18 @@ where fn keys_with_entries_or_locked(&self) -> Vec { self.map_impl.keys_with_entries_or_locked() } + + #[inline] + async fn lock_all_entries(&self) -> impl Stream>::Guard<'_>> { + LockableMapImpl::lock_all_entries(&self.map_impl).await + } + + #[inline] + async fn lock_all_entries_owned( + self: &Arc, + ) -> impl Stream>::OwnedGuard> { + LockableMapImpl::lock_all_entries(Arc::clone(self)).await + } } impl LockableLruCache @@ -429,106 +441,6 @@ where } } - /// Lock all entries of the cache once. The result of this is a [Stream] that will - /// produce the corresponding lock guards. If items are locked, the [Stream] will - /// produce them as they become unlocked and can be locked by the stream. - /// - /// The returned stream is `async` and therefore may return items much later than - /// when this function was called, but it only returns an entry if it existed - /// or was locked at the time this function was called, and still exists when - /// the stream is returning the entry. - /// For any entry currently locked by another thread or task while this function - /// is called, the following rules apply: - /// - If that thread/task creates the entry => the stream will return it - /// - If that thread/task removes the entry => the stream will not return it - /// - If the entry was not pre-existing and that thread/task does not create it => the stream will not return it. - /// - /// Examples - /// ----- - /// ``` - /// use futures::stream::StreamExt; - /// use lockable::{AsyncLimit, Lockable, LockableLruCache}; - /// - /// # tokio::runtime::Runtime::new().unwrap().block_on(async { - /// let lockable_map = LockableLruCache::::new(); - /// - /// // Insert two entries - /// lockable_map - /// .async_lock(4, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 4")); - /// lockable_map - /// .async_lock(5, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 5")); - /// - /// // Lock all entries and add them to an `entries` vector - /// let mut entries: Vec<(i64, String)> = Vec::new(); - /// let mut stream = lockable_map.lock_all_entries().await; - /// while let Some(guard) = stream.next().await { - /// entries.push((*guard.key(), guard.value().unwrap().clone())); - /// } - /// - /// // `entries` now contains both entries, but in an arbitrary order - /// assert_eq!(2, entries.len()); - /// assert!(entries.contains(&(4, String::from("Value 4")))); - /// assert!(entries.contains(&(5, String::from("Value 5")))); - /// # Ok::<(), lockable::Never>(())}).unwrap(); - /// ``` - pub async fn lock_all_entries( - &self, - ) -> impl Stream>::Guard<'_>> { - LockableMapImpl::lock_all_entries(&self.map_impl).await - } - - /// Lock all entries of the cache once. The result of this is a [Stream] that will - /// produce the corresponding lock guards. If items are locked, the [Stream] will - /// produce them as they become unlocked and can be locked by the stream. - /// - /// This is identical to [LockableLruCache::lock_all_entries], but it works on - /// an `Arc` instead of a [LockableLruCache] and returns a - /// [Lockable::OwnedGuard] that binds its lifetime to the [LockableLruCache] in that - /// [Arc]. Such a [Lockable::OwnedGuard] can be more easily moved around or cloned. - /// - /// Examples - /// ----- - /// ``` - /// use futures::stream::StreamExt; - /// use lockable::{AsyncLimit, Lockable, LockableLruCache}; - /// use std::sync::Arc; - /// - /// # tokio::runtime::Runtime::new().unwrap().block_on(async { - /// let lockable_map = Arc::new(LockableLruCache::::new()); - /// - /// // Insert two entries - /// lockable_map - /// .async_lock(4, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 4")); - /// lockable_map - /// .async_lock(5, AsyncLimit::no_limit()) - /// .await? - /// .insert(String::from("Value 5")); - /// - /// // Lock all entries and add them to an `entries` vector - /// let mut entries: Vec<(i64, String)> = Vec::new(); - /// let mut stream = lockable_map.lock_all_entries_owned().await; - /// while let Some(guard) = stream.next().await { - /// entries.push((*guard.key(), guard.value().unwrap().clone())); - /// } - /// - /// // `entries` now contains both entries, but in an arbitrary order - /// assert_eq!(2, entries.len()); - /// assert!(entries.contains(&(4, String::from("Value 4")))); - /// assert!(entries.contains(&(5, String::from("Value 5")))); - /// # Ok::<(), lockable::Never>(())}).unwrap(); - /// ``` - pub async fn lock_all_entries_owned( - self: &Arc, - ) -> impl Stream>::OwnedGuard> { - LockableMapImpl::lock_all_entries(Arc::clone(self)).await - } - /// Lock all entries that are currently unlocked and that were unlocked for at least /// the given `duration`. This follows the LRU nature of the cache. /// diff --git a/src/lockable_trait.rs b/src/lockable_trait.rs index a5a77ba..13b2f49 100644 --- a/src/lockable_trait.rs +++ b/src/lockable_trait.rs @@ -1,3 +1,4 @@ +use futures::stream::Stream; use std::future::Future; use std::sync::Arc; @@ -781,4 +782,167 @@ pub trait Lockable { /// # Ok::<(), lockable::Never>(())}).unwrap(); /// ``` fn keys_with_entries_or_locked(&self) -> Vec; + + /// Lock all entries of the cache once. The result of this is a [Stream] that will + /// produce the corresponding lock guards. If items are locked, the [Stream] will + /// produce them as they become unlocked and can be locked by the stream. + /// + /// The returned stream is `async` and therefore may return items much later than + /// when this function was called, but it only returns an entry if it existed + /// or was locked at the time this function was called, and still exists when + /// the stream is returning the entry. + /// For any entry currently locked by another thread or task while this function + /// is called, the following rules apply: + /// - If that thread/task creates the entry => the stream will return it + /// - If that thread/task removes the entry => the stream will not return it + /// - If the entry was not pre-existing and that thread/task does not create it => the stream will not return it. + /// + /// Example (LockableHashMap) + /// ----- + /// ``` + /// use futures::stream::StreamExt; + /// use lockable::{AsyncLimit, Lockable, LockableHashMap}; + /// + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { + /// let lockable_map = LockableHashMap::::new(); + /// + /// // Insert two entries + /// lockable_map + /// .async_lock(4, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 4")); + /// lockable_map + /// .async_lock(5, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 5")); + /// + /// // Lock all entries and add them to an `entries` vector + /// let mut entries: Vec<(i64, String)> = Vec::new(); + /// let mut stream = lockable_map.lock_all_entries().await; + /// while let Some(guard) = stream.next().await { + /// entries.push((*guard.key(), guard.value().unwrap().clone())); + /// } + /// + /// // `entries` now contains both entries, but in an arbitrary order + /// assert_eq!(2, entries.len()); + /// assert!(entries.contains(&(4, String::from("Value 4")))); + /// assert!(entries.contains(&(5, String::from("Value 5")))); + /// # Ok::<(), lockable::Never>(())}).unwrap(); + /// ``` + /// + /// Example (LockableLruCache) + /// ----- + /// ``` + #[cfg_attr(not(feature = "lru"), doc = "```\n```ignore")] + /// use futures::stream::StreamExt; + /// use lockable::{AsyncLimit, Lockable, LockableLruCache}; + /// + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { + /// let lockable_map = LockableLruCache::::new(); + /// + /// // Insert two entries + /// lockable_map + /// .async_lock(4, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 4")); + /// lockable_map + /// .async_lock(5, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 5")); + /// + /// // Lock all entries and add them to an `entries` vector + /// let mut entries: Vec<(i64, String)> = Vec::new(); + /// let mut stream = lockable_map.lock_all_entries().await; + /// while let Some(guard) = stream.next().await { + /// entries.push((*guard.key(), guard.value().unwrap().clone())); + /// } + /// + /// // `entries` now contains both entries, but in an arbitrary order + /// assert_eq!(2, entries.len()); + /// assert!(entries.contains(&(4, String::from("Value 4")))); + /// assert!(entries.contains(&(5, String::from("Value 5")))); + /// # Ok::<(), lockable::Never>(())}).unwrap(); + /// ``` + async fn lock_all_entries(&self) -> impl Stream>::Guard<'_>>; + + /// Lock all entries of the cache once. The result of this is a [Stream] that will + /// produce the corresponding lock guards. If items are locked, the [Stream] will + /// produce them as they become unlocked and can be locked by the stream. + /// + /// This is identical to [Lockable::lock_all_entries], but but it works on + /// an `Arc` instead of a [Lockable] and returns a + /// [Lockable::OwnedGuard] that binds its lifetime to the [Lockable] in that + /// [Arc]. Such a [Lockable::OwnedGuard] can be more easily moved around or cloned. + /// + /// Example (LockableHashMap) + /// ----- + /// ``` + /// use futures::stream::StreamExt; + /// use lockable::{AsyncLimit, Lockable, LockableHashMap}; + /// use std::sync::Arc; + /// + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { + /// let lockable_map = Arc::new(LockableHashMap::::new()); + /// + /// // Insert two entries + /// lockable_map + /// .async_lock(4, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 4")); + /// lockable_map + /// .async_lock(5, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 5")); + /// + /// // Lock all entries and add them to an `entries` vector + /// let mut entries: Vec<(i64, String)> = Vec::new(); + /// let mut stream = lockable_map.lock_all_entries_owned().await; + /// while let Some(guard) = stream.next().await { + /// entries.push((*guard.key(), guard.value().unwrap().clone())); + /// } + /// + /// // `entries` now contains both entries, but in an arbitrary order + /// assert_eq!(2, entries.len()); + /// assert!(entries.contains(&(4, String::from("Value 4")))); + /// assert!(entries.contains(&(5, String::from("Value 5")))); + /// # Ok::<(), lockable::Never>(())}).unwrap(); + /// ``` + /// + /// Example (LockableLruCache) + /// ----- + /// ``` + #[cfg_attr(not(feature = "lru"), doc = "```\n```ignore")] + /// use futures::stream::StreamExt; + /// use lockable::{AsyncLimit, Lockable, LockableLruCache}; + /// use std::sync::Arc; + /// + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { + /// let lockable_map = Arc::new(LockableLruCache::::new()); + /// + /// // Insert two entries + /// lockable_map + /// .async_lock(4, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 4")); + /// lockable_map + /// .async_lock(5, AsyncLimit::no_limit()) + /// .await? + /// .insert(String::from("Value 5")); + /// + /// // Lock all entries and add them to an `entries` vector + /// let mut entries: Vec<(i64, String)> = Vec::new(); + /// let mut stream = lockable_map.lock_all_entries_owned().await; + /// while let Some(guard) = stream.next().await { + /// entries.push((*guard.key(), guard.value().unwrap().clone())); + /// } + /// + /// // `entries` now contains both entries, but in an arbitrary order + /// assert_eq!(2, entries.len()); + /// assert!(entries.contains(&(4, String::from("Value 4")))); + /// assert!(entries.contains(&(5, String::from("Value 5")))); + /// # Ok::<(), lockable::Never>(())}).unwrap(); + /// ``` + async fn lock_all_entries_owned( + self: &Arc, + ) -> impl Stream>::OwnedGuard>; } diff --git a/src/lockpool.rs b/src/lockpool.rs index 59aaf40..dd01275 100644 --- a/src/lockpool.rs +++ b/src/lockpool.rs @@ -1,3 +1,4 @@ +use futures::stream::Stream; use std::future::Future; use std::hash::Hash; use std::sync::Arc; @@ -198,6 +199,22 @@ where fn keys_with_entries_or_locked(&self) -> Vec { self.map.keys_with_entries_or_locked() } + + #[inline] + async fn lock_all_entries(&self) -> impl Stream>::Guard<'_>> { + panic!("lock_all_entries() doesn't make sense for LockPool"); + // Return expression needed to infer the return type + futures::stream::empty() + } + + #[inline] + async fn lock_all_entries_owned( + self: &Arc, + ) -> impl Stream>::OwnedGuard> { + panic!("lock_all_entries_owned() doesn't make sense for LockPool"); + // Return expression needed to infer the return type + futures::stream::empty() + } } impl LockPool