Skip to content

Commit

Permalink
Also add lock_all_entries to trait
Browse files Browse the repository at this point in the history
[wip] this part currently makes the compiler ICE
  • Loading branch information
smessmer committed Dec 3, 2022
1 parent 503eecf commit 10923f1
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 200 deletions.
112 changes: 12 additions & 100 deletions src/lockable_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,18 @@ where
fn keys_with_entries_or_locked(&self) -> Vec<K> {
self.map_impl.keys_with_entries_or_locked()
}

#[inline]
async fn lock_all_entries(&self) -> impl Stream<Item = <Self as Lockable<K, V>>::Guard<'_>> {
LockableMapImpl::lock_all_entries(&self.map_impl).await
}

#[inline]
async fn lock_all_entries_owned(
self: &Arc<Self>,
) -> impl Stream<Item = <Self as Lockable<K, V>>::OwnedGuard> {
LockableMapImpl::lock_all_entries(Arc::clone(self)).await
}
}

impl<K, V> LockableHashMap<K, V>
Expand All @@ -355,106 +367,6 @@ where
map_impl: LockableMapImpl::new(),
}
}

/// Lock all entries of the cache once. The result of this is a [Stream] that will
/// produce the corresponding lock guards. If items are locked, the [Stream] will
/// produce them as they become unlocked and can be locked by the stream.
///
/// The returned stream is `async` and therefore may return items much later than
/// when this function was called, but it only returns an entry if it existed
/// or was locked at the time this function was called, and still exists when
/// the stream is returning the entry.
/// For any entry currently locked by another thread or task while this function
/// is called, the following rules apply:
/// - If that thread/task creates the entry => the stream will return it
/// - If that thread/task removes the entry => the stream will not return it
/// - If the entry was not pre-existing and that thread/task does not create it => the stream will not return it.
///
/// Examples
/// -----
/// ```
/// use futures::stream::StreamExt;
/// use lockable::{AsyncLimit, Lockable, LockableHashMap};
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let lockable_map = LockableHashMap::<i64, String>::new();
///
/// // Insert two entries
/// lockable_map
/// .async_lock(4, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 4"));
/// lockable_map
/// .async_lock(5, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 5"));
///
/// // Lock all entries and add them to an `entries` vector
/// let mut entries: Vec<(i64, String)> = Vec::new();
/// let mut stream = lockable_map.lock_all_entries().await;
/// while let Some(guard) = stream.next().await {
/// entries.push((*guard.key(), guard.value().unwrap().clone()));
/// }
///
/// // `entries` now contains both entries, but in an arbitrary order
/// assert_eq!(2, entries.len());
/// assert!(entries.contains(&(4, String::from("Value 4"))));
/// assert!(entries.contains(&(5, String::from("Value 5"))));
/// # Ok::<(), lockable::Never>(())}).unwrap();
/// ```
pub async fn lock_all_entries(
&self,
) -> impl Stream<Item = <Self as Lockable<K, V>>::Guard<'_>> {
LockableMapImpl::lock_all_entries(&self.map_impl).await
}

/// Lock all entries of the cache once. The result of this is a [Stream] that will
/// produce the corresponding lock guards. If items are locked, the [Stream] will
/// produce them as they become unlocked and can be locked by the stream.
///
/// This is identical to [LockableHashMap::lock_all_entries], but but it works on
/// an `Arc<LockableHashMap>` instead of a [LockableHashMap] and returns a
/// [Lockable::OwnedGuard] that binds its lifetime to the [LockableHashMap] in that
/// [Arc]. Such a [Lockable::OwnedGuard] can be more easily moved around or cloned.
///
/// Examples
/// -----
/// ```
/// use futures::stream::StreamExt;
/// use lockable::{AsyncLimit, Lockable, LockableHashMap};
/// use std::sync::Arc;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let lockable_map = Arc::new(LockableHashMap::<i64, String>::new());
///
/// // Insert two entries
/// lockable_map
/// .async_lock(4, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 4"));
/// lockable_map
/// .async_lock(5, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 5"));
///
/// // Lock all entries and add them to an `entries` vector
/// let mut entries: Vec<(i64, String)> = Vec::new();
/// let mut stream = lockable_map.lock_all_entries_owned().await;
/// while let Some(guard) = stream.next().await {
/// entries.push((*guard.key(), guard.value().unwrap().clone()));
/// }
///
/// // `entries` now contains both entries, but in an arbitrary order
/// assert_eq!(2, entries.len());
/// assert!(entries.contains(&(4, String::from("Value 4"))));
/// assert!(entries.contains(&(5, String::from("Value 5"))));
/// # Ok::<(), lockable::Never>(())}).unwrap();
/// ```
pub async fn lock_all_entries_owned(
self: &Arc<Self>,
) -> impl Stream<Item = <Self as Lockable<K, V>>::OwnedGuard> {
LockableMapImpl::lock_all_entries(Arc::clone(self)).await
}
}

impl<K, V> Default for LockableHashMap<K, V>
Expand Down
112 changes: 12 additions & 100 deletions src/lockable_lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,18 @@ where
fn keys_with_entries_or_locked(&self) -> Vec<K> {
self.map_impl.keys_with_entries_or_locked()
}

#[inline]
async fn lock_all_entries(&self) -> impl Stream<Item = <Self as Lockable<K, V>>::Guard<'_>> {
LockableMapImpl::lock_all_entries(&self.map_impl).await
}

#[inline]
async fn lock_all_entries_owned(
self: &Arc<Self>,
) -> impl Stream<Item = <Self as Lockable<K, V>>::OwnedGuard> {
LockableMapImpl::lock_all_entries(Arc::clone(self)).await
}
}

impl<K, V, Time> LockableLruCache<K, V, Time>
Expand Down Expand Up @@ -429,106 +441,6 @@ where
}
}

/// Lock all entries of the cache once. The result of this is a [Stream] that will
/// produce the corresponding lock guards. If items are locked, the [Stream] will
/// produce them as they become unlocked and can be locked by the stream.
///
/// The returned stream is `async` and therefore may return items much later than
/// when this function was called, but it only returns an entry if it existed
/// or was locked at the time this function was called, and still exists when
/// the stream is returning the entry.
/// For any entry currently locked by another thread or task while this function
/// is called, the following rules apply:
/// - If that thread/task creates the entry => the stream will return it
/// - If that thread/task removes the entry => the stream will not return it
/// - If the entry was not pre-existing and that thread/task does not create it => the stream will not return it.
///
/// Examples
/// -----
/// ```
/// use futures::stream::StreamExt;
/// use lockable::{AsyncLimit, Lockable, LockableLruCache};
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let lockable_map = LockableLruCache::<i64, String>::new();
///
/// // Insert two entries
/// lockable_map
/// .async_lock(4, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 4"));
/// lockable_map
/// .async_lock(5, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 5"));
///
/// // Lock all entries and add them to an `entries` vector
/// let mut entries: Vec<(i64, String)> = Vec::new();
/// let mut stream = lockable_map.lock_all_entries().await;
/// while let Some(guard) = stream.next().await {
/// entries.push((*guard.key(), guard.value().unwrap().clone()));
/// }
///
/// // `entries` now contains both entries, but in an arbitrary order
/// assert_eq!(2, entries.len());
/// assert!(entries.contains(&(4, String::from("Value 4"))));
/// assert!(entries.contains(&(5, String::from("Value 5"))));
/// # Ok::<(), lockable::Never>(())}).unwrap();
/// ```
pub async fn lock_all_entries(
&self,
) -> impl Stream<Item = <Self as Lockable<K, V>>::Guard<'_>> {
LockableMapImpl::lock_all_entries(&self.map_impl).await
}

/// Lock all entries of the cache once. The result of this is a [Stream] that will
/// produce the corresponding lock guards. If items are locked, the [Stream] will
/// produce them as they become unlocked and can be locked by the stream.
///
/// This is identical to [LockableLruCache::lock_all_entries], but it works on
/// an `Arc<LockableLruCache>` instead of a [LockableLruCache] and returns a
/// [Lockable::OwnedGuard] that binds its lifetime to the [LockableLruCache] in that
/// [Arc]. Such a [Lockable::OwnedGuard] can be more easily moved around or cloned.
///
/// Examples
/// -----
/// ```
/// use futures::stream::StreamExt;
/// use lockable::{AsyncLimit, Lockable, LockableLruCache};
/// use std::sync::Arc;
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let lockable_map = Arc::new(LockableLruCache::<i64, String>::new());
///
/// // Insert two entries
/// lockable_map
/// .async_lock(4, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 4"));
/// lockable_map
/// .async_lock(5, AsyncLimit::no_limit())
/// .await?
/// .insert(String::from("Value 5"));
///
/// // Lock all entries and add them to an `entries` vector
/// let mut entries: Vec<(i64, String)> = Vec::new();
/// let mut stream = lockable_map.lock_all_entries_owned().await;
/// while let Some(guard) = stream.next().await {
/// entries.push((*guard.key(), guard.value().unwrap().clone()));
/// }
///
/// // `entries` now contains both entries, but in an arbitrary order
/// assert_eq!(2, entries.len());
/// assert!(entries.contains(&(4, String::from("Value 4"))));
/// assert!(entries.contains(&(5, String::from("Value 5"))));
/// # Ok::<(), lockable::Never>(())}).unwrap();
/// ```
pub async fn lock_all_entries_owned(
self: &Arc<Self>,
) -> impl Stream<Item = <Self as Lockable<K, V>>::OwnedGuard> {
LockableMapImpl::lock_all_entries(Arc::clone(self)).await
}

/// Lock all entries that are currently unlocked and that were unlocked for at least
/// the given `duration`. This follows the LRU nature of the cache.
///
Expand Down
Loading

0 comments on commit 10923f1

Please sign in to comment.